Public Member Functions | |
LivelinessTimer (ACE_Reactor *reactor, ACE_thread_t owner, DataReaderImpl *data_reader) | |
void | check_liveliness () |
void | cancel_timer () |
virtual bool | reactor_is_shut_down () const |
Private Member Functions | |
~LivelinessTimer () | |
void | check_liveliness_i (bool cancel, const ACE_Time_Value &current_time) |
int | handle_timeout (const ACE_Time_Value &current_time, const void *arg) |
Private Attributes | |
DataReaderImpl * | data_reader_ |
long | liveliness_timer_id_ |
liveliness timer id; -1 if no timer is set | |
Classes | |
class | CancelCommand |
class | CheckLivelinessCommand |
class | CommandBase |
Definition at line 733 of file DataReaderImpl.h.
OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::LivelinessTimer | ( | ACE_Reactor * | reactor, | |
ACE_thread_t | owner, | |||
DataReaderImpl * | data_reader | |||
) | [inline] |
Definition at line 735 of file DataReaderImpl.h.
00738 : ReactorInterceptor(reactor, owner) 00739 , data_reader_(data_reader) 00740 , liveliness_timer_id_(-1) 00741 { }
OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::~LivelinessTimer | ( | ) | [inline, private] |
void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::cancel_timer | ( | ) | [inline] |
Definition at line 749 of file DataReaderImpl.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::cleanup(), and OpenDDS::DCPS::DataReaderImpl::~DataReaderImpl().
00750 { 00751 CancelCommand c(this); 00752 execute_or_enqueue(c); 00753 }
void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness | ( | ) | [inline] |
Definition at line 743 of file DataReaderImpl.h.
Referenced by OpenDDS::DCPS::DataReaderImpl::transport_assoc_done(), and OpenDDS::DCPS::DataReaderImpl::writer_became_alive().
00744 { 00745 CheckLivelinessCommand c(this); 00746 execute_or_enqueue(c); 00747 }
void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness_i | ( | bool | cancel, | |
const ACE_Time_Value & | current_time | |||
) | [private] |
Definition at line 1992 of file DataReaderImpl.cpp.
References data_reader_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataReaderImpl::get_subscription_id(), liveliness_timer_id_, OPENDDS_STRING, OpenDDS::DCPS::DataReaderImpl::writers_, and OpenDDS::DCPS::DataReaderImpl::writers_lock_.
Referenced by handle_timeout().
01994 { 01995 // Working copy of the active timer Id. 01996 long local_timer_id = liveliness_timer_id_; 01997 bool timer_was_reset = false; 01998 01999 if (local_timer_id != -1 && cancel) { 02000 if (DCPS_debug_level >= 5) { 02001 GuidConverter converter(data_reader_->get_subscription_id()); 02002 ACE_DEBUG((LM_DEBUG, 02003 ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ") 02004 ACE_TEXT(" canceling timer for reader %C.\n"), 02005 OPENDDS_STRING(converter).c_str())); 02006 } 02007 02008 // called from add_associations and there is already a timer 02009 // so cancel the existing timer. 02010 if (this->reactor()->cancel_timer(local_timer_id) == -1) { 02011 // this could fail because the reactor's call and 02012 // the add_associations' call to this could overlap 02013 // so it is not a failure. 02014 ACE_DEBUG((LM_DEBUG, 02015 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ") 02016 ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer"))); 02017 } 02018 02019 timer_was_reset = true; 02020 } 02021 02022 // Used after the lock scope ends. 02023 ACE_Time_Value smallest(ACE_Time_Value::max_time); 02024 int alive_writers = 0; 02025 02026 // This is a bit convoluted. The reasoning goes as follows: 02027 // 1) We grab the current timer Id value when we enter the method. 02028 // 2) We *might* cancel the timer if it is active. 02029 // 3) The timer *might* be rescheduled while we do not hold the sample lock. 02030 // 4) If we (or another thread) canceled the timer that we can tell, then 02031 // 5) we should clear the Id value, 02032 // 6) unless it has been rescheduled. 02033 // We are using a changed timer Id value as a proxy for having been 02034 // rescheduled. 
02035 if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) { 02036 liveliness_timer_id_ = -1; 02037 } 02038 02039 ACE_Time_Value next_absolute; 02040 02041 // Iterate over each writer to this reader 02042 { 02043 ACE_READ_GUARD(ACE_RW_Thread_Mutex, 02044 read_guard, 02045 data_reader_->writers_lock_); 02046 02047 for (WriterMapType::iterator iter = data_reader_->writers_.begin(); 02048 iter != data_reader_->writers_.end(); 02049 ++iter) { 02050 // deal with possibly not being alive or 02051 // tell when it will not be alive next (if no activity) 02052 next_absolute = iter->second->check_activity(now); 02053 02054 if (next_absolute != ACE_Time_Value::max_time) { 02055 alive_writers++; 02056 02057 if (next_absolute < smallest) { 02058 smallest = next_absolute; 02059 } 02060 } 02061 } 02062 } 02063 02064 if (!alive_writers) { 02065 // no live writers so no need to schedule a timer 02066 // but be sure we don't try to cancel the timer later. 02067 liveliness_timer_id_ = -1; 02068 } 02069 02070 if (DCPS_debug_level >= 5) { 02071 GuidConverter converter(data_reader_->get_subscription_id()); 02072 ACE_DEBUG((LM_DEBUG, 02073 ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ") 02074 ACE_TEXT("reader %C has %d live writers; from_reactor=%d\n"), 02075 OPENDDS_STRING(converter).c_str(), 02076 alive_writers, 02077 !cancel)); 02078 } 02079 02080 // Call into the reactor after releasing the sample lock. 02081 if (alive_writers) { 02082 ACE_Time_Value relative; 02083 02084 // compare the time now with the earliest(smallest) deadline we found 02085 if (now < smallest) 02086 relative = smallest - now; 02087 02088 else 02089 relative = ACE_Time_Value(0,1); // ASAP 02090 02091 liveliness_timer_id_ = this->reactor()->schedule_timer(this, 0, relative); 02092 02093 if (liveliness_timer_id_ == -1) { 02094 ACE_ERROR((LM_ERROR, 02095 ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ") 02096 ACE_TEXT(" %p. \n"), ACE_TEXT("schedule_timer"))); 02097 } 02098 } 02099 }
int OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::handle_timeout | ( | const ACE_Time_Value & | current_time, | |
const void * | arg | |||
) | [private] |
Definition at line 1984 of file DataReaderImpl.cpp.
References check_liveliness_i().
01986 { 01987 check_liveliness_i(false, tv); 01988 return 0; 01989 }
virtual bool OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::reactor_is_shut_down | ( | ) | const [inline, virtual] |
Implements OpenDDS::DCPS::ReactorInterceptor.
Definition at line 755 of file DataReaderImpl.h.
References TheServiceParticipant.
00756 { 00757 return TheServiceParticipant->is_shut_down(); 00758 }
liveliness timer id; -1 if no timer is set
Definition at line 766 of file DataReaderImpl.h.
Referenced by check_liveliness_i().