Classes | |
class | CancelCommand |
class | CheckLivelinessCommand |
class | CommandBase |
Public Member Functions | |
LivelinessTimer (ACE_Reactor *reactor, ACE_thread_t owner, DataReaderImpl *data_reader) | |
void | check_liveliness () |
void | cancel_timer () |
virtual bool | reactor_is_shut_down () const |
Private Member Functions | |
~LivelinessTimer () | |
void | check_liveliness_i (bool cancel, const ACE_Time_Value &current_time) |
int | handle_timeout (const ACE_Time_Value &current_time, const void *arg) |
Private Attributes | |
WeakRcHandle< DataReaderImpl > | data_reader_ |
long | liveliness_timer_id_ |
liveliness timer id; -1 if no timer is set |
Definition at line 746 of file DataReaderImpl.h.
OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::LivelinessTimer | ( | ACE_Reactor * | reactor, | |
ACE_thread_t | owner, | |||
DataReaderImpl * | data_reader | |||
) | [inline] |
Definition at line 748 of file DataReaderImpl.h.
00751 : ReactorInterceptor(reactor, owner) 00752 , data_reader_(*data_reader) 00753 , liveliness_timer_id_(-1) 00754 { }
OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::~LivelinessTimer | ( | ) | [inline, private] |
Definition at line 774 of file DataReaderImpl.h.
void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::cancel_timer | ( | void | ) | [inline] |
Definition at line 762 of file DataReaderImpl.h.
References c.
Referenced by check_liveliness_i().
00763 { 00764 CancelCommand c(this); 00765 execute_or_enqueue(c); 00766 }
void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness | ( | ) | [inline] |
Definition at line 756 of file DataReaderImpl.h.
References c.
00757 { 00758 CheckLivelinessCommand c(this); 00759 execute_or_enqueue(c); 00760 }
void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness_i | ( | bool | cancel, | |
const ACE_Time_Value & | current_time | |||
) | [private] |
Definition at line 1851 of file DataReaderImpl.cpp.
References ACE_TEXT(), cancel_timer(), data_reader_, OpenDDS::DCPS::DCPS_debug_level, liveliness_timer_id_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), ACE_Time_Value::max_time, OPENDDS_STRING, ACE_Reactor::purge_pending_notifications(), ACE_Event_Handler::reactor(), and ACE_Reactor::schedule_timer().
Referenced by handle_timeout().
01853 { 01854 // Working copy of the active timer Id. 01855 01856 RcHandle<DataReaderImpl> data_reader = data_reader_.lock(); 01857 if (! data_reader) { 01858 this->reactor()->purge_pending_notifications(this); 01859 return; 01860 } 01861 01862 long local_timer_id = liveliness_timer_id_; 01863 bool timer_was_reset = false; 01864 01865 if (local_timer_id != -1 && cancel) { 01866 if (DCPS_debug_level >= 5) { 01867 GuidConverter converter(data_reader->get_subscription_id()); 01868 ACE_DEBUG((LM_DEBUG, 01869 ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ") 01870 ACE_TEXT(" canceling timer for reader %C.\n"), 01871 OPENDDS_STRING(converter).c_str())); 01872 } 01873 01874 // called from add_associations and there is already a timer 01875 // so cancel the existing timer. 01876 if (this->reactor()->cancel_timer(local_timer_id) == -1) { 01877 // this could fail because the reactor's call and 01878 // the add_associations' call to this could overlap 01879 // so it is not a failure. 01880 ACE_DEBUG((LM_DEBUG, 01881 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ") 01882 ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer"))); 01883 } 01884 01885 timer_was_reset = true; 01886 } 01887 01888 // Used after the lock scope ends. 01889 ACE_Time_Value smallest(ACE_Time_Value::max_time); 01890 int alive_writers = 0; 01891 01892 // This is a bit convoluted. The reasoning goes as follows: 01893 // 1) We grab the current timer Id value when we enter the method. 01894 // 2) We *might* cancel the timer if it is active. 01895 // 3) The timer *might* be rescheduled while we do not hold the sample lock. 01896 // 4) If we (or another thread) canceled the timer that we can tell, then 01897 // 5) we should clear the Id value, 01898 // 6) unless it has been rescheduled. 01899 // We are using a changed timer Id value as a proxy for having been 01900 // rescheduled. 
01901 if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) { 01902 liveliness_timer_id_ = -1; 01903 } 01904 01905 ACE_Time_Value next_absolute; 01906 01907 // Iterate over each writer to this reader 01908 { 01909 ACE_READ_GUARD(ACE_RW_Thread_Mutex, 01910 read_guard, 01911 data_reader->writers_lock_); 01912 01913 for (WriterMapType::iterator iter = data_reader->writers_.begin(); 01914 iter != data_reader->writers_.end(); 01915 ++iter) { 01916 // deal with possibly not being alive or 01917 // tell when it will not be alive next (if no activity) 01918 next_absolute = iter->second->check_activity(now); 01919 01920 if (next_absolute != ACE_Time_Value::max_time) { 01921 alive_writers++; 01922 01923 if (next_absolute < smallest) { 01924 smallest = next_absolute; 01925 } 01926 } 01927 } 01928 } 01929 01930 if (!alive_writers) { 01931 // no live writers so no need to schedule a timer 01932 // but be sure we don't try to cancel the timer later. 01933 liveliness_timer_id_ = -1; 01934 } 01935 01936 if (DCPS_debug_level >= 5) { 01937 GuidConverter converter(data_reader->get_subscription_id()); 01938 ACE_DEBUG((LM_DEBUG, 01939 ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ") 01940 ACE_TEXT("reader %C has %d live writers; from_reactor=%d\n"), 01941 OPENDDS_STRING(converter).c_str(), 01942 alive_writers, 01943 !cancel)); 01944 } 01945 01946 // Call into the reactor after releasing the sample lock. 01947 if (alive_writers) { 01948 ACE_Time_Value relative; 01949 01950 // compare the time now with the earliest(smallest) deadline we found 01951 if (now < smallest) { 01952 relative = smallest - now; 01953 } else { 01954 relative = ACE_Time_Value(0,1); // ASAP 01955 } 01956 01957 liveliness_timer_id_ = this->reactor()->schedule_timer(this, 0, relative); 01958 01959 if (liveliness_timer_id_ == -1) { 01960 ACE_ERROR((LM_ERROR, 01961 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ") 01962 ACE_TEXT(" %p. \n"), ACE_TEXT("schedule_timer"))); 01963 } 01964 } 01965 }
int OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::handle_timeout | ( | const ACE_Time_Value & | current_time, | |
const void * | arg | |||
) | [private, virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 1843 of file DataReaderImpl.cpp.
References check_liveliness_i().
01845 { 01846 check_liveliness_i(false, tv); 01847 return 0; 01848 }
virtual bool OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::reactor_is_shut_down | ( | ) | const [inline, virtual] |
Implements OpenDDS::DCPS::ReactorInterceptor.
Definition at line 768 of file DataReaderImpl.h.
References TheServiceParticipant.
00769 { 00770 return TheServiceParticipant->is_shut_down(); 00771 }
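WeakRcHandle< DataReaderImpl > OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::data_reader_ [private]
Definition at line 776 of file DataReaderImpl.h.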
Referenced by check_liveliness_i().
long OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::liveliness_timer_id_ [private]
liveliness timer id; -1 if no timer is set
Definition at line 779 of file DataReaderImpl.h.
Referenced by check_liveliness_i().