Watchdog responsible for calling the DataReaderListener
when the deadline period expires.
More...
#include <RequestedDeadlineWatchdog.h>
Inheritance diagram for OpenDDS::DCPS::RequestedDeadlineWatchdog:
Public Types | |
typedef ACE_Recursive_Thread_Mutex | lock_type |
typedef ACE_Reverse_Lock< lock_type > | reverse_lock_type |
Public Member Functions | |
RequestedDeadlineWatchdog (lock_type &lock, DDS::DeadlineQosPolicy qos, OpenDDS::DCPS::DataReaderImpl *reader_impl, DDS::DataReader_ptr reader, DDS::RequestedDeadlineMissedStatus &status, CORBA::Long &last_total_count) | |
Constructor. | |
virtual | ~RequestedDeadlineWatchdog () |
Destructor. | |
void | schedule_timer (OpenDDS::DCPS::SubscriptionInstance *instance) |
void | cancel_timer (OpenDDS::DCPS::SubscriptionInstance *instance) |
virtual void | execute (void const *act, bool timer_called) |
Operation to be executed when the associated timer expires. | |
virtual void | reschedule_deadline () |
Re-schedule timer for all instances of the DataReader. | |
Private Attributes | |
lock_type & | status_lock_ |
Lock for synchronization of status_ member. | |
reverse_lock_type | reverse_status_lock_ |
Reverse lock used for releasing the status_lock_ during the listener upcall. | |
OpenDDS::DCPS::DataReaderImpl *const | reader_impl_ |
DDS::DataReader_var | reader_ |
DDS::RequestedDeadlineMissedStatus & | status_ |
CORBA::Long & | last_total_count_ |
Value of total_count when the status was last checked. |
Watchdog responsible for calling the DataReaderListener
when the deadline period expires.
This watchdog object calls the on_requested_deadline_missed()
listener callback when the configured finite deadline period expires.
Definition at line 37 of file RequestedDeadlineWatchdog.h.
typedef ACE_Recursive_Thread_Mutex OpenDDS::DCPS::RequestedDeadlineWatchdog::lock_type |
Definition at line 40 of file RequestedDeadlineWatchdog.h.
typedef ACE_Reverse_Lock<lock_type> OpenDDS::DCPS::RequestedDeadlineWatchdog::reverse_lock_type |
Definition at line 41 of file RequestedDeadlineWatchdog.h.
OpenDDS::DCPS::RequestedDeadlineWatchdog::RequestedDeadlineWatchdog | ( | lock_type & | lock, | |
DDS::DeadlineQosPolicy | qos, | |||
OpenDDS::DCPS::DataReaderImpl * | reader_impl, | |||
DDS::DataReader_ptr | reader, | |||
DDS::RequestedDeadlineMissedStatus & | status, | |||
CORBA::Long & | last_total_count | |||
) |
Constructor.
Definition at line 17 of file RequestedDeadlineWatchdog.cpp.
00024 : Watchdog(duration_to_time_value(qos.period)) 00025 , status_lock_(lock) 00026 , reverse_status_lock_(status_lock_) 00027 , reader_impl_(reader_impl) 00028 , reader_(DDS::DataReader::_duplicate(reader)) 00029 , status_(status) 00030 , last_total_count_(last_total_count) 00031 { 00032 }
OpenDDS::DCPS::RequestedDeadlineWatchdog::~RequestedDeadlineWatchdog | ( | ) | [virtual] |
void OpenDDS::DCPS::RequestedDeadlineWatchdog::cancel_timer | ( | OpenDDS::DCPS::SubscriptionInstance * | instance | ) |
Definition at line 55 of file RequestedDeadlineWatchdog.cpp.
References OpenDDS::DCPS::Watchdog::cancel_timer(), OpenDDS::DCPS::DCPS_debug_level, and OpenDDS::DCPS::SubscriptionInstance::deadline_timer_id_.
Referenced by OpenDDS::DCPS::DataReaderImpl::cleanup(), OpenDDS::DCPS::DataReaderImpl::data_received(), and execute().
00057 { 00058 if (instance->deadline_timer_id_ != -1) { 00059 Watchdog::cancel_timer(instance->deadline_timer_id_); 00060 instance->deadline_timer_id_ = -1; 00061 if (DCPS_debug_level > 5) { 00062 ACE_DEBUG((LM_INFO, "Timer for instance %X cancelled \n", instance)); 00063 } 00064 } 00065 }
void OpenDDS::DCPS::RequestedDeadlineWatchdog::execute | ( | void const * | act, | |
bool | timer_called | |||
) | [virtual] |
Operation to be executed when the associated timer expires.
This Watchdog
object updates the DDS::RequestedDeadlineMissedStatus
structure and calls DataReaderListener::on_requested_deadline_missed().
Implements OpenDDS::DCPS::Watchdog.
Definition at line 68 of file RequestedDeadlineWatchdog.cpp.
References cancel_timer(), OpenDDS::DCPS::SubscriptionInstance::cur_sample_tv_, OpenDDS::DCPS::SubscriptionInstance::deadline_timer_id_, OpenDDS::DCPS::SubscriptionInstance::instance_handle_, OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::Watchdog::interval_, OpenDDS::DCPS::InstanceState::is_exclusive(), DDS::RequestedDeadlineMissedStatus::last_instance_handle, OpenDDS::DCPS::SubscriptionInstance::last_sample_tv_, last_total_count_, OpenDDS::DCPS::DataReaderImpl::listener_for(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), OpenDDS::DCPS::DataReaderImpl::owner_manager_, reader_impl_, OpenDDS::DCPS::OwnershipManager::remove_writers(), DDS::REQUESTED_DEADLINE_MISSED_STATUS, schedule_timer(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), status_, DDS::RequestedDeadlineMissedStatus::total_count, and DDS::RequestedDeadlineMissedStatus::total_count_change.
Referenced by OpenDDS::DCPS::DataReaderImpl::data_received().
00069 { 00070 SubscriptionInstance * instance = (SubscriptionInstance *)act; 00071 00072 if (instance->deadline_timer_id_ != -1) { 00073 bool missed = false; 00074 00075 if (instance->cur_sample_tv_ == ACE_Time_Value::zero) { // not received any sample. 00076 missed = true; 00077 00078 } else if (timer_called) { // handle_timeout is called 00079 ACE_Time_Value diff = ACE_OS::gettimeofday() - instance->cur_sample_tv_; 00080 missed = diff >= this->interval_; 00081 00082 } else { // upon receiving sample. 00083 ACE_Time_Value diff = instance->cur_sample_tv_ - instance->last_sample_tv_; 00084 missed = diff > this->interval_; 00085 } 00086 00087 if (missed) { 00088 ACE_GUARD(ACE_Recursive_Thread_Mutex, monitor, this->status_lock_); 00089 // Only update the status upon timer is called and not 00090 // when receiving a sample after the interval. 00091 // Otherwise the counter is doubled. 00092 if (timer_called) { 00093 ++this->status_.total_count; 00094 this->status_.total_count_change = 00095 this->status_.total_count - this->last_total_count_; 00096 this->status_.last_instance_handle = instance->instance_handle_; 00097 00098 this->reader_impl_->set_status_changed_flag( 00099 DDS::REQUESTED_DEADLINE_MISSED_STATUS, true); 00100 00101 DDS::DataReaderListener_var listener = 00102 this->reader_impl_->listener_for( 00103 DDS::REQUESTED_DEADLINE_MISSED_STATUS); 00104 00105 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE 00106 if (instance->instance_state_.is_exclusive()) { 00107 reader_impl_->owner_manager_->remove_writers (instance->instance_handle_); 00108 } 00109 #endif 00110 00111 if (!CORBA::is_nil(listener.in())) { 00112 // Copy before releasing the lock. 00113 DDS::RequestedDeadlineMissedStatus const status = this->status_; 00114 00115 // Release the lock during the upcall. 00116 ACE_GUARD(reverse_lock_type, reverse_monitor, this->reverse_status_lock_); 00117 // @todo Will this operation ever throw? If so we may want to 00118 // catch all exceptions, and act accordingly. 
00119 listener->on_requested_deadline_missed(this->reader_.in(), 00120 status); 00121 } 00122 00123 this->reader_impl_->notify_status_condition(); 00124 } 00125 } 00126 00127 // This next part is without status_lock_ held to avoid reactor deadlock. 00128 if (!timer_called) { 00129 this->cancel_timer(instance); 00130 this->schedule_timer(instance); 00131 } 00132 00133 } else { 00134 // not an error - timer is scheduled asynchronously so we can get here 00135 // via DataReaderImpl::data_received() before schedule_timer() is done 00136 } 00137 }
void OpenDDS::DCPS::RequestedDeadlineWatchdog::reschedule_deadline | ( | ) | [virtual] |
Re-schedule timer for all instances of the DataReader.
Implements OpenDDS::DCPS::Watchdog.
Definition at line 140 of file RequestedDeadlineWatchdog.cpp.
References reader_impl_, and OpenDDS::DCPS::DataReaderImpl::reschedule_deadline().
00141 { 00142 this->reader_impl_->reschedule_deadline(); 00143 }
void OpenDDS::DCPS::RequestedDeadlineWatchdog::schedule_timer | ( | OpenDDS::DCPS::SubscriptionInstance * | instance | ) |
Definition at line 39 of file RequestedDeadlineWatchdog.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SubscriptionInstance::deadline_timer_id_, and OpenDDS::DCPS::Watchdog::schedule_timer().
Referenced by OpenDDS::DCPS::DataReaderImpl::data_received(), and execute().
00041 { 00042 if (instance->deadline_timer_id_ == -1) { 00043 instance->deadline_timer_id_ = Watchdog::schedule_timer((void*)instance, this->interval_); 00044 } 00045 if (instance->deadline_timer_id_ == -1) { 00046 ACE_ERROR((LM_ERROR, 00047 "ERROR Timer for instance %X should be scheduled, but is %d\n", 00048 instance, instance->deadline_timer_id_)); 00049 } else if (DCPS_debug_level > 5) { 00050 ACE_DEBUG((LM_INFO, "Timer for instance %X scheduled \n", instance)); 00051 } 00052 }
CORBA::Long& OpenDDS::DCPS::RequestedDeadlineWatchdog::last_total_count_ [private] |
Value of total_count when the status was last checked.
Definition at line 92 of file RequestedDeadlineWatchdog.h.
Referenced by execute().
DDS::DataReader_var OpenDDS::DCPS::RequestedDeadlineWatchdog::reader_ [private] |
Reference to DataReader passed to listener when the deadline expires.
Definition at line 85 of file RequestedDeadlineWatchdog.h.
OpenDDS::DCPS::DataReaderImpl* const OpenDDS::DCPS::RequestedDeadlineWatchdog::reader_impl_ [private] |
Pointer to the DataReaderImpl
object from which the DataReaderListener
is obtained.
Definition at line 81 of file RequestedDeadlineWatchdog.h.
Referenced by execute(), and reschedule_deadline().
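reverse_lock_type OpenDDS::DCPS::RequestedDeadlineWatchdog::reverse_status_lock_ [private] |
Reverse lock used for releasing the status_lock_
during the listener upcall.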
Definition at line 77 of file RequestedDeadlineWatchdog.h.
DDS::RequestedDeadlineMissedStatus& OpenDDS::DCPS::RequestedDeadlineWatchdog::status_ [private] |
Reference to the missed requested deadline status structure.
Definition at line 89 of file RequestedDeadlineWatchdog.h.
Referenced by execute().
lock_type& OpenDDS::DCPS::RequestedDeadlineWatchdog::status_lock_ [private] |
Lock for synchronization of the status_
member.
Definition at line 75 of file RequestedDeadlineWatchdog.h.