Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/ 9 : 10 : #include "ReactorTask.h" 11 : #if !defined (__ACE_INLINE__) 12 : #include "ReactorTask.inl" 13 : #endif /* __ACE_INLINE__ */ 14 : 15 : #include "Service_Participant.h" 16 : 17 : #include <ace/Select_Reactor.h> 18 : #include <ace/WFMO_Reactor.h> 19 : #include <ace/Proactor.h> 20 : #include <ace/Proactor_Impl.h> 21 : #include <ace/WIN32_Proactor.h> 22 : #include <ace/OS_NS_Thread.h> 23 : 24 : #include <exception> 25 : #include <cstring> 26 : 27 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 28 : 29 : namespace OpenDDS { 30 : namespace DCPS { 31 : 32 1 : ReactorTask::ReactorTask(bool useAsyncSend) 33 1 : : condition_(lock_) 34 1 : , state_(STATE_UNINITIALIZED) 35 1 : , reactor_(0) 36 1 : , reactor_owner_(ACE_OS::NULL_thread) 37 1 : , proactor_(0) 38 : #ifdef OPENDDS_REACTOR_TASK_ASYNC 39 : , use_async_send_(useAsyncSend) 40 : #endif 41 1 : , timer_queue_(0) 42 2 : , thread_status_manager_(0) 43 : { 44 : ACE_UNUSED_ARG(useAsyncSend); 45 1 : } 46 : 47 1 : ReactorTask::~ReactorTask() 48 : { 49 1 : cleanup(); 50 1 : } 51 : 52 17 : void ReactorTask::wait_for_startup_i() const 53 : { 54 17 : while (state_ == STATE_UNINITIALIZED || state_ == STATE_OPENING) { 55 0 : condition_.wait(thread_status_manager_ ? 56 : *thread_status_manager_ : 57 0 : TheServiceParticipant->get_thread_status_manager()); 58 : } 59 17 : } 60 : 61 10 : void ReactorTask::cleanup() 62 : { 63 : #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS) 64 10 : if (proactor_) { 65 0 : reactor_->remove_handler( 66 0 : proactor_->implementation()->get_handle(), 67 : ACE_Event_Handler::DONT_CALL); 68 0 : delete proactor_; 69 0 : proactor_ = 0; 70 : } 71 : #endif 72 : 73 10 : delete reactor_; 74 10 : reactor_ = 0; 75 10 : delete timer_queue_; 76 10 : timer_queue_ = 0; 77 10 : } 78 : 79 9 : int ReactorTask::open_reactor_task(void*, 80 : ThreadStatusManager* thread_status_manager, 81 : const String& name) 82 : { 83 9 : GuardType guard(lock_); 84 : 85 : // If we've already been opened, let's clean up the old stuff 86 9 : cleanup(); 87 : 88 : // thread status reporting support 89 9 : thread_status_manager_ = thread_status_manager; 90 9 : name_ = name; 91 : 92 : // Set our reactor and proactor pointers to a new reactor/proactor objects. 93 : #ifdef OPENDDS_REACTOR_TASK_ASYNC 94 : if (use_async_send_ && !reactor_) { 95 : reactor_ = new ACE_Reactor(new ACE_WFMO_Reactor, 1); 96 : 97 : ACE_WIN32_Proactor* proactor_impl = new ACE_WIN32_Proactor(0, 1); 98 : proactor_ = new ACE_Proactor(proactor_impl, 1); 99 : reactor_->register_handler(proactor_impl, proactor_impl->get_handle()); 100 : } else 101 : #endif 102 9 : if (!reactor_) { 103 9 : reactor_ = new ACE_Reactor(new ACE_Select_Reactor, true); 104 9 : proactor_ = 0; 105 : } 106 : 107 9 : if (!timer_queue_) { 108 9 : timer_queue_ = new TimerQueueType(); 109 9 : reactor_->timer_queue(timer_queue_); 110 : } 111 : 112 9 : state_ = STATE_OPENING; 113 9 : condition_.notify_all(); 114 : 115 9 : if (activate(THR_NEW_LWP | THR_JOINABLE, 1) != 0) { 116 0 : ACE_ERROR_RETURN((LM_ERROR, 117 : "(%P|%t) ERROR: ReactorTask Failed to activate " 118 : "itself.\n"), 119 : -1); 120 : } 121 : 122 18 : while (state_ != STATE_RUNNING) { 123 9 : condition_.wait(*thread_status_manager); 124 : } 125 : 126 9 : return 0; 127 9 : } 128 : 129 9 : int ReactorTask::svc() 130 : { 131 9 : ThreadStatusManager::Start s(*thread_status_manager_, name_); 132 : 133 : { 134 9 : GuardType guard(lock_); 135 : 136 : // First off - We need to obtain our own reference to ourselves such 137 : // that we don't get deleted while still running in our own thread. 138 : // In essence, our current thread "owns" a copy of our reference. 139 : // It's all done with the magic of intrusive reference counting! 140 9 : _add_ref(); 141 : 142 : // Ignore all signals to avoid 143 : // ERROR: <something descriptive> Interrupted system call 144 : // The main thread will handle signals. 145 : sigset_t set; 146 9 : ACE_OS::sigfillset(&set); 147 9 : ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL); 148 : 149 : // Tell the reactor that this thread will be its owner 150 9 : if (reactor_->owner(ACE_Thread_Manager::instance()->thr_self()) != 0) { 151 0 : ACE_ERROR((LM_ERROR, 152 : "(%P|%t) ERROR: Failed to change the reactor's owner().\n")); 153 : } 154 9 : reactor_owner_ = ACE_Thread_Manager::instance()->thr_self(); 155 : 156 9 : interceptor_ = make_rch<Interceptor>(this, reactor_, reactor_owner_); 157 : 158 : // Advance the state. 159 9 : state_ = STATE_RUNNING; 160 9 : condition_.notify_all(); 161 9 : } 162 : 163 : // Tell the reactor to handle events. 164 9 : if (thread_status_manager_->update_thread_status()) { 165 0 : while (state_ == STATE_RUNNING) { 166 0 : ACE_Time_Value t = thread_status_manager_->thread_status_interval().value(); 167 0 : ThreadStatusManager::Sleeper sleeper(*thread_status_manager_); 168 0 : reactor_->run_reactor_event_loop(t, 0); 169 0 : } 170 : 171 : } else { 172 9 : reactor_->run_reactor_event_loop(); 173 : } 174 : 175 9 : return 0; 176 9 : } 177 : 178 9 : int ReactorTask::close(u_long flags) 179 : { 180 : ACE_UNUSED_ARG(flags); 181 : // This is called after the reactor threads exit. 182 : // We should not set state here since we are not 183 : // sure how many reactor threads we will use. 184 : // If there is one reactor thread then we should 185 : // set the state so the stop will not call 186 : // end_reactor_event_loop. 187 : // If there are multiple reactor threads, we still 188 : // need call end_reactor_event_loop in stop() while 189 : // one reactor thread already exited. 190 : //MJM: Right. 191 : 192 9 : _remove_ref(); 193 9 : return 0; 194 : } 195 : 196 9 : void ReactorTask::stop() 197 : { 198 9 : ACE_Reactor* reactor = 0; 199 : { 200 9 : GuardType guard(lock_); 201 : 202 9 : if (state_ == STATE_UNINITIALIZED || state_ == STATE_SHUT_DOWN) { 203 : // We are already "stopped". Just return. 204 0 : return; 205 : } 206 : 207 9 : state_ = STATE_SHUT_DOWN; 208 : 209 : #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS) 210 : // Remove the proactor handler so the reactor stops forwarding messages. 211 9 : if (proactor_) { 212 0 : reactor_->remove_handler( 213 0 : proactor_->implementation()->get_handle(), 214 : ACE_Event_Handler::DONT_CALL); 215 : } 216 : #endif 217 9 : reactor = reactor_; 218 9 : } 219 : 220 9 : if (reactor) { 221 : // We can't hold the lock when we call this, because the reactor threads may need to 222 : // access the lock as part of normal execution before they return to the reactor control loop 223 9 : reactor->end_reactor_event_loop(); 224 : } 225 : 226 : { 227 9 : GuardType guard(lock_); 228 : 229 : // In the future, we will likely want to replace this assert with a new "SHUTTING_DOWN" state 230 : // which can be used to delay any potential new calls to open_reactor_task() 231 9 : OPENDDS_ASSERT(state_ == STATE_SHUT_DOWN); 232 : 233 : // Let's wait for the reactor task's thread to complete before we 234 : // leave this stop method. 235 9 : if (thread_status_manager_) { 236 9 : ThreadStatusManager::Sleeper sleeper(*thread_status_manager_); 237 9 : wait(); 238 : 239 : // Reset the thread manager in case it goes away before the next open. 240 9 : thr_mgr(0); 241 9 : } 242 9 : } 243 : } 244 : 245 0 : void ReactorTask::reactor(ACE_Reactor* reactor) 246 : { 247 0 : ACE_Event_Handler::reactor(reactor); 248 0 : } 249 : 250 0 : ACE_Reactor* ReactorTask::reactor() const 251 : { 252 0 : return ACE_Event_Handler::reactor(); 253 : } 254 : 255 : } // namespace DCPS 256 : } // namespace OpenDDS 257 : 258 : OPENDDS_END_VERSIONED_NAMESPACE_DECL