RequestedDeadlineWatchdog.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "RequestedDeadlineWatchdog.h"
00011 #include "DataReaderImpl.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "Time_Helper.h"
00014 
00015 #include "ace/Recursive_Thread_Mutex.h"
00016 
00017 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00018 
00019 OpenDDS::DCPS::RequestedDeadlineWatchdog::RequestedDeadlineWatchdog(
00020   lock_type & lock,
00021   DDS::DeadlineQosPolicy qos,
00022   OpenDDS::DCPS::DataReaderImpl & reader_impl,
00023   DDS::RequestedDeadlineMissedStatus & status,
00024   CORBA::Long & last_total_count)
00025   : Watchdog(duration_to_time_value(qos.period))
00026   , status_lock_(lock)
00027   , reverse_status_lock_(status_lock_)
00028   , reader_impl_(reader_impl)
00029   , status_(status)
00030   , last_total_count_(last_total_count)
00031 {
00032 }
00033 
00034 OpenDDS::DCPS::RequestedDeadlineWatchdog::~RequestedDeadlineWatchdog()
00035 {
00036 }
00037 
00038 void
00039 OpenDDS::DCPS::RequestedDeadlineWatchdog::schedule_timer(
00040   OpenDDS::DCPS::SubscriptionInstance_rch instance)
00041 {
00042   if (instance->deadline_timer_id_ == -1) {
00043     intptr_t handle = instance->instance_handle_;
00044     instance->deadline_timer_id_ = Watchdog::schedule_timer(reinterpret_cast<const void*>(handle), this->interval_);
00045   }
00046   if (instance->deadline_timer_id_ == -1) {
00047     ACE_ERROR((LM_ERROR,
00048                "ERROR Timer for instance %X should be scheduled, but is %d\n",
00049                instance.in(), instance->deadline_timer_id_));
00050   } else if (DCPS_debug_level > 5) {
00051     ACE_DEBUG((LM_INFO, "Timer for instance %X scheduled \n", instance.in()));
00052   }
00053 }
00054 
00055 void
00056 OpenDDS::DCPS::RequestedDeadlineWatchdog::cancel_timer(
00057   OpenDDS::DCPS::SubscriptionInstance_rch instance)
00058 {
00059   if (instance->deadline_timer_id_ != -1) {
00060     Watchdog::cancel_timer(instance->deadline_timer_id_);
00061     instance->deadline_timer_id_ = -1;
00062     if (DCPS_debug_level > 5) {
00063       ACE_DEBUG((LM_INFO, "Timer for instance %X cancelled \n", instance.in()));
00064     }
00065   }
00066 }
00067 
00068 int
00069 OpenDDS::DCPS::RequestedDeadlineWatchdog::handle_timeout(const ACE_Time_Value&, const void* act)
00070 {
00071   DDS::InstanceHandle_t handle = static_cast<DDS::InstanceHandle_t>(reinterpret_cast<intptr_t>(act));
00072   DataReaderImpl_rch reader = this->reader_impl_.lock();
00073   if (reader) {
00074     SubscriptionInstance_rch instance = reader->get_handle_instance(handle);
00075     if (instance)
00076       execute(instance, true);
00077   }
00078   else {
00079     this->reactor()->purge_pending_notifications(this);
00080   }
00081   return 0;
00082 }
00083 
00084 void
00085 OpenDDS::DCPS::RequestedDeadlineWatchdog::execute(SubscriptionInstance_rch instance, bool timer_called)
00086 {
00087   if (instance->deadline_timer_id_ != -1) {
00088     bool missed = false;
00089 
00090     if (instance->cur_sample_tv_  == ACE_Time_Value::zero) { // not received any sample.
00091       missed = true;
00092 
00093     } else if (timer_called) { // handle_timeout is called
00094       ACE_Time_Value diff = ACE_OS::gettimeofday() - instance->cur_sample_tv_;
00095       missed = diff >= this->interval_;
00096 
00097     } else { // upon receiving sample.
00098       ACE_Time_Value diff = instance->cur_sample_tv_ - instance->last_sample_tv_;
00099       missed = diff > this->interval_;
00100     }
00101 
00102     if (missed) {
00103       DataReaderImpl_rch reader = this->reader_impl_.lock();
00104       if (!reader)
00105         return;
00106 
00107       ACE_GUARD(ACE_Recursive_Thread_Mutex, monitor, this->status_lock_);
00108       // Only update the status upon timer is called and not
00109       // when receiving a sample after the interval.
00110       // Otherwise the counter is doubled.
00111       if (timer_called) {
00112         ++this->status_.total_count;
00113         this->status_.total_count_change =
00114           this->status_.total_count - this->last_total_count_;
00115         this->status_.last_instance_handle = instance->instance_handle_;
00116 
00117         reader->set_status_changed_flag(
00118           DDS::REQUESTED_DEADLINE_MISSED_STATUS, true);
00119 
00120         DDS::DataReaderListener_var listener =
00121           reader->listener_for(
00122             DDS::REQUESTED_DEADLINE_MISSED_STATUS);
00123 
00124 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00125         if (instance->instance_state_.is_exclusive()) {
00126           DataReaderImpl::OwnershipManagerPtr owner_manager = reader->ownership_manager();
00127           if (owner_manager)
00128             owner_manager->remove_writers (instance->instance_handle_);
00129         }
00130 #endif
00131 
00132         if (!CORBA::is_nil(listener.in())) {
00133           // Copy before releasing the lock.
00134           DDS::RequestedDeadlineMissedStatus const status = this->status_;
00135 
00136           // Release the lock during the upcall.
00137           ACE_GUARD(reverse_lock_type, reverse_monitor, this->reverse_status_lock_);
00138           // @todo Will this operation ever throw?  If so we may want to
00139           //       catch all exceptions, and act accordingly.
00140           listener->on_requested_deadline_missed(reader.in(),
00141                                                 status);
00142         }
00143 
00144         reader->notify_status_condition();
00145       }
00146     }
00147 
00148     // This next part is without status_lock_ held to avoid reactor deadlock.
00149     if (!timer_called) {
00150       this->cancel_timer(instance);
00151       this->schedule_timer(instance);
00152     }
00153 
00154   } else {
00155     // not an error - timer is scheduled asynchronously so we can get here
00156     // via DataReaderImpl::data_received() before schedule_timer() is done
00157   }
00158 }
00159 
00160 void
00161 OpenDDS::DCPS::RequestedDeadlineWatchdog::reschedule_deadline()
00162 {
00163   DataReaderImpl_rch reader = this->reader_impl_.lock();
00164   if (reader)
00165     reader->reschedule_deadline();
00166 }
00167 
00168 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1