OpenDDS::DCPS::DataReaderImpl::LivelinessTimer Class Reference

Inheritance diagram for OpenDDS::DCPS::DataReaderImpl::LivelinessTimer:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataReaderImpl::LivelinessTimer:
Collaboration graph
[legend]

List of all members.

Classes

class  CancelCommand
class  CheckLivelinessCommand
class  CommandBase

Public Member Functions

 LivelinessTimer (ACE_Reactor *reactor, ACE_thread_t owner, DataReaderImpl *data_reader)
void check_liveliness ()
void cancel_timer ()
virtual bool reactor_is_shut_down () const

Private Member Functions

 ~LivelinessTimer ()
void check_liveliness_i (bool cancel, const ACE_Time_Value &current_time)
int handle_timeout (const ACE_Time_Value &current_time, const void *arg)

Private Attributes

WeakRcHandle< DataReaderImpl > data_reader_
long liveliness_timer_id_
 liveliness timer id; -1 if no timer is set

Detailed Description

Definition at line 746 of file DataReaderImpl.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::LivelinessTimer ( ACE_Reactor *  reactor,
ACE_thread_t  owner,
DataReaderImpl *  data_reader
) [inline]

Definition at line 748 of file DataReaderImpl.h.

00751       : ReactorInterceptor(reactor, owner)
00752       , data_reader_(*data_reader)
00753       , liveliness_timer_id_(-1)
00754     { }

OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::~LivelinessTimer (  )  [inline, private]

Definition at line 774 of file DataReaderImpl.h.

00774 { }


Member Function Documentation

void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::cancel_timer ( void   )  [inline]

Definition at line 762 of file DataReaderImpl.h.

References c.

Referenced by check_liveliness_i().

00763     {
00764       CancelCommand c(this);
00765       execute_or_enqueue(c);
00766     }

Here is the caller graph for this function:

void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness (  )  [inline]

Definition at line 756 of file DataReaderImpl.h.

References c.

00757     {
00758       CheckLivelinessCommand c(this);
00759       execute_or_enqueue(c);
00760     }

void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness_i ( bool  cancel,
const ACE_Time_Value &  current_time
) [private]

Definition at line 1851 of file DataReaderImpl.cpp.

References ACE_TEXT(), cancel_timer(), data_reader_, OpenDDS::DCPS::DCPS_debug_level, liveliness_timer_id_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::WeakRcHandle< T >::lock(), ACE_Time_Value::max_time, OPENDDS_STRING, ACE_Reactor::purge_pending_notifications(), ACE_Event_Handler::reactor(), and ACE_Reactor::schedule_timer().

Referenced by handle_timeout().

01853 {
01854   // Working copy of the active timer Id.
01855 
01856   RcHandle<DataReaderImpl> data_reader = data_reader_.lock();
01857   if (! data_reader) {
01858     this->reactor()->purge_pending_notifications(this);
01859     return;
01860   }
01861 
01862   long local_timer_id = liveliness_timer_id_;
01863   bool timer_was_reset = false;
01864 
01865   if (local_timer_id != -1 && cancel) {
01866     if (DCPS_debug_level >= 5) {
01867       GuidConverter converter(data_reader->get_subscription_id());
01868       ACE_DEBUG((LM_DEBUG,
01869                  ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ")
01870                  ACE_TEXT(" canceling timer for reader %C.\n"),
01871                  OPENDDS_STRING(converter).c_str()));
01872     }
01873 
01874     // called from add_associations and there is already a timer
01875     // so cancel the existing timer.
01876     if (this->reactor()->cancel_timer(local_timer_id) == -1) {
01877       // this could fail because the reactor's call and
01878       // the add_associations' call to this could overlap
01879       // so it is not a failure.
01880       ACE_DEBUG((LM_DEBUG,
01881                  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ")
01882                  ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer")));
01883     }
01884 
01885     timer_was_reset = true;
01886   }
01887 
01888   // Used after the lock scope ends.
01889   ACE_Time_Value smallest(ACE_Time_Value::max_time);
01890   int alive_writers = 0;
01891 
01892   // This is a bit convoluted.  The reasoning goes as follows:
01893   // 1) We grab the current timer Id value when we enter the method.
01894   // 2) We *might* cancel the timer if it is active.
01895   // 3) The timer *might* be rescheduled while we do not hold the sample lock.
01896   // 4) If we (or another thread) canceled the timer that we can tell, then
01897   // 5) we should clear the Id value,
01898   // 6) unless it has been rescheduled.
01899   // We are using a changed timer Id value as a proxy for having been
01900   // rescheduled.
01901   if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) {
01902     liveliness_timer_id_ = -1;
01903   }
01904 
01905   ACE_Time_Value next_absolute;
01906 
01907   // Iterate over each writer to this reader
01908   {
01909     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
01910         read_guard,
01911         data_reader->writers_lock_);
01912 
01913     for (WriterMapType::iterator iter = data_reader->writers_.begin();
01914         iter != data_reader->writers_.end();
01915         ++iter) {
01916       // deal with possibly not being alive or
01917       // tell when it will not be alive next (if no activity)
01918       next_absolute = iter->second->check_activity(now);
01919 
01920       if (next_absolute != ACE_Time_Value::max_time) {
01921         alive_writers++;
01922 
01923         if (next_absolute < smallest) {
01924           smallest = next_absolute;
01925         }
01926       }
01927     }
01928   }
01929 
01930   if (!alive_writers) {
01931     // no live writers so no need to schedule a timer
01932     // but be sure we don't try to cancel the timer later.
01933     liveliness_timer_id_ = -1;
01934   }
01935 
01936   if (DCPS_debug_level >= 5) {
01937     GuidConverter converter(data_reader->get_subscription_id());
01938     ACE_DEBUG((LM_DEBUG,
01939         ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ")
01940         ACE_TEXT("reader %C has %d live writers; from_reactor=%d\n"),
01941         OPENDDS_STRING(converter).c_str(),
01942         alive_writers,
01943         !cancel));
01944   }
01945 
01946   // Call into the reactor after releasing the sample lock.
01947   if (alive_writers) {
01948     ACE_Time_Value relative;
01949 
01950     // compare the time now with the earliest(smallest) deadline we found
01951     if (now < smallest) {
01952       relative = smallest - now;
01953     } else {
01954       relative = ACE_Time_Value(0,1); // ASAP
01955     }
01956 
01957     liveliness_timer_id_ = this->reactor()->schedule_timer(this, 0, relative);
01958 
01959     if (liveliness_timer_id_ == -1) {
01960       ACE_ERROR((LM_ERROR,
01961           ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ")
01962           ACE_TEXT(" %p. \n"), ACE_TEXT("schedule_timer")));
01963     }
01964   }
01965 }

Here is the call graph for this function:

Here is the caller graph for this function:

int OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::handle_timeout ( const ACE_Time_Value &  current_time,
const void *  arg
) [private, virtual]

Reimplemented from ACE_Event_Handler.

Definition at line 1843 of file DataReaderImpl.cpp.

References check_liveliness_i().

01845 {
01846   check_liveliness_i(false, tv);
01847   return 0;
01848 }

Here is the call graph for this function:

virtual bool OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::reactor_is_shut_down (  )  const [inline, virtual]

Implements OpenDDS::DCPS::ReactorInterceptor.

Definition at line 768 of file DataReaderImpl.h.

References TheServiceParticipant.

00769     {
00770       return TheServiceParticipant->is_shut_down();
00771     }


Member Data Documentation

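WeakRcHandle<DataReaderImpl> OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::data_reader_ [private]

Definition at line 776 of file DataReaderImpl.h.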

Referenced by check_liveliness_i().

long OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::liveliness_timer_id_ [private]

liveliness timer id; -1 if no timer is set

Definition at line 779 of file DataReaderImpl.h.

Referenced by check_liveliness_i().


The documentation for this class was generated from the following files:

- DataReaderImpl.h
- DataReaderImpl.cpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1