00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #include "OfferedDeadlineWatchdog.h"
00011 #include "DataWriterImpl.h"
00012 #include "Qos_Helper.h"
00013 #include "ace/Recursive_Thread_Mutex.h"
00014
00015 OpenDDS::DCPS::OfferedDeadlineWatchdog::OfferedDeadlineWatchdog(
00016 lock_type & lock,
00017 DDS::DeadlineQosPolicy qos,
00018 OpenDDS::DCPS::DataWriterImpl * writer_impl,
00019 DDS::DataWriter_ptr writer,
00020 DDS::OfferedDeadlineMissedStatus & status,
00021 CORBA::Long & last_total_count)
00022 : Watchdog(duration_to_time_value(qos.period))
00023 , status_lock_(lock)
00024 , reverse_status_lock_(status_lock_)
00025 , writer_impl_(writer_impl)
00026 , writer_(DDS::DataWriter::_duplicate(writer))
00027 , status_(status)
00028 , last_total_count_(last_total_count)
00029 {
00030 }
00031
00032 OpenDDS::DCPS::OfferedDeadlineWatchdog::~OfferedDeadlineWatchdog()
00033 {
00034 }
00035
00036 void
00037 OpenDDS::DCPS::OfferedDeadlineWatchdog::schedule_timer(OpenDDS::DCPS::PublicationInstance* instance)
00038 {
00039 if (instance->deadline_timer_id_ == -1) {
00040 instance->deadline_timer_id_ = Watchdog::schedule_timer((void*)instance, this->interval_);
00041 }
00042 }
00043
00044 void
00045 OpenDDS::DCPS::OfferedDeadlineWatchdog::cancel_timer(OpenDDS::DCPS::PublicationInstance* instance)
00046 {
00047 if (instance->deadline_timer_id_ != -1) {
00048 Watchdog::cancel_timer(instance->deadline_timer_id_);
00049 instance->deadline_timer_id_ = -1;
00050 }
00051 }
00052
00053 void
00054 OpenDDS::DCPS::OfferedDeadlineWatchdog::execute(void const * act, bool timer_called)
00055 {
00056 OpenDDS::DCPS::PublicationInstance * instance = (OpenDDS::DCPS::PublicationInstance *)act;
00057
00058 if (instance->deadline_timer_id_ != -1) {
00059 bool missed = false;
00060
00061 if (instance->cur_sample_tv_ == ACE_Time_Value::zero) {
00062 missed = true;
00063
00064 } else if (timer_called) {
00065 ACE_Time_Value diff = ACE_OS::gettimeofday() - instance->cur_sample_tv_;
00066 missed = diff >= this->interval_;
00067
00068 } else if (instance->last_sample_tv_ != ACE_Time_Value::zero) {
00069 ACE_Time_Value diff = instance->cur_sample_tv_ - instance->last_sample_tv_;
00070 missed = diff > this->interval_;
00071 }
00072
00073 if (missed) {
00074 ACE_GUARD(ACE_Recursive_Thread_Mutex, monitor, this->status_lock_);
00075
00076 if (timer_called) {
00077 ++this->status_.total_count;
00078 this->status_.total_count_change =
00079 this->status_.total_count - this->last_total_count_;
00080 this->status_.last_instance_handle = instance->instance_handle_;
00081
00082 this->writer_impl_->set_status_changed_flag(
00083 DDS::OFFERED_DEADLINE_MISSED_STATUS, true);
00084
00085 DDS::DataWriterListener_var listener =
00086 this->writer_impl_->listener_for(
00087 DDS::OFFERED_DEADLINE_MISSED_STATUS);
00088
00089 if (! CORBA::is_nil(listener.in())) {
00090
00091 DDS::OfferedDeadlineMissedStatus const status = this->status_;
00092
00093
00094 ACE_GUARD(reverse_lock_type, reverse_monitor, this->reverse_status_lock_);
00095
00096
00097
00098 listener->on_offered_deadline_missed(this->writer_.in(),
00099 status);
00100 }
00101
00102 this->writer_impl_->notify_status_condition();
00103 }
00104 }
00105
00106
00107 if (!timer_called) {
00108 this->cancel_timer(instance);
00109 this->schedule_timer(instance);
00110 }
00111
00112 } else {
00113
00114
00115 }
00116 }
00117
00118 void
00119 OpenDDS::DCPS::OfferedDeadlineWatchdog::reschedule_deadline()
00120 {
00121 this->writer_impl_->reschedule_deadline();
00122 }