RequestedDeadlineWatchdog.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "RequestedDeadlineWatchdog.h"
00011 #include "DataReaderImpl.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "Qos_Helper.h"
00014 
00015 #include "ace/Recursive_Thread_Mutex.h"
00016 
00017 OpenDDS::DCPS::RequestedDeadlineWatchdog::RequestedDeadlineWatchdog(
00018   lock_type & lock,
00019   DDS::DeadlineQosPolicy qos,
00020   OpenDDS::DCPS::DataReaderImpl * reader_impl,
00021   DDS::DataReader_ptr reader,
00022   DDS::RequestedDeadlineMissedStatus & status,
00023   CORBA::Long & last_total_count)
00024   : Watchdog(duration_to_time_value(qos.period))
00025   , status_lock_(lock)
00026   , reverse_status_lock_(status_lock_)
00027   , reader_impl_(reader_impl)
00028   , reader_(DDS::DataReader::_duplicate(reader))
00029   , status_(status)
00030   , last_total_count_(last_total_count)
00031 {
00032 }
00033 
00034 OpenDDS::DCPS::RequestedDeadlineWatchdog::~RequestedDeadlineWatchdog()
00035 {
00036 }
00037 
00038 void
00039 OpenDDS::DCPS::RequestedDeadlineWatchdog::schedule_timer(
00040   OpenDDS::DCPS::SubscriptionInstance* instance)
00041 {
00042   if (instance->deadline_timer_id_ == -1) {
00043     instance->deadline_timer_id_ = Watchdog::schedule_timer((void*)instance, this->interval_);
00044   }
00045   if (instance->deadline_timer_id_ == -1) {
00046     ACE_ERROR((LM_ERROR,
00047                "ERROR Timer for instance %X should be scheduled, but is %d\n",
00048                instance, instance->deadline_timer_id_));
00049   } else if (DCPS_debug_level > 5) {
00050     ACE_DEBUG((LM_INFO, "Timer for instance %X scheduled \n", instance));
00051   }
00052 }
00053 
00054 void
00055 OpenDDS::DCPS::RequestedDeadlineWatchdog::cancel_timer(
00056   OpenDDS::DCPS::SubscriptionInstance* instance)
00057 {
00058   if (instance->deadline_timer_id_ != -1) {
00059     Watchdog::cancel_timer(instance->deadline_timer_id_);
00060     instance->deadline_timer_id_ = -1;
00061     if (DCPS_debug_level > 5) {
00062       ACE_DEBUG((LM_INFO, "Timer for instance %X cancelled \n", instance));
00063     }
00064   }
00065 }
00066 
00067 void
00068 OpenDDS::DCPS::RequestedDeadlineWatchdog::execute(void const * act, bool timer_called)
00069 {
00070   SubscriptionInstance * instance = (SubscriptionInstance *)act;
00071 
00072   if (instance->deadline_timer_id_ != -1) {
00073     bool missed = false;
00074 
00075     if (instance->cur_sample_tv_  == ACE_Time_Value::zero) { // not received any sample.
00076       missed = true;
00077 
00078     } else if (timer_called) { // handle_timeout is called
00079       ACE_Time_Value diff = ACE_OS::gettimeofday() - instance->cur_sample_tv_;
00080       missed = diff >= this->interval_;
00081 
00082     } else { // upon receiving sample.
00083       ACE_Time_Value diff = instance->cur_sample_tv_ - instance->last_sample_tv_;
00084       missed = diff > this->interval_;
00085     }
00086 
00087     if (missed) {
00088       ACE_GUARD(ACE_Recursive_Thread_Mutex, monitor, this->status_lock_);
00089       // Only update the status upon timer is called and not
00090       // when receiving a sample after the interval.
00091       // Otherwise the counter is doubled.
00092       if (timer_called) {
00093         ++this->status_.total_count;
00094         this->status_.total_count_change =
00095           this->status_.total_count - this->last_total_count_;
00096         this->status_.last_instance_handle = instance->instance_handle_;
00097 
00098         this->reader_impl_->set_status_changed_flag(
00099           DDS::REQUESTED_DEADLINE_MISSED_STATUS, true);
00100 
00101         DDS::DataReaderListener_var listener =
00102           this->reader_impl_->listener_for(
00103             DDS::REQUESTED_DEADLINE_MISSED_STATUS);
00104 
00105 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00106         if (instance->instance_state_.is_exclusive()) {
00107           reader_impl_->owner_manager_->remove_writers (instance->instance_handle_);
00108         }
00109 #endif
00110 
00111         if (!CORBA::is_nil(listener.in())) {
00112           // Copy before releasing the lock.
00113           DDS::RequestedDeadlineMissedStatus const status = this->status_;
00114 
00115           // Release the lock during the upcall.
00116           ACE_GUARD(reverse_lock_type, reverse_monitor, this->reverse_status_lock_);
00117           // @todo Will this operation ever throw?  If so we may want to
00118           //       catch all exceptions, and act accordingly.
00119           listener->on_requested_deadline_missed(this->reader_.in(),
00120                                                 status);
00121         }
00122 
00123         this->reader_impl_->notify_status_condition();
00124       }
00125     }
00126 
00127     // This next part is without status_lock_ held to avoid reactor deadlock.
00128     if (!timer_called) {
00129       this->cancel_timer(instance);
00130       this->schedule_timer(instance);
00131     }
00132 
00133   } else {
00134     // not an error - timer is scheduled asynchronously so we can get here
00135     // via DataReaderImpl::data_received() before schedule_timer() is done
00136   }
00137 }
00138 
00139 void
00140 OpenDDS::DCPS::RequestedDeadlineWatchdog::reschedule_deadline()
00141 {
00142   this->reader_impl_->reschedule_deadline();
00143 }

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7