OpenDDS  Snapshot(2023/04/28-20:55)
Classes | Public Member Functions | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::DataReaderImpl::LivelinessTimer Class Reference
Inheritance diagram for OpenDDS::DCPS::DataReaderImpl::LivelinessTimer:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataReaderImpl::LivelinessTimer:
Collaboration graph
[legend]

Classes

class  CancelCommand
 
class  CheckLivelinessCommand
 
class  CommandBase
 

Public Member Functions

 LivelinessTimer (ACE_Reactor *reactor, ACE_thread_t owner, DataReaderImpl *data_reader)
 
void check_liveliness ()
 
void cancel_timer ()
 
virtual bool reactor_is_shut_down () const
 
- Public Member Functions inherited from OpenDDS::DCPS::ReactorInterceptor
CommandPtr execute_or_enqueue (CommandPtr command)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactor * reactor () const
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual ACE_Reactor_Timer_Interface * reactor_timer_interface (void) const
 
Reference_Counting_Policy & reference_counting_policy (void)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Private Member Functions

 ~LivelinessTimer ()
 
void check_liveliness_i (bool cancel, const MonotonicTimePoint &now)
 
int handle_timeout (const ACE_Time_Value &current_time, const void *arg)
 

Private Attributes

WeakRcHandle< DataReaderImpl > data_reader_
 
long liveliness_timer_id_
 liveliness timer id; -1 if no timer is set More...
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::ReactorInterceptor
typedef RcHandle< Command > CommandPtr
 
- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from OpenDDS::DCPS::ReactorInterceptor
enum  ReactorState { RS_NONE, RS_NOTIFIED, RS_PROCESSING }
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_Count > Atomic_Reference_Count
 
- Protected Member Functions inherited from OpenDDS::DCPS::ReactorInterceptor
 ReactorInterceptor (ACE_Reactor *reactor, ACE_thread_t owner)
 
virtual ~ReactorInterceptor ()
 
int handle_exception (ACE_HANDLE)
 
void process_command_queue_i (ACE_Guard< ACE_Thread_Mutex > &guard)
 
typedef OPENDDS_VECTOR (CommandPtr) Queue
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Attributes inherited from OpenDDS::DCPS::ReactorInterceptor
ACE_thread_t owner_
 
ACE_Thread_Mutex mutex_
 
Queue command_queue_
 
ReactorState state_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 918 of file DataReaderImpl.h.

Constructor & Destructor Documentation

◆ LivelinessTimer()

OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::LivelinessTimer ( ACE_Reactor * reactor,
ACE_thread_t  owner,
DataReaderImpl * data_reader 
)
inline

Definition at line 920 of file DataReaderImpl.h.

923  : ReactorInterceptor(reactor, owner)
924  , data_reader_(*data_reader)
925  , liveliness_timer_id_(-1)
926  { }
WeakRcHandle< DataReaderImpl > data_reader_
ReactorInterceptor(ACE_Reactor *reactor, ACE_thread_t owner)
long liveliness_timer_id_
liveliness timer id; -1 if no timer is set

◆ ~LivelinessTimer()

OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::~LivelinessTimer ( )
inline private

Definition at line 941 of file DataReaderImpl.h.

941 { }

Member Function Documentation

◆ cancel_timer()

void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::cancel_timer ( void  )
inline

Definition at line 930 of file DataReaderImpl.h.

931  {
932  execute_or_enqueue(make_rch<CancelCommand>(this));
933  }
CommandPtr execute_or_enqueue(CommandPtr command)

◆ check_liveliness()

void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness ( )

Definition at line 1819 of file DataReaderImpl.cpp.

1820 {
1821  execute_or_enqueue(make_rch<CheckLivelinessCommand>(this));
1822 }
CommandPtr execute_or_enqueue(CommandPtr command)

◆ check_liveliness_i()

void OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::check_liveliness_i ( bool  cancel,
const MonotonicTimePoint & now 
)
private

Definition at line 1835 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_READ_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::max_value, and OpenDDS::DCPS::TimeDuration::value().

1837 {
1838  // Working copy of the active timer Id.
1839 
1840  RcHandle<DataReaderImpl> data_reader = data_reader_.lock();
1841  if (! data_reader) {
1842  this->reactor()->purge_pending_notifications(this);
1843  return;
1844  }
1845 
1846  long local_timer_id = liveliness_timer_id_;
1847  bool timer_was_reset = false;
1848 
1849  if (local_timer_id != -1 && cancel) {
1850  if (DCPS_debug_level >= 5) {
1851  ACE_DEBUG((LM_DEBUG,
1852  ACE_TEXT("(%P|%t) DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1853  ACE_TEXT(" canceling timer for reader %C.\n"),
1854  LogGuid(data_reader->get_guid()).c_str()));
1855  }
1856 
1857  // called from add_associations and there is already a timer
1858  // so cancel the existing timer.
1859  if (this->reactor()->cancel_timer(local_timer_id) == -1) {
1860  // this could fail because the reactor's call and
1861  // the add_associations' call to this could overlap
1862  // so it is not a failure.
1863  ACE_DEBUG((LM_DEBUG,
1864  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1865  ACE_TEXT(" %p.\n"), ACE_TEXT("cancel_timer")));
1866  }
1867 
1868  timer_was_reset = true;
1869  }
1870 
1871  // Used after the lock scope ends.
1872  MonotonicTimePoint smallest(MonotonicTimePoint::max_value);
1873  int alive_writers = 0;
1874 
1875  // This is a bit convoluted. The reasoning goes as follows:
1876  // 1) We grab the current timer Id value when we enter the method.
1877  // 2) We *might* cancel the timer if it is active.
1878  // 3) The timer *might* be rescheduled while we do not hold the sample lock.
1879  // 4) If we (or another thread) canceled the timer that we can tell, then
1880  // 5) we should clear the Id value,
1881  // 6) unless it has been rescheduled.
1882  // We are using a changed timer Id value as a proxy for having been
1883  // rescheduled.
1884  if( timer_was_reset && (liveliness_timer_id_ == local_timer_id)) {
1885  liveliness_timer_id_ = -1;
1886  }
1887 
1888  // Iterate over each writer to this reader
1889  {
1890  ACE_READ_GUARD(ACE_RW_Thread_Mutex,
1891  read_guard,
1892  data_reader->writers_lock_);
1893  WriterMapType writers = data_reader->writers_;
1894  read_guard.release();
1895 
1896  for (WriterMapType::iterator iter = writers.begin();
1897  iter != writers.end();
1898  ++iter) {
1899  // deal with possibly not being alive or
1900  // tell when it will not be alive next (if no activity)
1901  const MonotonicTimePoint next_absolute(iter->second->check_activity(now));
1902  if (!next_absolute.is_max()) {
1903  alive_writers++;
1904  smallest = std::min(smallest, next_absolute);
1905  }
1906  }
1907  }
1908 
1909  if (!alive_writers) {
1910  // no live writers so no need to schedule a timer
1911  // but be sure we don't try to cancel the timer later.
1912  liveliness_timer_id_ = -1;
1913  }
1914 
1915  if (DCPS_debug_level >= 5) {
1916  ACE_DEBUG((LM_DEBUG,
1917  ACE_TEXT("(%P|%t) DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1918  ACE_TEXT("reader %C has %d live writers; from_reactor=%d\n"),
1919  LogGuid(data_reader->get_guid()).c_str(),
1920  alive_writers,
1921  !cancel));
1922  }
1923 
1924  // Call into the reactor after releasing the sample lock.
1925  if (alive_writers) {
1926  // compare the time now with the earliest(smallest) deadline we found
1927  TimeDuration relative;
1928  if (now < smallest) {
1929  relative = smallest - now;
1930  } else {
1931  relative = TimeDuration(0, 1); // ASAP
1932  }
1933  liveliness_timer_id_ = this->reactor()->schedule_timer(this, 0, relative.value());
1934 
1935  if (liveliness_timer_id_ == -1) {
1936  ACE_ERROR((LM_ERROR,
1937  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::LivelinessTimer::check_liveliness_i: ")
1938  ACE_TEXT(" %p.\n"), ACE_TEXT("schedule_timer")));
1939  }
1940  }
1941 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
virtual ACE_Reactor * reactor() const
WeakRcHandle< DataReaderImpl > data_reader_
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
static const TimePoint_T< MonotonicClock > max_value
Definition: TimePoint_T.h:41
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
int purge_pending_notifications(ACE_Event_Handler *eh, ACE_Reactor_Mask=ACE_Event_Handler::ALL_EVENTS_MASK)
RcHandle< T > lock() const
Definition: RcObject.h:188
long liveliness_timer_id_
liveliness timer id; -1 if no timer is set

◆ handle_timeout()

int OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::handle_timeout ( const ACE_Time_Value & current_time,
const void *  arg 
)
private virtual

Reimplemented from ACE_Event_Handler.

Definition at line 1825 of file DataReaderImpl.cpp.

References TheServiceParticipant.

1827 {
1828  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
1829 
1830  check_liveliness_i(false, MonotonicTimePoint::now());
1831  return 0;
1832 }
void check_liveliness_i(bool cancel, const MonotonicTimePoint &now)
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
#define TheServiceParticipant

◆ reactor_is_shut_down()

virtual bool OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::reactor_is_shut_down ( ) const
inline virtual

Implements OpenDDS::DCPS::ReactorInterceptor.

Definition at line 935 of file DataReaderImpl.h.

References TheServiceParticipant.

936  {
937  return TheServiceParticipant->is_shut_down();
938  }
#define TheServiceParticipant

Member Data Documentation

◆ data_reader_

WeakRcHandle<DataReaderImpl> OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::data_reader_
private

Definition at line 943 of file DataReaderImpl.h.

◆ liveliness_timer_id_

long OpenDDS::DCPS::DataReaderImpl::LivelinessTimer::liveliness_timer_id_
private

liveliness timer id; -1 if no timer is set

Definition at line 946 of file DataReaderImpl.h.


The documentation for this class was generated from the following files: