00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 00009 00010 #include "ace/Log_Msg.h" 00011 #include "ace/Synch.h" 00012 00013 #include "ReactorInterceptor.h" 00014 00015 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 00016 00017 namespace OpenDDS { 00018 namespace DCPS { 00019 00020 ReactorInterceptor::ReactorInterceptor(ACE_Reactor* reactor, 00021 ACE_thread_t owner) 00022 : owner_(owner) 00023 , condition_(mutex_) 00024 { 00025 if (reactor == 0) { 00026 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: ReactorInterceptor initialized with null reactor\n")); 00027 } 00028 this->reactor(reactor); 00029 } 00030 00031 ReactorInterceptor::~ReactorInterceptor() 00032 { 00033 } 00034 00035 bool ReactorInterceptor::should_execute_immediately() 00036 { 00037 return ACE_OS::thr_equal(owner_, ACE_Thread::self()) || 00038 reactor_is_shut_down(); 00039 } 00040 00041 void ReactorInterceptor::wait() 00042 { 00043 ACE_GUARD(ACE_Thread_Mutex, guard, this->mutex_); 00044 00045 if (should_execute_immediately()) { 00046 handle_exception_i(guard); 00047 reactor()->purge_pending_notifications(this); 00048 } else { 00049 while (!command_queue_.empty()) { 00050 condition_.wait(); 00051 } 00052 } 00053 } 00054 00055 00056 int ReactorInterceptor::handle_exception(ACE_HANDLE /*fd*/) 00057 { 00058 ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->mutex_, 0); 00059 00060 return handle_exception_i(guard); 00061 } 00062 00063 void ReactorInterceptor::process_command_queue() 00064 { 00065 while (!command_queue_.empty()) { 00066 CommandPtr command = move(command_queue_.front()); 00067 command_queue_.pop(); 00068 if (command) 00069 command->execute(); 00070 } 00071 } 00072 00073 int ReactorInterceptor::handle_exception_i(ACE_Guard<ACE_Thread_Mutex>&) 00074 { 00075 process_command_queue(); 00076 condition_.signal(); 00077 return 0; 00078 } 00079 00080 } 00081 } 00082 00083 OPENDDS_END_VERSIONED_NAMESPACE_DECL