#include <ReactorInterceptor.h>
Classes | |
class | Command |
Public Types | |
typedef container_supported_unique_ptr < Command > | CommandPtr |
Public Member Functions | |
bool | should_execute_immediately () |
void | process_command_queue () |
template<typename T > | |
void | execute_or_enqueue (T &t) |
template<typename T > | |
void | enqueue (T &t) |
void | wait () |
virtual bool | reactor_is_shut_down () const =0 |
Protected Member Functions | |
ReactorInterceptor (ACE_Reactor *reactor, ACE_thread_t owner) | |
virtual | ~ReactorInterceptor () |
int | handle_exception (ACE_HANDLE) |
int | handle_exception_i (ACE_Guard< ACE_Thread_Mutex > &guard) |
OPENDDS_QUEUE (CommandPtr) command_queue_ | |
Protected Attributes | |
ACE_thread_t | owner_ |
ACE_Thread_Mutex | mutex_ |
ACE_Condition_Thread_Mutex | condition_ |
Class OpenDDS::DCPS::ReactorInterceptor: definition at line 25 of file ReactorInterceptor.h.
typedef container_supported_unique_ptr< Command > OpenDDS::DCPS::ReactorInterceptor::CommandPtr
Definition at line 35 of file ReactorInterceptor.h.
OpenDDS::DCPS::ReactorInterceptor::ReactorInterceptor | ( | ACE_Reactor * | reactor, | |
ACE_thread_t | owner | |||
) | [protected] |
Definition at line 20 of file ReactorInterceptor.cpp.
References LM_ERROR, and ACE_Event_Handler::reactor().
00022 : owner_(owner) 00023 , condition_(mutex_) 00024 { 00025 if (reactor == 0) { 00026 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReactorInterceptor initialized with null reactor\n")); 00027 } 00028 this->reactor(reactor); 00029 }
OpenDDS::DCPS::ReactorInterceptor::~ReactorInterceptor | ( | ) | [protected, virtual] |
Definition at line 31 of file ReactorInterceptor.cpp.
void OpenDDS::DCPS::ReactorInterceptor::enqueue | ( | T & | t | ) | [inline] |
Definition at line 53 of file ReactorInterceptor.h.
00054 { 00055 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_); 00056 command_queue_.push(CommandPtr(new T(t))); 00057 this->reactor()->notify(this); 00058 }
void OpenDDS::DCPS::ReactorInterceptor::execute_or_enqueue | ( | T & | t | ) | [inline] |
Definition at line 41 of file ReactorInterceptor.h.
Referenced by OpenDDS::DCPS::DataLinkWatchdog::cancel(), OpenDDS::DCPS::Watchdog::cancel_all(), OpenDDS::DCPS::Watchdog::cancel_timer(), OpenDDS::DCPS::RemoveAssociationSweeper< T >::cancel_timer(), OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::cancel_timer(), OpenDDS::DCPS::Watchdog::reset_timer_interval(), OpenDDS::DCPS::DataLinkWatchdog::schedule(), OpenDDS::DCPS::DataLinkWatchdog::schedule_now(), OpenDDS::DCPS::Watchdog::schedule_timer(), OpenDDS::DCPS::RemoveAssociationSweeper< T >::schedule_timer(), and OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::schedule_timer().
00042 { 00043 if (should_execute_immediately()) { 00044 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_); 00045 process_command_queue(); 00046 t.execute(); 00047 } else { 00048 enqueue(t); 00049 } 00050 }
int OpenDDS::DCPS::ReactorInterceptor::handle_exception | ( | ACE_HANDLE | ) | [protected, virtual] |
Reimplemented from ACE_Event_Handler.
Definition at line 56 of file ReactorInterceptor.cpp.
References handle_exception_i(), and mutex_.
00057 { 00058 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->mutex_, 0); 00059 00060 return handle_exception_i(guard); 00061 }
int OpenDDS::DCPS::ReactorInterceptor::handle_exception_i | ( | ACE_Guard< ACE_Thread_Mutex > & | guard | ) | [protected] |
Definition at line 73 of file ReactorInterceptor.cpp.
References condition_, process_command_queue(), and ACE_Condition< ACE_Thread_Mutex >::signal().
Referenced by handle_exception(), and wait().
00074 { 00075 process_command_queue(); 00076 condition_.signal(); 00077 return 0; 00078 }
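OpenDDS::DCPS::ReactorInterceptor::OPENDDS_QUEUE | ( | CommandPtr | ) | [protected] |
Note: this entry is not a member function. It is the macro-based declaration of the protected data member command_queue_, the queue of CommandPtr that enqueue() pushes to and process_command_queue() drains.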
void OpenDDS::DCPS::ReactorInterceptor::process_command_queue | ( | ) |
Definition at line 63 of file ReactorInterceptor.cpp.
References OpenDDS::DCPS::move().
Referenced by handle_exception_i().
00064 { 00065 while (!command_queue_.empty()) { 00066 CommandPtr command = move(command_queue_.front()); 00067 command_queue_.pop(); 00068 if (command) 00069 command->execute(); 00070 } 00071 }
virtual bool OpenDDS::DCPS::ReactorInterceptor::reactor_is_shut_down | ( | ) | const [pure virtual] |
Implemented in OpenDDS::DCPS::EndHistoricSamplesMissedSweeper, OpenDDS::DCPS::DataReaderImpl::LivelinessTimer, OpenDDS::DCPS::RemoveAssociationSweeper< T >, OpenDDS::DCPS::TransportClient::PendingAssocTimer, OpenDDS::DCPS::SynWatchdog, OpenDDS::DCPS::NakWatchdog, OpenDDS::DCPS::RtpsUdpDataLink::HeartBeat, OpenDDS::DCPS::Watchdog, OpenDDS::DCPS::RemoveAssociationSweeper< DataReaderImpl >, and OpenDDS::DCPS::RemoveAssociationSweeper< RecorderImpl >.
Referenced by should_execute_immediately().
bool OpenDDS::DCPS::ReactorInterceptor::should_execute_immediately | ( | ) |
Definition at line 35 of file ReactorInterceptor.cpp.
References owner_, reactor_is_shut_down(), ACE_Thread::self(), and ACE_OS::thr_equal().
Referenced by wait().
00036 { 00037 return ACE_OS::thr_equal(owner_, ACE_Thread::self()) || 00038 reactor_is_shut_down(); 00039 }
void OpenDDS::DCPS::ReactorInterceptor::wait | ( | void | ) |
Definition at line 41 of file ReactorInterceptor.cpp.
References condition_, handle_exception_i(), mutex_, ACE_Reactor::purge_pending_notifications(), ACE_Event_Handler::reactor(), should_execute_immediately(), and ACE_Condition< ACE_Thread_Mutex >::wait().
Referenced by OpenDDS::DCPS::Watchdog::schedule_timer().
00042 { 00043 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_); 00044 00045 if (should_execute_immediately()) { 00046 handle_exception_i(guard); 00047 reactor()->purge_pending_notifications(this); 00048 } else { 00049 while (!command_queue_.empty()) { 00050 condition_.wait(); 00051 } 00052 } 00053 }
ACE_Condition_Thread_Mutex OpenDDS::DCPS::ReactorInterceptor::condition_ [protected]
Definition at line 73 of file ReactorInterceptor.h.
Referenced by handle_exception_i(), and wait().
ACE_Thread_Mutex OpenDDS::DCPS::ReactorInterceptor::mutex_ [protected]
Definition at line 72 of file ReactorInterceptor.h.
Referenced by handle_exception(), OpenDDS::DCPS::RemoveAssociationSweeper< T >::handle_timeout(), OpenDDS::DCPS::EndHistoricSamplesMissedSweeper::handle_timeout(), and wait().
ACE_thread_t OpenDDS::DCPS::ReactorInterceptor::owner_ [protected]
Definition at line 71 of file ReactorInterceptor.h.
Referenced by should_execute_immediately().