OpenDDS::DCPS::ReactorInterceptor Class Reference

#include <ReactorInterceptor.h>

Inheritance diagram for OpenDDS::DCPS::ReactorInterceptor:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ReactorInterceptor:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 ReactorInterceptor (ACE_Reactor *reactor, ACE_thread_t owner)
bool should_execute_immediately ()
void process_command_queue ()
template<typename T>
void execute_or_enqueue (T &t)
template<typename T>
void enqueue (T &t)
void wait ()
void destroy ()
virtual bool reactor_is_shut_down () const =0

Protected Member Functions

virtual ~ReactorInterceptor ()

Private Member Functions

int handle_exception (ACE_HANDLE)
int handle_exception_i (ACE_Guard< ACE_Thread_Mutex > &guard)
 OPENDDS_QUEUE (Command *) command_queue_

Private Attributes

ACE_thread_t owner_
ACE_Thread_Mutex mutex_
ACE_Condition_Thread_Mutex condition_
ACE_UINT64 registration_counter_
bool destroy_

Classes

class  Command

Detailed Description

Definition at line 21 of file ReactorInterceptor.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::ReactorInterceptor::ReactorInterceptor ( ACE_Reactor *  reactor,
ACE_thread_t  owner 
)

Definition at line 16 of file ReactorInterceptor.cpp.

00018   : owner_(owner)
00019   , condition_(mutex_)
00020   , registration_counter_(0)
00021   , destroy_(false)
00022 {
00023   if (reactor == 0) {
00024     ACE_DEBUG((LM_ERROR, "(%P|%t) ERROR: ReactorInterceptor initialized with null reactor\n"));
00025   }
00026   this->reactor(reactor);
00027 }

OpenDDS::DCPS::ReactorInterceptor::~ReactorInterceptor (  )  [protected, virtual]

Definition at line 29 of file ReactorInterceptor.cpp.

00030 {
00031   ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00032 
00033   // Dump the command queue.
00034   while (!command_queue_.empty ()) {
00035     delete command_queue_.front ();
00036     command_queue_.pop ();
00037   }
00038 }


Member Function Documentation

void OpenDDS::DCPS::ReactorInterceptor::destroy (  ) 

Definition at line 59 of file ReactorInterceptor.cpp.

References destroy_, reactor_is_shut_down(), and registration_counter_.

Referenced by OpenDDS::DCPS::DataWriterImpl::set_qos(), OpenDDS::DCPS::DataReaderImpl::set_qos(), OpenDDS::DCPS::DataReaderImpl::~DataReaderImpl(), OpenDDS::DCPS::MulticastSession::~MulticastSession(), OpenDDS::DCPS::ReliableSession::~ReliableSession(), and OpenDDS::DCPS::TransportClient::~TransportClient().

00060 {
00061   ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00062   if (!reactor_is_shut_down() && registration_counter_ > 0) {
00063     // Wait until we get handle exception.
00064     destroy_ = true;
00065   } else {
00066     guard.release();
00067     delete this;
00068   }
00069 }

template<typename T>
void OpenDDS::DCPS::ReactorInterceptor::enqueue ( T &  t  )  [inline]

Definition at line 49 of file ReactorInterceptor.h.

00050   {
00051     ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00052     command_queue_.push(new T(t));
00053     ++registration_counter_;
00054     this->reactor()->notify(this);
00055   }

template<typename T>
void OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue ( T &  t  )  [inline]

Definition at line 37 of file ReactorInterceptor.h.

Referenced by OpenDDS::DCPS::DataLinkWatchdog::cancel(), OpenDDS::DCPS::Watchdog::cancel_all(), OpenDDS::DCPS::Watchdog::cancel_timer(), OpenDDS::DCPS::RemoveAssociationSweeper< T >::cancel_timer(), OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::cancel_timer(), OpenDDS::DCPS::Watchdog::reset_timer_interval(), OpenDDS::DCPS::DataLinkWatchdog::schedule(), OpenDDS::DCPS::DataLinkWatchdog::schedule_now(), OpenDDS::DCPS::Watchdog::schedule_timer(), OpenDDS::DCPS::RemoveAssociationSweeper< T >::schedule_timer(), and OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::schedule_timer().

00038   {
00039     if (should_execute_immediately()) {
00040       ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00041       process_command_queue();
00042       t.execute();
00043     } else {
00044       enqueue(t);
00045     }
00046   }

int OpenDDS::DCPS::ReactorInterceptor::handle_exception ( ACE_HANDLE   )  [private]

Definition at line 71 of file ReactorInterceptor.cpp.

References handle_exception_i(), and registration_counter_.

00072 {
00073   ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->mutex_, 0);
00074 
00075   --registration_counter_;
00076 
00077   return handle_exception_i(guard);
00078 }

int OpenDDS::DCPS::ReactorInterceptor::handle_exception_i ( ACE_Guard< ACE_Thread_Mutex > &  guard  )  [private]

Definition at line 90 of file ReactorInterceptor.cpp.

References condition_, destroy_, process_command_queue(), and registration_counter_.

Referenced by handle_exception(), and wait().

00091 {
00092   process_command_queue();
00093 
00094   condition_.signal();
00095 
00096   if (registration_counter_ == 0 && destroy_) {
00097     guard.release();
00098     delete this;
00099   }
00100   return 0;
00101 }

OpenDDS::DCPS::ReactorInterceptor::OPENDDS_QUEUE ( Command *  ) command_queue_  [private]

void OpenDDS::DCPS::ReactorInterceptor::process_command_queue (  ) 

Definition at line 80 of file ReactorInterceptor.cpp.

Referenced by execute_or_enqueue(), and handle_exception_i().

00081 {
00082   while (!command_queue_.empty()) {
00083     Command* command = command_queue_.front();
00084     command_queue_.pop();
00085     command->execute();
00086     delete command;
00087   }
00088 }

virtual bool OpenDDS::DCPS::ReactorInterceptor::reactor_is_shut_down (  )  const [pure virtual]

Implemented in OpenDDS::DCPS::EndHistoricSamplesMissedSweeper, OpenDDS::DCPS::DataReaderImpl::LivelinessTimer, OpenDDS::DCPS::RemoveAssociationSweeper< T >, OpenDDS::DCPS::TransportClient::PendingAssocTimer, OpenDDS::DCPS::SynWatchdog, OpenDDS::DCPS::NakWatchdog, OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat, OpenDDS::DCPS::Watchdog, OpenDDS::DCPS::RemoveAssociationSweeper< OpenDDS::DCPS::DataReaderImpl >, and OpenDDS::DCPS::RemoveAssociationSweeper< OpenDDS::DCPS::RecorderImpl >.

Referenced by destroy(), and should_execute_immediately().

bool OpenDDS::DCPS::ReactorInterceptor::should_execute_immediately (  ) 

Definition at line 40 of file ReactorInterceptor.cpp.

References owner_, and reactor_is_shut_down().

Referenced by execute_or_enqueue(), and wait().

00041 {
00042   return ACE_OS::thr_equal(owner_, ACE_Thread::self()) ||
00043     reactor_is_shut_down();
00044 }

void OpenDDS::DCPS::ReactorInterceptor::wait (  ) 

Definition at line 46 of file ReactorInterceptor.cpp.

References condition_, handle_exception_i(), and should_execute_immediately().

Referenced by OpenDDS::DCPS::DataReaderImpl::cleanup(), OpenDDS::DCPS::Watchdog::schedule_timer(), OpenDDS::DCPS::DataReaderImpl::~DataReaderImpl(), OpenDDS::DCPS::MulticastSession::~MulticastSession(), OpenDDS::DCPS::ReliableSession::~ReliableSession(), and OpenDDS::DCPS::TransportClient::~TransportClient().

00047 {
00048   ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00049 
00050   if (should_execute_immediately()) {
00051     handle_exception_i(guard);
00052   } else {
00053     while (!command_queue_.empty()) {
00054       condition_.wait();
00055     }
00056   }
00057 }


Member Data Documentation

ACE_Condition_Thread_Mutex OpenDDS::DCPS::ReactorInterceptor::condition_ [private]

Definition at line 71 of file ReactorInterceptor.h.

Referenced by handle_exception_i(), and wait().

bool OpenDDS::DCPS::ReactorInterceptor::destroy_ [private]

Definition at line 74 of file ReactorInterceptor.h.

Referenced by destroy(), and handle_exception_i().

ACE_Thread_Mutex OpenDDS::DCPS::ReactorInterceptor::mutex_ [private]

Definition at line 70 of file ReactorInterceptor.h.

ACE_thread_t OpenDDS::DCPS::ReactorInterceptor::owner_ [private]

Definition at line 69 of file ReactorInterceptor.h.

Referenced by should_execute_immediately().

ACE_UINT64 OpenDDS::DCPS::ReactorInterceptor::registration_counter_ [private]

Definition at line 73 of file ReactorInterceptor.h.

Referenced by destroy(), handle_exception(), and handle_exception_i().


The documentation for this class was generated from the following files:
ReactorInterceptor.h
ReactorInterceptor.cpp
Generated on Fri Feb 12 20:06:22 2016 for OpenDDS by  doxygen 1.4.7