RequestedDeadlineWatchdog.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009
00010 #include "RequestedDeadlineWatchdog.h"
00011 #include "DataReaderImpl.h"
00012 #include "DomainParticipantImpl.h"
00013 #include "Time_Helper.h"
00014
00015 #include "ace/Recursive_Thread_Mutex.h"
00016
00017 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 OpenDDS::DCPS::RequestedDeadlineWatchdog::RequestedDeadlineWatchdog(
00020 lock_type & lock,
00021 DDS::DeadlineQosPolicy qos,
00022 OpenDDS::DCPS::DataReaderImpl & reader_impl,
00023 DDS::RequestedDeadlineMissedStatus & status,
00024 CORBA::Long & last_total_count)
00025 : Watchdog(duration_to_time_value(qos.period))
00026 , status_lock_(lock)
00027 , reverse_status_lock_(status_lock_)
00028 , reader_impl_(reader_impl)
00029 , status_(status)
00030 , last_total_count_(last_total_count)
00031 {
00032 }
00033
00034 OpenDDS::DCPS::RequestedDeadlineWatchdog::~RequestedDeadlineWatchdog()
00035 {
00036 }
00037
00038 void
00039 OpenDDS::DCPS::RequestedDeadlineWatchdog::schedule_timer(
00040 OpenDDS::DCPS::SubscriptionInstance_rch instance)
00041 {
00042 if (instance->deadline_timer_id_ == -1) {
00043 intptr_t handle = instance->instance_handle_;
00044 instance->deadline_timer_id_ = Watchdog::schedule_timer(reinterpret_cast<const void*>(handle), this->interval_);
00045 }
00046 if (instance->deadline_timer_id_ == -1) {
00047 ACE_ERROR((LM_ERROR,
00048 "ERROR Timer for instance %X should be scheduled, but is %d\n",
00049 instance.in(), instance->deadline_timer_id_));
00050 } else if (DCPS_debug_level > 5) {
00051 ACE_DEBUG((LM_INFO, "Timer for instance %X scheduled \n", instance.in()));
00052 }
00053 }
00054
00055 void
00056 OpenDDS::DCPS::RequestedDeadlineWatchdog::cancel_timer(
00057 OpenDDS::DCPS::SubscriptionInstance_rch instance)
00058 {
00059 if (instance->deadline_timer_id_ != -1) {
00060 Watchdog::cancel_timer(instance->deadline_timer_id_);
00061 instance->deadline_timer_id_ = -1;
00062 if (DCPS_debug_level > 5) {
00063 ACE_DEBUG((LM_INFO, "Timer for instance %X cancelled \n", instance.in()));
00064 }
00065 }
00066 }
00067
00068 int
00069 OpenDDS::DCPS::RequestedDeadlineWatchdog::handle_timeout(const ACE_Time_Value&, const void* act)
00070 {
00071 DDS::InstanceHandle_t handle = static_cast<DDS::InstanceHandle_t>(reinterpret_cast<intptr_t>(act));
00072 DataReaderImpl_rch reader = this->reader_impl_.lock();
00073 if (reader) {
00074 SubscriptionInstance_rch instance = reader->get_handle_instance(handle);
00075 if (instance)
00076 execute(instance, true);
00077 }
00078 else {
00079 this->reactor()->purge_pending_notifications(this);
00080 }
00081 return 0;
00082 }
00083
00084 void
00085 OpenDDS::DCPS::RequestedDeadlineWatchdog::execute(SubscriptionInstance_rch instance, bool timer_called)
00086 {
00087 if (instance->deadline_timer_id_ != -1) {
00088 bool missed = false;
00089
00090 if (instance->cur_sample_tv_ == ACE_Time_Value::zero) {
00091 missed = true;
00092
00093 } else if (timer_called) {
00094 ACE_Time_Value diff = ACE_OS::gettimeofday() - instance->cur_sample_tv_;
00095 missed = diff >= this->interval_;
00096
00097 } else {
00098 ACE_Time_Value diff = instance->cur_sample_tv_ - instance->last_sample_tv_;
00099 missed = diff > this->interval_;
00100 }
00101
00102 if (missed) {
00103 DataReaderImpl_rch reader = this->reader_impl_.lock();
00104 if (!reader)
00105 return;
00106
00107 ACE_GUARD(ACE_Recursive_Thread_Mutex, monitor, this->status_lock_);
00108
00109
00110
00111 if (timer_called) {
00112 ++this->status_.total_count;
00113 this->status_.total_count_change =
00114 this->status_.total_count - this->last_total_count_;
00115 this->status_.last_instance_handle = instance->instance_handle_;
00116
00117 reader->set_status_changed_flag(
00118 DDS::REQUESTED_DEADLINE_MISSED_STATUS, true);
00119
00120 DDS::DataReaderListener_var listener =
00121 reader->listener_for(
00122 DDS::REQUESTED_DEADLINE_MISSED_STATUS);
00123
00124 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
00125 if (instance->instance_state_.is_exclusive()) {
00126 DataReaderImpl::OwnershipManagerPtr owner_manager = reader->ownership_manager();
00127 if (owner_manager)
00128 owner_manager->remove_writers (instance->instance_handle_);
00129 }
00130 #endif
00131
00132 if (!CORBA::is_nil(listener.in())) {
00133
00134 DDS::RequestedDeadlineMissedStatus const status = this->status_;
00135
00136
00137 ACE_GUARD(reverse_lock_type, reverse_monitor, this->reverse_status_lock_);
00138
00139
00140 listener->on_requested_deadline_missed(reader.in(),
00141 status);
00142 }
00143
00144 reader->notify_status_condition();
00145 }
00146 }
00147
00148
00149 if (!timer_called) {
00150 this->cancel_timer(instance);
00151 this->schedule_timer(instance);
00152 }
00153
00154 } else {
00155
00156
00157 }
00158 }
00159
00160 void
00161 OpenDDS::DCPS::RequestedDeadlineWatchdog::reschedule_deadline()
00162 {
00163 DataReaderImpl_rch reader = this->reader_impl_.lock();
00164 if (reader)
00165 reader->reschedule_deadline();
00166 }
00167
00168 OPENDDS_END_VERSIONED_NAMESPACE_DECL