00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "ReactorInterceptor.h"
00010
00011 #include "ace/Log_Msg.h"
00012
00013 namespace OpenDDS {
00014 namespace DCPS {
00015
00016 ReactorInterceptor::ReactorInterceptor(ACE_Reactor* reactor,
00017 ACE_thread_t owner)
00018 : owner_(owner)
00019 , condition_(mutex_)
00020 , registration_counter_(0)
00021 , destroy_(false)
00022 {
00023 if (reactor == 0) {
00024 ACE_DEBUG((LM_ERROR, "(%P|%t) ERROR: ReactorInterceptor initialized with null reactor\n"));
00025 }
00026 this->reactor(reactor);
00027 }
00028
00029 ReactorInterceptor::~ReactorInterceptor()
00030 {
00031 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00032
00033
00034 while (!command_queue_.empty ()) {
00035 delete command_queue_.front ();
00036 command_queue_.pop ();
00037 }
00038 }
00039
00040 bool ReactorInterceptor::should_execute_immediately()
00041 {
00042 return ACE_OS::thr_equal(owner_, ACE_Thread::self()) ||
00043 reactor_is_shut_down();
00044 }
00045
00046 void ReactorInterceptor::wait()
00047 {
00048 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00049
00050 if (should_execute_immediately()) {
00051 handle_exception_i(guard);
00052 } else {
00053 while (!command_queue_.empty()) {
00054 condition_.wait();
00055 }
00056 }
00057 }
00058
00059 void ReactorInterceptor::destroy()
00060 {
00061 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_);
00062 if (!reactor_is_shut_down() && registration_counter_ > 0) {
00063
00064 destroy_ = true;
00065 } else {
00066 guard.release();
00067 delete this;
00068 }
00069 }
00070
00071 int ReactorInterceptor::handle_exception(ACE_HANDLE )
00072 {
00073 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->mutex_, 0);
00074
00075 --registration_counter_;
00076
00077 return handle_exception_i(guard);
00078 }
00079
00080 void ReactorInterceptor::process_command_queue()
00081 {
00082 while (!command_queue_.empty()) {
00083 Command* command = command_queue_.front();
00084 command_queue_.pop();
00085 command->execute();
00086 delete command;
00087 }
00088 }
00089
00090 int ReactorInterceptor::handle_exception_i(ACE_Guard<ACE_Thread_Mutex>& guard)
00091 {
00092 process_command_queue();
00093
00094 condition_.signal();
00095
00096 if (registration_counter_ == 0 && destroy_) {
00097 guard.release();
00098 delete this;
00099 }
00100 return 0;
00101 }
00102
00103 }
00104
00105 }