OfferedDeadlineWatchdog.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "OfferedDeadlineWatchdog.h"
00011 #include "DataWriterImpl.h"
00012 #include "Qos_Helper.h"
00013 #include "ace/Recursive_Thread_Mutex.h"
00014 
00015 OpenDDS::DCPS::OfferedDeadlineWatchdog::OfferedDeadlineWatchdog(
00016   lock_type & lock,
00017   DDS::DeadlineQosPolicy qos,
00018   OpenDDS::DCPS::DataWriterImpl * writer_impl,
00019   DDS::DataWriter_ptr writer,
00020   DDS::OfferedDeadlineMissedStatus & status,
00021   CORBA::Long & last_total_count)
00022   : Watchdog(duration_to_time_value(qos.period))
00023   , status_lock_(lock)
00024   , reverse_status_lock_(status_lock_)
00025   , writer_impl_(writer_impl)
00026   , writer_(DDS::DataWriter::_duplicate(writer))
00027   , status_(status)
00028   , last_total_count_(last_total_count)
00029 {
00030 }
00031 
00032 OpenDDS::DCPS::OfferedDeadlineWatchdog::~OfferedDeadlineWatchdog()
00033 {
00034 }
00035 
00036 void
00037 OpenDDS::DCPS::OfferedDeadlineWatchdog::schedule_timer(OpenDDS::DCPS::PublicationInstance* instance)
00038 {
00039   if (instance->deadline_timer_id_ == -1) {
00040     instance->deadline_timer_id_ = Watchdog::schedule_timer((void*)instance, this->interval_);
00041   }
00042 }
00043 
00044 void
00045 OpenDDS::DCPS::OfferedDeadlineWatchdog::cancel_timer(OpenDDS::DCPS::PublicationInstance* instance)
00046 {
00047   if (instance->deadline_timer_id_ != -1) {
00048     Watchdog::cancel_timer(instance->deadline_timer_id_);
00049     instance->deadline_timer_id_ = -1;
00050   }
00051 }
00052 
00053 void
00054 OpenDDS::DCPS::OfferedDeadlineWatchdog::execute(void const * act, bool timer_called)
00055 {
00056   OpenDDS::DCPS::PublicationInstance * instance = (OpenDDS::DCPS::PublicationInstance *)act;
00057 
00058   if (instance->deadline_timer_id_ != -1) {
00059     bool missed = false;
00060 
00061     if (instance->cur_sample_tv_  == ACE_Time_Value::zero) { // not write any sample.
00062       missed = true;
00063 
00064     } else if (timer_called) { // handle_timeout is called
00065       ACE_Time_Value diff = ACE_OS::gettimeofday() - instance->cur_sample_tv_;
00066       missed = diff >= this->interval_;
00067 
00068     } else if (instance->last_sample_tv_ != ACE_Time_Value::zero) { // upon writing sample.
00069       ACE_Time_Value diff = instance->cur_sample_tv_ - instance->last_sample_tv_;
00070       missed = diff > this->interval_;
00071     }
00072 
00073     if (missed) {
00074       ACE_GUARD(ACE_Recursive_Thread_Mutex, monitor, this->status_lock_);
00075 
00076       if (timer_called) {
00077         ++this->status_.total_count;
00078         this->status_.total_count_change =
00079           this->status_.total_count - this->last_total_count_;
00080         this->status_.last_instance_handle = instance->instance_handle_;
00081 
00082         this->writer_impl_->set_status_changed_flag(
00083           DDS::OFFERED_DEADLINE_MISSED_STATUS, true);
00084 
00085         DDS::DataWriterListener_var listener =
00086           this->writer_impl_->listener_for(
00087             DDS::OFFERED_DEADLINE_MISSED_STATUS);
00088 
00089         if (! CORBA::is_nil(listener.in())) {
00090           // Copy before releasing the lock.
00091           DDS::OfferedDeadlineMissedStatus const status = this->status_;
00092 
00093           // Release the lock during the upcall.
00094           ACE_GUARD(reverse_lock_type, reverse_monitor, this->reverse_status_lock_);
00095 
00096           // @todo Will this operation ever throw?  If so we may want to
00097           //       catch all exceptions, and act accordingly.
00098           listener->on_offered_deadline_missed(this->writer_.in(),
00099                                               status);
00100         }
00101 
00102         this->writer_impl_->notify_status_condition();
00103       }
00104     }
00105 
00106     // This next part is without status_lock_ held to avoid reactor deadlock.
00107     if (!timer_called) {
00108       this->cancel_timer(instance);
00109       this->schedule_timer(instance);
00110     }
00111 
00112   } else {
00113     // not an error - timer is scheduled asynchronously so we can get here
00114     // via WriteDataContainer::enqueue() before schedule_timer() is done
00115   }
00116 }
00117 
00118 void
00119 OpenDDS::DCPS::OfferedDeadlineWatchdog::reschedule_deadline()
00120 {
00121   this->writer_impl_->reschedule_deadline();
00122 }

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7