Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 9 : 10 : #include "ace/Log_Msg.h" 11 : #include "ace/Reverse_Lock_T.h" 12 : #include "ace/Synch.h" 13 : 14 : #include "ReactorInterceptor.h" 15 : #include "Service_Participant.h" 16 : 17 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 18 : 19 : namespace OpenDDS { 20 : namespace DCPS { 21 : 22 32 : ReactorInterceptor::Command::Command() 23 32 : : reactor_(0) 24 32 : {} 25 : 26 21 : ReactorInterceptor::ReactorInterceptor(ACE_Reactor* reactor, 27 0 : ACE_thread_t owner) 28 21 : : owner_(owner) 29 21 : , state_(RS_NONE) 30 : { 31 21 : RcEventHandler::reactor(reactor); 32 21 : } 33 : 34 21 : ReactorInterceptor::~ReactorInterceptor() 35 : { 36 21 : } 37 : 38 32 : ReactorInterceptor::CommandPtr ReactorInterceptor::execute_or_enqueue(CommandPtr command) 39 : { 40 32 : OPENDDS_ASSERT(command); 41 : 42 32 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 43 : 44 : // Only allow immediate execution if running on the reactor thread, otherwise we risk deadlock 45 : // when calling into the reactor object. 46 32 : const bool is_owner = ACE_OS::thr_equal(owner_, ACE_Thread::self()); 47 : 48 : // If state is set to processing, the conents of command_queue_ have been swapped out 49 : // so immediate execution may run jobs out of the expected order. 50 32 : const bool is_not_processing = state_ != RS_PROCESSING; 51 : 52 : // If the command_queue_ is not empty, allowing execution will potentially run unexpected code 53 : // which is problematic since we may be holding locks used by the unexpected code. 54 32 : const bool is_empty = command_queue_.empty(); 55 : 56 : // If all three of these conditions are met, it should be safe to execute 57 32 : const bool is_safe_to_execute = is_owner && is_not_processing && is_empty; 58 : 59 : // Even if it's not normally safe to execute, allow immediate execution if the reactor is shut down 60 32 : const bool immediate = is_safe_to_execute || reactor_is_shut_down(); 61 : 62 : // Always set reactor and push to the queue 63 32 : ACE_Reactor* local_reactor = ACE_Event_Handler::reactor(); 64 32 : command->set_reactor(local_reactor); 65 32 : command_queue_.push_back(command); 66 : 67 : // But depending on whether we're running it immediately or not, we either process or notify 68 32 : if (immediate) { 69 9 : process_command_queue_i(guard); 70 23 : } else if (state_ == RS_NONE) { 71 22 : state_ = RS_NOTIFIED; 72 22 : guard.release(); 73 22 : local_reactor->notify(this); 74 : } 75 64 : return command; 76 32 : } 77 : 78 22 : int ReactorInterceptor::handle_exception(ACE_HANDLE /*fd*/) 79 : { 80 22 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager()); 81 : 82 22 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 83 22 : process_command_queue_i(guard); 84 22 : return 0; 85 22 : } 86 : 87 31 : void ReactorInterceptor::process_command_queue_i(ACE_Guard<ACE_Thread_Mutex>& guard) 88 : { 89 31 : Queue cq; 90 31 : ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(mutex_); 91 : 92 31 : state_ = RS_PROCESSING; 93 31 : if (!command_queue_.empty()) { 94 31 : cq.swap(command_queue_); 95 31 : ACE_Guard<ACE_Reverse_Lock<ACE_Thread_Mutex> > rev_guard(rev_lock); 96 63 : for (Queue::const_iterator pos = cq.begin(), limit = cq.end(); pos != limit; ++pos) { 97 32 : (*pos)->execute(); 98 : } 99 31 : } 100 31 : if (!command_queue_.empty()) { 101 0 : state_ = RS_NOTIFIED; 102 0 : ACE_Reactor* const reactor = ACE_Event_Handler::reactor(); 103 0 : guard.release(); 104 0 : reactor->notify(this); 105 : } else { 106 31 : state_ = RS_NONE; 107 : } 108 31 : } 109 : 110 0 : void ReactorInterceptor::reactor(ACE_Reactor *reactor) 111 : { 112 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 113 0 : ACE_Event_Handler::reactor(reactor); 114 0 : } 115 : 116 45 : ACE_Reactor* ReactorInterceptor::reactor() const 117 : { 118 45 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 119 90 : return ACE_Event_Handler::reactor(); 120 45 : } 121 : 122 0 : void RegisterHandler::execute() 123 : { 124 0 : if (reactor()->register_handler(io_handle_, event_handler_, mask_) != 0) { 125 0 : if (log_level >= LogLevel::Error) { 126 0 : ACE_ERROR((LM_ERROR, 127 : "(%P|%t) ERROR: RegisterHandler::execute: failed to register handler for socket %d\n", 128 : io_handle_)); 129 : } 130 : } 131 0 : } 132 : 133 0 : void RemoveHandler::execute() 134 : { 135 0 : if (reactor()->remove_handler(io_handle_, mask_) != 0) { 136 0 : if (log_level >= LogLevel::Error) { 137 0 : ACE_ERROR((LM_ERROR, 138 : "(%P|%t) ERROR: UnregisterHandler::execute: failed to remove handler for socket %d\n", 139 : io_handle_)); 140 : } 141 : } 142 0 : } 143 : 144 : } 145 : } 146 : 147 : OPENDDS_END_VERSIONED_NAMESPACE_DECL