OfferedDeadlineWatchdog.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "OfferedDeadlineWatchdog.h"
00011 #include "DataWriterImpl.h"
00012 #include "Time_Helper.h"
00013 #include "ace/Recursive_Thread_Mutex.h"
00014 
00015 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00016 
00017 OpenDDS::DCPS::OfferedDeadlineWatchdog::OfferedDeadlineWatchdog(
00018   lock_type & lock,
00019   DDS::DeadlineQosPolicy qos,
00020   OpenDDS::DCPS::DataWriterImpl & writer_impl,
00021   DDS::OfferedDeadlineMissedStatus & status,
00022   CORBA::Long & last_total_count)
00023   : Watchdog(duration_to_time_value(qos.period))
00024   , status_lock_(lock)
00025   , reverse_status_lock_(status_lock_)
00026   , writer_impl_(writer_impl)
00027   , status_(status)
00028   , last_total_count_(last_total_count)
00029 {
00030 }
00031 
00032 OpenDDS::DCPS::OfferedDeadlineWatchdog::~OfferedDeadlineWatchdog()
00033 {
00034 }
00035 
00036 void
00037 OpenDDS::DCPS::OfferedDeadlineWatchdog::schedule_timer(OpenDDS::DCPS::PublicationInstance_rch instance)
00038 {
00039   if (instance->deadline_timer_id_ == -1) {
00040     intptr_t handle = instance->instance_handle_;
00041     instance->deadline_timer_id_ = Watchdog::schedule_timer(reinterpret_cast<const void*>(handle), this->interval_);
00042   }
00043 }
00044 
00045 void
00046 OpenDDS::DCPS::OfferedDeadlineWatchdog::cancel_timer(OpenDDS::DCPS::PublicationInstance_rch instance)
00047 {
00048   if (instance->deadline_timer_id_ != -1) {
00049     Watchdog::cancel_timer(instance->deadline_timer_id_);
00050     instance->deadline_timer_id_ = -1;
00051   }
00052 }
00053 
00054 int
00055 OpenDDS::DCPS::OfferedDeadlineWatchdog::handle_timeout(const ACE_Time_Value&, const void* act)
00056 {
00057   DDS::InstanceHandle_t handle = static_cast<DDS::InstanceHandle_t>(reinterpret_cast<intptr_t>(act));
00058   RcHandle<DataWriterImpl> writer = writer_impl_.lock();
00059   if (writer) {
00060     OpenDDS::DCPS::PublicationInstance_rch instance =
00061       writer->get_handle_instance(handle);
00062     if (instance)
00063       execute(*writer, instance, true);
00064   }
00065   return 0;
00066 }
00067 
00068 
00069 void
00070 OpenDDS::DCPS::OfferedDeadlineWatchdog::execute(
00071   DataWriterImpl& writer,
00072   PublicationInstance_rch instance,
00073   bool timer_called)
00074 {
00075   if (instance->deadline_timer_id_ != -1) {
00076     bool missed = false;
00077 
00078     if (instance->cur_sample_tv_  == ACE_Time_Value::zero) { // not write any sample.
00079       missed = true;
00080 
00081     } else if (timer_called) { // handle_timeout is called
00082       ACE_Time_Value diff = ACE_OS::gettimeofday() - instance->cur_sample_tv_;
00083       missed = diff >= this->interval_;
00084 
00085     } else if (instance->last_sample_tv_ != ACE_Time_Value::zero) { // upon writing sample.
00086       ACE_Time_Value diff = instance->cur_sample_tv_ - instance->last_sample_tv_;
00087       missed = diff > this->interval_;
00088     }
00089 
00090     if (missed) {
00091       ACE_GUARD(ACE_Recursive_Thread_Mutex, monitor, this->status_lock_);
00092 
00093       if (timer_called) {
00094         ++this->status_.total_count;
00095         this->status_.total_count_change =
00096           this->status_.total_count - this->last_total_count_;
00097         this->status_.last_instance_handle = instance->instance_handle_;
00098 
00099         writer.set_status_changed_flag(
00100           DDS::OFFERED_DEADLINE_MISSED_STATUS, true);
00101 
00102         DDS::DataWriterListener_var listener =
00103           writer.listener_for(
00104             DDS::OFFERED_DEADLINE_MISSED_STATUS);
00105 
00106         if (! CORBA::is_nil(listener.in())) {
00107           // Copy before releasing the lock.
00108           DDS::OfferedDeadlineMissedStatus const status = this->status_;
00109 
00110           // Release the lock during the upcall.
00111           ACE_GUARD(reverse_lock_type, reverse_monitor, this->reverse_status_lock_);
00112 
00113           // @todo Will this operation ever throw?  If so we may want to
00114           //       catch all exceptions, and act accordingly.
00115           listener->on_offered_deadline_missed(&writer,
00116                                               status);
00117         }
00118 
00119         writer.notify_status_condition();
00120       }
00121     }
00122 
00123     // This next part is without status_lock_ held to avoid reactor deadlock.
00124     if (!timer_called) {
00125       this->cancel_timer(instance);
00126       this->schedule_timer(instance);
00127     }
00128 
00129   } else {
00130     // not an error - timer is scheduled asynchronously so we can get here
00131     // via WriteDataContainer::enqueue() before schedule_timer() is done
00132   }
00133 }
00134 
00135 void
00136 OpenDDS::DCPS::OfferedDeadlineWatchdog::reschedule_deadline()
00137 {
00138   RcHandle<DataWriterImpl> writer = writer_impl_.lock();
00139   if (writer)
00140     writer->reschedule_deadline();
00141 }
00142 
00143 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1