Watchdog responsible calling the DataReaderListener
when the deadline period expires.
More...
#include <RequestedDeadlineWatchdog.h>
Public Types | |
typedef ACE_Recursive_Thread_Mutex | lock_type |
typedef ACE_Reverse_Lock < lock_type > | reverse_lock_type |
Public Member Functions | |
RequestedDeadlineWatchdog (lock_type &lock, DDS::DeadlineQosPolicy qos, DataReaderImpl &reader_impl, DDS::RequestedDeadlineMissedStatus &status, CORBA::Long &last_total_count) | |
virtual | ~RequestedDeadlineWatchdog () |
void | schedule_timer (OpenDDS::DCPS::SubscriptionInstance_rch instance) |
Schedule timer for the supplied instance. | |
void | cancel_timer (OpenDDS::DCPS::SubscriptionInstance_rch instance) |
Cancel timer for the supplied instance. | |
virtual int | handle_timeout (const ACE_Time_Value &, const void *act) |
void | execute (OpenDDS::DCPS::SubscriptionInstance_rch, bool timer_called) |
Operation to be executed when the associated timer expires. | |
virtual void | reschedule_deadline () |
Re-schedule timer for all instances of the DataReader. | |
Private Attributes | |
lock_type & | status_lock_ |
Lock for synchronization of status_ member. | |
reverse_lock_type | reverse_status_lock_ |
Reverse lock used for releasing the status_lock_ listener upcall. | |
WeakRcHandle< DataReaderImpl > | reader_impl_ |
DDS::RequestedDeadlineMissedStatus & | status_ |
CORBA::Long & | last_total_count_ |
Last total_count when status was last checked. |
Watchdog responsible calling the DataReaderListener
when the deadline period expires.
This watchdog object calls the on_requested_deadline_missed()
listener callback when the configured finite deadline period expires.
Definition at line 39 of file RequestedDeadlineWatchdog.h.
Definition at line 42 of file RequestedDeadlineWatchdog.h.
Definition at line 43 of file RequestedDeadlineWatchdog.h.
OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL OpenDDS::DCPS::RequestedDeadlineWatchdog::RequestedDeadlineWatchdog | ( | lock_type & | lock, | |
DDS::DeadlineQosPolicy | qos, | |||
DataReaderImpl & | reader_impl, | |||
DDS::RequestedDeadlineMissedStatus & | status, | |||
CORBA::Long & | last_total_count | |||
) |
Definition at line 19 of file RequestedDeadlineWatchdog.cpp.
00025 : Watchdog(duration_to_time_value(qos.period)) 00026 , status_lock_(lock) 00027 , reverse_status_lock_(status_lock_) 00028 , reader_impl_(reader_impl) 00029 , status_(status) 00030 , last_total_count_(last_total_count) 00031 { 00032 }
OpenDDS::DCPS::RequestedDeadlineWatchdog::~RequestedDeadlineWatchdog | ( | ) | [virtual] |
Definition at line 34 of file RequestedDeadlineWatchdog.cpp.
void OpenDDS::DCPS::RequestedDeadlineWatchdog::cancel_timer | ( | OpenDDS::DCPS::SubscriptionInstance_rch | instance | ) |
Cancel timer for the supplied instance.
Definition at line 56 of file RequestedDeadlineWatchdog.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), and LM_INFO.
Referenced by execute().
00058 { 00059 if (instance->deadline_timer_id_ != -1) { 00060 Watchdog::cancel_timer(instance->deadline_timer_id_); 00061 instance->deadline_timer_id_ = -1; 00062 if (DCPS_debug_level > 5) { 00063 ACE_DEBUG((LM_INFO, "Timer for instance %X cancelled \n", instance.in())); 00064 } 00065 } 00066 }
void OpenDDS::DCPS::RequestedDeadlineWatchdog::execute | ( | OpenDDS::DCPS::SubscriptionInstance_rch | instance, | |
bool | timer_called | |||
) |
Operation to be executed when the associated timer expires.
This Watchdog
object updates the DDS::RequestedDeadlineMissed
structure, and calls DataReaderListener::on_requested_deadline_missed()
.
Definition at line 85 of file RequestedDeadlineWatchdog.cpp.
References cancel_timer(), ACE_OS::gettimeofday(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::Watchdog::interval_, CORBA::is_nil(), DDS::RequestedDeadlineMissedStatus::last_instance_handle, last_total_count_, OpenDDS::DCPS::WeakRcHandle< T >::lock(), reader_impl_, DDS::REQUESTED_DEADLINE_MISSED_STATUS, reverse_status_lock_, schedule_timer(), status, status_, status_lock_, DDS::RequestedDeadlineMissedStatus::total_count, DDS::RequestedDeadlineMissedStatus::total_count_change, and ACE_Time_Value::zero.
Referenced by handle_timeout().
00086 { 00087 if (instance->deadline_timer_id_ != -1) { 00088 bool missed = false; 00089 00090 if (instance->cur_sample_tv_ == ACE_Time_Value::zero) { // not received any sample. 00091 missed = true; 00092 00093 } else if (timer_called) { // handle_timeout is called 00094 ACE_Time_Value diff = ACE_OS::gettimeofday() - instance->cur_sample_tv_; 00095 missed = diff >= this->interval_; 00096 00097 } else { // upon receiving sample. 00098 ACE_Time_Value diff = instance->cur_sample_tv_ - instance->last_sample_tv_; 00099 missed = diff > this->interval_; 00100 } 00101 00102 if (missed) { 00103 DataReaderImpl_rch reader = this->reader_impl_.lock(); 00104 if (!reader) 00105 return; 00106 00107 ACE_GUARD(ACE_Recursive_Thread_Mutex, monitor, this->status_lock_); 00108 // Only update the status upon timer is called and not 00109 // when receiving a sample after the interval. 00110 // Otherwise the counter is doubled. 00111 if (timer_called) { 00112 ++this->status_.total_count; 00113 this->status_.total_count_change = 00114 this->status_.total_count - this->last_total_count_; 00115 this->status_.last_instance_handle = instance->instance_handle_; 00116 00117 reader->set_status_changed_flag( 00118 DDS::REQUESTED_DEADLINE_MISSED_STATUS, true); 00119 00120 DDS::DataReaderListener_var listener = 00121 reader->listener_for( 00122 DDS::REQUESTED_DEADLINE_MISSED_STATUS); 00123 00124 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00125 if (instance->instance_state_.is_exclusive()) { 00126 DataReaderImpl::OwnershipManagerPtr owner_manager = reader->ownership_manager(); 00127 if (owner_manager) 00128 owner_manager->remove_writers (instance->instance_handle_); 00129 } 00130 #endif 00131 00132 if (!CORBA::is_nil(listener.in())) { 00133 // Copy before releasing the lock. 00134 DDS::RequestedDeadlineMissedStatus const status = this->status_; 00135 00136 // Release the lock during the upcall. 00137 ACE_GUARD(reverse_lock_type, reverse_monitor, this->reverse_status_lock_); 00138 // @todo Will this operation ever throw? If so we may want to 00139 // catch all exceptions, and act accordingly. 00140 listener->on_requested_deadline_missed(reader.in(), 00141 status); 00142 } 00143 00144 reader->notify_status_condition(); 00145 } 00146 } 00147 00148 // This next part is without status_lock_ held to avoid reactor deadlock. 00149 if (!timer_called) { 00150 this->cancel_timer(instance); 00151 this->schedule_timer(instance); 00152 } 00153 00154 } else { 00155 // not an error - timer is scheduled asynchronously so we can get here 00156 // via DataReaderImpl::data_received() before schedule_timer() is done 00157 } 00158 }
int OpenDDS::DCPS::RequestedDeadlineWatchdog::handle_timeout | ( | const ACE_Time_Value & | , | |
const void * | act | |||
) | [virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 69 of file RequestedDeadlineWatchdog.cpp.
References execute(), OpenDDS::DCPS::WeakRcHandle< T >::lock(), ACE_Reactor::purge_pending_notifications(), ACE_Event_Handler::reactor(), and reader_impl_.
00070 { 00071 DDS::InstanceHandle_t handle = static_cast<DDS::InstanceHandle_t>(reinterpret_cast<intptr_t>(act)); 00072 DataReaderImpl_rch reader = this->reader_impl_.lock(); 00073 if (reader) { 00074 SubscriptionInstance_rch instance = reader->get_handle_instance(handle); 00075 if (instance) 00076 execute(instance, true); 00077 } 00078 else { 00079 this->reactor()->purge_pending_notifications(this); 00080 } 00081 return 0; 00082 }
void OpenDDS::DCPS::RequestedDeadlineWatchdog::reschedule_deadline | ( | ) | [virtual] |
Re-schedule timer for all instances of the DataReader.
Implements OpenDDS::DCPS::Watchdog.
Definition at line 161 of file RequestedDeadlineWatchdog.cpp.
References OpenDDS::DCPS::WeakRcHandle< T >::lock(), and reader_impl_.
00162 { 00163 DataReaderImpl_rch reader = this->reader_impl_.lock(); 00164 if (reader) 00165 reader->reschedule_deadline(); 00166 }
void OpenDDS::DCPS::RequestedDeadlineWatchdog::schedule_timer | ( | OpenDDS::DCPS::SubscriptionInstance_rch | instance | ) |
Schedule timer for the supplied instance.
Definition at line 39 of file RequestedDeadlineWatchdog.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::Watchdog::interval_, LM_ERROR, and LM_INFO.
Referenced by execute().
00041 { 00042 if (instance->deadline_timer_id_ == -1) { 00043 intptr_t handle = instance->instance_handle_; 00044 instance->deadline_timer_id_ = Watchdog::schedule_timer(reinterpret_cast<const void*>(handle), this->interval_); 00045 } 00046 if (instance->deadline_timer_id_ == -1) { 00047 ACE_ERROR((LM_ERROR, 00048 "ERROR Timer for instance %X should be scheduled, but is %d\n", 00049 instance.in(), instance->deadline_timer_id_)); 00050 } else if (DCPS_debug_level > 5) { 00051 ACE_DEBUG((LM_INFO, "Timer for instance %X scheduled \n", instance.in())); 00052 } 00053 }
Last total_count when status was last checked.
Definition at line 89 of file RequestedDeadlineWatchdog.h.
Referenced by execute().
Pointer to the DataReaderImpl
object from which the DataReaderListener
is obtained.
Definition at line 82 of file RequestedDeadlineWatchdog.h.
Referenced by execute(), handle_timeout(), and reschedule_deadline().
Reverse lock used for releasing the status_lock_
listener upcall.
Definition at line 78 of file RequestedDeadlineWatchdog.h.
Referenced by execute().
Reference to the missed requested deadline status structure.
Definition at line 86 of file RequestedDeadlineWatchdog.h.
Referenced by execute().
Lock for synchronization of status_
member.
Definition at line 76 of file RequestedDeadlineWatchdog.h.
Referenced by execute().