#include <ReactorInterceptor.h>
Inheritance diagram for OpenDDS::DCPS::ReactorInterceptor:
Public Member Functions | |
ReactorInterceptor (ACE_Reactor *reactor, ACE_thread_t owner) | |
bool | should_execute_immediately () |
void | process_command_queue () |
template<typename T> | |
void | execute_or_enqueue (T &t) |
template<typename T> | |
void | enqueue (T &t) |
void | wait () |
void | destroy () |
virtual bool | reactor_is_shut_down () const =0 |
Protected Member Functions | |
virtual | ~ReactorInterceptor () |
Private Member Functions | |
int | handle_exception (ACE_HANDLE) |
int | handle_exception_i (ACE_Guard< ACE_Thread_Mutex > &guard) |
OPENDDS_QUEUE (Command *) command_queue_ | |
Private Attributes | |
ACE_thread_t | owner_ |
ACE_Thread_Mutex | mutex_ |
ACE_Condition_Thread_Mutex | condition_ |
ACE_UINT64 | registration_counter_ |
bool | destroy_ |
Classes | |
class | Command |
Definition at line 21 of file ReactorInterceptor.h.
OpenDDS::DCPS::ReactorInterceptor::ReactorInterceptor | ( | ACE_Reactor * | reactor, | |
ACE_thread_t | owner | |||
) |
Definition at line 16 of file ReactorInterceptor.cpp.
00018 : owner_(owner) 00019 , condition_(mutex_) 00020 , registration_counter_(0) 00021 , destroy_(false) 00022 { 00023 if (reactor == 0) { 00024 ACE_DEBUG((LM_ERROR, "(%P|%t) ERROR: ReactorInterceptor initialized with null reactor\n")); 00025 } 00026 this->reactor(reactor); 00027 }
OpenDDS::DCPS::ReactorInterceptor::~ReactorInterceptor | ( | ) | [protected, virtual] |
Definition at line 29 of file ReactorInterceptor.cpp.
00030 { 00031 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_); 00032 00033 // Dump the command queue. 00034 while (!command_queue_.empty ()) { 00035 delete command_queue_.front (); 00036 command_queue_.pop (); 00037 } 00038 }
void OpenDDS::DCPS::ReactorInterceptor::destroy | ( | ) |
Definition at line 59 of file ReactorInterceptor.cpp.
References destroy_, reactor_is_shut_down(), and registration_counter_.
Referenced by OpenDDS::DCPS::DataWriterImpl::set_qos(), OpenDDS::DCPS::DataReaderImpl::set_qos(), OpenDDS::DCPS::DataReaderImpl::~DataReaderImpl(), OpenDDS::DCPS::MulticastSession::~MulticastSession(), OpenDDS::DCPS::ReliableSession::~ReliableSession(), and OpenDDS::DCPS::TransportClient::~TransportClient().
00060 { 00061 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_); 00062 if (!reactor_is_shut_down() && registration_counter_ > 0) { 00063 // Wait until we get handle exception. 00064 destroy_ = true; 00065 } else { 00066 guard.release(); 00067 delete this; 00068 } 00069 }
void OpenDDS::DCPS::ReactorInterceptor::enqueue | ( | T & | t | ) | [inline] |
Definition at line 49 of file ReactorInterceptor.h.
00050 { 00051 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_); 00052 command_queue_.push(new T(t)); 00053 ++registration_counter_; 00054 this->reactor()->notify(this); 00055 }
void OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue | ( | T & | t | ) | [inline] |
Definition at line 37 of file ReactorInterceptor.h.
Referenced by OpenDDS::DCPS::DataLinkWatchdog::cancel(), OpenDDS::DCPS::Watchdog::cancel_all(), OpenDDS::DCPS::Watchdog::cancel_timer(), OpenDDS::DCPS::RemoveAssociationSweeper< T >::cancel_timer(), OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::cancel_timer(), OpenDDS::DCPS::Watchdog::reset_timer_interval(), OpenDDS::DCPS::DataLinkWatchdog::schedule(), OpenDDS::DCPS::DataLinkWatchdog::schedule_now(), OpenDDS::DCPS::Watchdog::schedule_timer(), OpenDDS::DCPS::RemoveAssociationSweeper< T >::schedule_timer(), and OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::schedule_timer().
00038 { 00039 if (should_execute_immediately()) { 00040 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_); 00041 process_command_queue(); 00042 t.execute(); 00043 } else { 00044 enqueue(t); 00045 } 00046 }
int OpenDDS::DCPS::ReactorInterceptor::handle_exception | ( | ACE_HANDLE | ) | [private] |
Definition at line 71 of file ReactorInterceptor.cpp.
References handle_exception_i(), and registration_counter_.
00072 { 00073 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->mutex_, 0); 00074 00075 --registration_counter_; 00076 00077 return handle_exception_i(guard); 00078 }
int OpenDDS::DCPS::ReactorInterceptor::handle_exception_i | ( | ACE_Guard< ACE_Thread_Mutex > & | guard | ) | [private] |
Definition at line 90 of file ReactorInterceptor.cpp.
References condition_, destroy_, process_command_queue(), and registration_counter_.
Referenced by handle_exception(), and wait().
00091 { 00092 process_command_queue(); 00093 00094 condition_.signal(); 00095 00096 if (registration_counter_ == 0 && destroy_) { 00097 guard.release(); 00098 delete this; 00099 } 00100 return 0; 00101 }
OpenDDS::DCPS::ReactorInterceptor::OPENDDS_QUEUE | ( | Command * | ) | [private] |
void OpenDDS::DCPS::ReactorInterceptor::process_command_queue | ( | ) |
Definition at line 80 of file ReactorInterceptor.cpp.
Referenced by handle_exception_i().
00081 { 00082 while (!command_queue_.empty()) { 00083 Command* command = command_queue_.front(); 00084 command_queue_.pop(); 00085 command->execute(); 00086 delete command; 00087 } 00088 }
virtual bool OpenDDS::DCPS::ReactorInterceptor::reactor_is_shut_down | ( | ) | const [pure virtual] |
Implemented in OpenDDS::DCPS::EndHistoricSamplesMissedSweeper, OpenDDS::DCPS::DataReaderImpl::LivelinessTimer, OpenDDS::DCPS::RemoveAssociationSweeper< T >, OpenDDS::DCPS::TransportClient::PendingAssocTimer, OpenDDS::DCPS::SynWatchdog, OpenDDS::DCPS::NakWatchdog, OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat, OpenDDS::DCPS::Watchdog, OpenDDS::DCPS::RemoveAssociationSweeper< OpenDDS::DCPS::DataReaderImpl >, and OpenDDS::DCPS::RemoveAssociationSweeper< OpenDDS::DCPS::RecorderImpl >.
Referenced by destroy(), and should_execute_immediately().
bool OpenDDS::DCPS::ReactorInterceptor::should_execute_immediately | ( | ) |
Definition at line 40 of file ReactorInterceptor.cpp.
References owner_, and reactor_is_shut_down().
Referenced by wait().
00041 { 00042 return ACE_OS::thr_equal(owner_, ACE_Thread::self()) || 00043 reactor_is_shut_down(); 00044 }
void OpenDDS::DCPS::ReactorInterceptor::wait | ( | ) |
Definition at line 46 of file ReactorInterceptor.cpp.
References condition_, handle_exception_i(), and should_execute_immediately().
Referenced by OpenDDS::DCPS::DataReaderImpl::cleanup(), OpenDDS::DCPS::Watchdog::schedule_timer(), OpenDDS::DCPS::DataReaderImpl::~DataReaderImpl(), OpenDDS::DCPS::MulticastSession::~MulticastSession(), OpenDDS::DCPS::ReliableSession::~ReliableSession(), and OpenDDS::DCPS::TransportClient::~TransportClient().
00047 { 00048 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_); 00049 00050 if (should_execute_immediately()) { 00051 handle_exception_i(guard); 00052 } else { 00053 while (!command_queue_.empty()) { 00054 condition_.wait(); 00055 } 00056 } 00057 }
ACE_Condition_Thread_Mutex OpenDDS::DCPS::ReactorInterceptor::condition_ [private] |
bool OpenDDS::DCPS::ReactorInterceptor::destroy_ [private] |
Definition at line 74 of file ReactorInterceptor.h.
Referenced by destroy(), and handle_exception_i().
ACE_Thread_Mutex OpenDDS::DCPS::ReactorInterceptor::mutex_ [private] |
Definition at line 70 of file ReactorInterceptor.h.
ACE_thread_t OpenDDS::DCPS::ReactorInterceptor::owner_ [private] |
ACE_UINT64 OpenDDS::DCPS::ReactorInterceptor::registration_counter_ [private] |
Definition at line 73 of file ReactorInterceptor.h.
Referenced by destroy(), handle_exception(), and handle_exception_i().