00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #include "RequestedDeadlineWatchdog.h"
00011 #include "DataReaderImpl.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "Qos_Helper.h"
00014
00015 #include "ace/Recursive_Thread_Mutex.h"
00016
00017 OpenDDS::DCPS::RequestedDeadlineWatchdog::RequestedDeadlineWatchdog(
00018 lock_type & lock,
00019 DDS::DeadlineQosPolicy qos,
00020 OpenDDS::DCPS::DataReaderImpl * reader_impl,
00021 DDS::DataReader_ptr reader,
00022 DDS::RequestedDeadlineMissedStatus & status,
00023 CORBA::Long & last_total_count)
00024 : Watchdog(duration_to_time_value(qos.period))
00025 , status_lock_(lock)
00026 , reverse_status_lock_(status_lock_)
00027 , reader_impl_(reader_impl)
00028 , reader_(DDS::DataReader::_duplicate(reader))
00029 , status_(status)
00030 , last_total_count_(last_total_count)
00031 {
00032 }
00033
00034 OpenDDS::DCPS::RequestedDeadlineWatchdog::~RequestedDeadlineWatchdog()
00035 {
00036 }
00037
00038 void
00039 OpenDDS::DCPS::RequestedDeadlineWatchdog::schedule_timer(
00040 OpenDDS::DCPS::SubscriptionInstance* instance)
00041 {
00042 if (instance->deadline_timer_id_ == -1) {
00043 instance->deadline_timer_id_ = Watchdog::schedule_timer((void*)instance, this->interval_);
00044 }
00045 if (instance->deadline_timer_id_ == -1) {
00046 ACE_ERROR((LM_ERROR,
00047 "ERROR Timer for instance %X should be scheduled, but is %d\n",
00048 instance, instance->deadline_timer_id_));
00049 } else if (DCPS_debug_level > 5) {
00050 ACE_DEBUG((LM_INFO, "Timer for instance %X scheduled \n", instance));
00051 }
00052 }
00053
00054 void
00055 OpenDDS::DCPS::RequestedDeadlineWatchdog::cancel_timer(
00056 OpenDDS::DCPS::SubscriptionInstance* instance)
00057 {
00058 if (instance->deadline_timer_id_ != -1) {
00059 Watchdog::cancel_timer(instance->deadline_timer_id_);
00060 instance->deadline_timer_id_ = -1;
00061 if (DCPS_debug_level > 5) {
00062 ACE_DEBUG((LM_INFO, "Timer for instance %X cancelled \n", instance));
00063 }
00064 }
00065 }
00066
00067 void
00068 OpenDDS::DCPS::RequestedDeadlineWatchdog::execute(void const * act, bool timer_called)
00069 {
00070 SubscriptionInstance * instance = (SubscriptionInstance *)act;
00071
00072 if (instance->deadline_timer_id_ != -1) {
00073 bool missed = false;
00074
00075 if (instance->cur_sample_tv_ == ACE_Time_Value::zero) {
00076 missed = true;
00077
00078 } else if (timer_called) {
00079 ACE_Time_Value diff = ACE_OS::gettimeofday() - instance->cur_sample_tv_;
00080 missed = diff >= this->interval_;
00081
00082 } else {
00083 ACE_Time_Value diff = instance->cur_sample_tv_ - instance->last_sample_tv_;
00084 missed = diff > this->interval_;
00085 }
00086
00087 if (missed) {
00088 ACE_GUARD(ACE_Recursive_Thread_Mutex, monitor, this->status_lock_);
00089
00090
00091
00092 if (timer_called) {
00093 ++this->status_.total_count;
00094 this->status_.total_count_change =
00095 this->status_.total_count - this->last_total_count_;
00096 this->status_.last_instance_handle = instance->instance_handle_;
00097
00098 this->reader_impl_->set_status_changed_flag(
00099 DDS::REQUESTED_DEADLINE_MISSED_STATUS, true);
00100
00101 DDS::DataReaderListener_var listener =
00102 this->reader_impl_->listener_for(
00103 DDS::REQUESTED_DEADLINE_MISSED_STATUS);
00104
00105 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00106 if (instance->instance_state_.is_exclusive()) {
00107 reader_impl_->owner_manager_->remove_writers (instance->instance_handle_);
00108 }
00109 #endif
00110
00111 if (!CORBA::is_nil(listener.in())) {
00112
00113 DDS::RequestedDeadlineMissedStatus const status = this->status_;
00114
00115
00116 ACE_GUARD(reverse_lock_type, reverse_monitor, this->reverse_status_lock_);
00117
00118
00119 listener->on_requested_deadline_missed(this->reader_.in(),
00120 status);
00121 }
00122
00123 this->reader_impl_->notify_status_condition();
00124 }
00125 }
00126
00127
00128 if (!timer_called) {
00129 this->cancel_timer(instance);
00130 this->schedule_timer(instance);
00131 }
00132
00133 } else {
00134
00135
00136 }
00137 }
00138
00139 void
00140 OpenDDS::DCPS::RequestedDeadlineWatchdog::reschedule_deadline()
00141 {
00142 this->reader_impl_->reschedule_deadline();
00143 }