OpenDDS::DCPS::DataReaderImpl::LivelinessTimer Class Reference

Inheritance diagram for OpenDDS::DCPS::DataReaderImpl::LivelinessTimer:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataReaderImpl::LivelinessTimer:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 LivelinessTimer (ACE_Reactor *reactor, ACE_thread_t owner, DataReaderImpl *data_reader)
void check_liveliness ()
void cancel_timer ()
virtual bool reactor_is_shut_down () const

Private Member Functions

 ~LivelinessTimer ()
void check_liveliness_i (bool cancel, const ACE_Time_Value &current_time)
int handle_timeout (const ACE_Time_Value &current_time, const void *arg)

Private Attributes

DataReaderImpl * data_reader_
long liveliness_timer_id_
 liveliness timer id; -1 if no timer is set

Classes

class  CancelCommand
class  CheckLivelinessCommand
class  CommandBase

Detailed Description

Definition at line 733 of file DataReaderImpl.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::LivelinessTimer ( ACE_Reactor *  reactor,
ACE_thread_t  owner,
DataReaderImpl *  data_reader
) [inline]

Definition at line 735 of file DataReaderImpl.h.

00738       : ReactorInterceptor(reactor, owner)
00739       , data_reader_(data_reader)
00740       , liveliness_timer_id_(-1)
00741     { }

OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::~LivelinessTimer (  )  [inline, private]

Definition at line 761 of file DataReaderImpl.h.

00761 { }


Member Function Documentation

void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::cancel_timer (  )  [inline]

Definition at line 749 of file DataReaderImpl.h.

Referenced by OpenDDS::DCPS::DataReaderImpl::cleanup(), and OpenDDS::DCPS::DataReaderImpl::~DataReaderImpl().

00750     {
00751       CancelCommand c(this);
00752       execute_or_enqueue(c);
00753     }

void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness (  )  [inline]

Definition at line 743 of file DataReaderImpl.h.

Referenced by OpenDDS::DCPS::DataReaderImpl::transport_assoc_done(), and OpenDDS::DCPS::DataReaderImpl::writer_became_alive().

00744     {
00745       CheckLivelinessCommand c(this);
00746       execute_or_enqueue(c);
00747     }

void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness_i ( bool  cancel,
const ACE_Time_Value &  current_time 
) [private]

Definition at line 1992 of file DataReaderImpl.cpp.

References data_reader_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataReaderImpl::get_subscription_id(), liveliness_timer_id_, OPENDDS_STRING, OpenDDS::DCPS::DataReaderImpl::writers_, and OpenDDS::DCPS::DataReaderImpl::writers_lock_.

Referenced by handle_timeout().

01994 {
01995   // Working copy of the active timer Id.
01996   long local_timer_id = liveliness_timer_id_;
01997   bool timer_was_reset = false;
01998 
01999   if (local_timer_id != -1 && cancel) {
02000     if (DCPS_debug_level >= 5) {
02001       GuidConverter converter(data_reader_->get_subscription_id());
02002       ACE_DEBUG((LM_DEBUG,
02003                  ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ")
02004                  ACE_TEXT(" canceling timer for reader %C.\n"),
02005                  OPENDDS_STRING(converter).c_str()));
02006     }
02007 
02008     // called from add_associations and there is already a timer
02009     // so cancel the existing timer.
02010     if (this->reactor()->cancel_timer(local_timer_id) == -1) {
02011       // this could fail because the reactor's call and
02012       // the add_associations' call to this could overlap
02013       // so it is not a failure.
02014       ACE_DEBUG((LM_DEBUG,
02015                  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ")
02016                  ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer")));
02017     }
02018 
02019     timer_was_reset = true;
02020   }
02021 
02022   // Used after the lock scope ends.
02023   ACE_Time_Value smallest(ACE_Time_Value::max_time);
02024   int alive_writers = 0;
02025 
02026   // This is a bit convoluted.  The reasoning goes as follows:
02027   // 1) We grab the current timer Id value when we enter the method.
02028   // 2) We *might* cancel the timer if it is active.
02029   // 3) The timer *might* be rescheduled while we do not hold the sample lock.
02030   // 4) If we (or another thread) canceled the timer that we can tell, then
02031   // 5) we should clear the Id value,
02032   // 6) unless it has been rescheduled.
02033   // We are using a changed timer Id value as a proxy for having been
02034   // rescheduled.
02035   if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) {
02036     liveliness_timer_id_ = -1;
02037   }
02038 
02039   ACE_Time_Value next_absolute;
02040 
02041   // Iterate over each writer to this reader
02042   {
02043     ACE_READ_GUARD(ACE_RW_Thread_Mutex,
02044         read_guard,
02045         data_reader_->writers_lock_);
02046 
02047     for (WriterMapType::iterator iter = data_reader_->writers_.begin();
02048         iter != data_reader_->writers_.end();
02049         ++iter) {
02050       // deal with possibly not being alive or
02051       // tell when it will not be alive next (if no activity)
02052       next_absolute = iter->second->check_activity(now);
02053 
02054       if (next_absolute != ACE_Time_Value::max_time) {
02055         alive_writers++;
02056 
02057         if (next_absolute < smallest) {
02058           smallest = next_absolute;
02059         }
02060       }
02061     }
02062   }
02063 
02064   if (!alive_writers) {
02065     // no live writers so no need to schedule a timer
02066     // but be sure we don't try to cancel the timer later.
02067     liveliness_timer_id_ = -1;
02068   }
02069 
02070   if (DCPS_debug_level >= 5) {
02071     GuidConverter converter(data_reader_->get_subscription_id());
02072     ACE_DEBUG((LM_DEBUG,
02073         ACE_TEXT("(%P|%t) DataReaderImpl::handle_timeout: ")
02074         ACE_TEXT("reader %C has %d live writers; from_reactor=%d\n"),
02075         OPENDDS_STRING(converter).c_str(),
02076         alive_writers,
02077         !cancel));
02078   }
02079 
02080   // Call into the reactor after releasing the sample lock.
02081   if (alive_writers) {
02082     ACE_Time_Value relative;
02083 
02084     // compare the time now with the earliest(smallest) deadline we found
02085     if (now < smallest)
02086       relative = smallest - now;
02087 
02088     else
02089       relative = ACE_Time_Value(0,1); // ASAP
02090 
02091     liveliness_timer_id_ = this->reactor()->schedule_timer(this, 0, relative);
02092 
02093     if (liveliness_timer_id_ == -1) {
02094       ACE_ERROR((LM_ERROR,
02095           ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::handle_timeout: ")
02096           ACE_TEXT(" %p. \n"), ACE_TEXT("schedule_timer")));
02097     }
02098   }
02099 }

int OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::handle_timeout ( const ACE_Time_Value &  current_time,
const void *  arg 
) [private]

Definition at line 1984 of file DataReaderImpl.cpp.

References check_liveliness_i().

01986 {
01987   check_liveliness_i(false, tv);
01988   return 0;
01989 }

virtual bool OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::reactor_is_shut_down (  )  const [inline, virtual]

Implements OpenDDS::DCPS::ReactorInterceptor.

Definition at line 755 of file DataReaderImpl.h.

References TheServiceParticipant.

00756     {
00757       return TheServiceParticipant->is_shut_down();
00758     }


Member Data Documentation

DataReaderImpl* OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::data_reader_ [private]

Definition at line 763 of file DataReaderImpl.h.

Referenced by check_liveliness_i().

long OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::liveliness_timer_id_ [private]

liveliness timer id; -1 if no timer is set

Definition at line 766 of file DataReaderImpl.h.

Referenced by check_liveliness_i().


The documentation for this class was generated from the following files:
  * DataReaderImpl.h
  * DataReaderImpl.cpp
Generated on Fri Feb 12 20:06:12 2016 for OpenDDS by  doxygen 1.4.7