#include <TransportReactorTask.h>
Public Member Functions | |
TransportReactorTask (bool useAsyncSend) | |
virtual | ~TransportReactorTask () |
virtual int | open (void *) |
virtual int | svc () |
virtual int | close (u_long flags=0) |
void | stop () |
ACE_Reactor * | get_reactor () |
const ACE_Reactor * | get_reactor () const |
ACE_thread_t | get_reactor_owner () const |
ACE_Proactor * | get_proactor () |
const ACE_Proactor * | get_proactor () const |
void | wait_for_startup () |
bool | is_shut_down () const |
Private Types | |
enum | State { STATE_NOT_RUNNING, STATE_OPENING, STATE_RUNNING } |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef ACE_Condition< LockType > | ConditionType |
Private Attributes | |
ACE_Barrier | barrier_ |
LockType | lock_ |
State | state_ |
ConditionType | condition_ |
ACE_Reactor * | reactor_ |
ACE_thread_t | reactor_owner_ |
ACE_Proactor * | proactor_ |
Definition at line 29 of file TransportReactorTask.h.
typedef ACE_Condition<LockType> OpenDDS::DCPS::TransportReactorTask::ConditionType [private] |
Definition at line 60 of file TransportReactorTask.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::TransportReactorTask::GuardType [private] |
Definition at line 59 of file TransportReactorTask.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TransportReactorTask::LockType [private] |
Definition at line 58 of file TransportReactorTask.h.
enum OpenDDS::DCPS::TransportReactorTask::State [private] |
Definition at line 62 of file TransportReactorTask.h.
00062 { STATE_NOT_RUNNING, STATE_OPENING, STATE_RUNNING };
OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL OpenDDS::DCPS::TransportReactorTask::TransportReactorTask | ( | bool | useAsyncSend | ) |
Definition at line 23 of file TransportReactorTask.cpp.
References DBG_ENTRY_LVL, ACE_WIN32_Proactor::get_handle(), NULL_thread, proactor_, reactor_, and ACE_Reactor::register_handler().
00024 : barrier_(2) 00025 , state_(STATE_NOT_RUNNING) 00026 , condition_(this->lock_) 00027 , reactor_owner_(ACE_OS::NULL_thread) 00028 { 00029 DBG_ENTRY_LVL("TransportReactorTask","TransportReactorTask",6); 00030 00031 #if defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO) 00032 // Set our reactor and proactor pointers to a new reactor/proactor objects. 00033 if (useAsyncSend) { 00034 this->reactor_ = new ACE_Reactor(new ACE_WFMO_Reactor, 1); 00035 00036 ACE_WIN32_Proactor* proactor_impl = new ACE_WIN32_Proactor(0, 1); 00037 this->proactor_ = new ACE_Proactor(proactor_impl, 1); 00038 this->reactor_->register_handler(proactor_impl, proactor_impl->get_handle()); 00039 return; 00040 } 00041 #else 00042 ACE_UNUSED_ARG(useAsyncSend); 00043 #endif 00044 00045 this->reactor_ = new ACE_Reactor(new ACE_Select_Reactor, 1); 00046 this->proactor_ = 0; 00047 }
OpenDDS::DCPS::TransportReactorTask::~TransportReactorTask | ( | ) | [virtual] |
Definition at line 49 of file TransportReactorTask.cpp.
References DBG_ENTRY_LVL, ACE_Event_Handler::DONT_CALL, ACE_Proactor_Impl::get_handle(), ACE_Proactor::implementation(), proactor_, reactor_, and ACE_Reactor::remove_handler().
00050 { 00051 DBG_ENTRY_LVL("TransportReactorTask","~TransportReactorTask",6); 00052 00053 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS) 00054 if (this->proactor_) { 00055 this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(), 00056 ACE_Event_Handler::DONT_CALL); 00057 delete this->proactor_; 00058 } 00059 #endif 00060 00061 delete this->reactor_; 00062 }
int OpenDDS::DCPS::TransportReactorTask::close | ( | u_long | flags = 0 |
) | [virtual] |
Reimplemented from ACE_Task_Base.
Definition at line 167 of file TransportReactorTask.cpp.
References OpenDDS::DCPS::RcObject::_remove_ref(), and DBG_ENTRY_LVL.
00168 { 00169 DBG_ENTRY_LVL("TransportReactorTask","close",6); 00170 ACE_UNUSED_ARG(flags); 00171 // This is called after the reactor threads exit. 00172 // We should not set state here since we are not 00173 // sure how many reactor threads we will use. 00174 // If there is one reactor thread then we should 00175 // set the state so the stop will not call 00176 // end_reactor_event_loop. 00177 // If there are multiple reactor threads, we still 00178 // need call end_reactor_event_loop in stop() while 00179 // one reactor thread already exited. 00180 //MJM: Right. 00181 00182 this->_remove_ref(); 00183 return 0; 00184 }
ACE_INLINE const ACE_Proactor * OpenDDS::DCPS::TransportReactorTask::get_proactor | ( | ) | const |
Definition at line 42 of file TransportReactorTask.inl.
References DBG_ENTRY_LVL, and proactor_.
00043 { 00044 DBG_ENTRY_LVL("TransportReactorTask","get_proactor",6); 00045 return this->proactor_; 00046 }
ACE_INLINE ACE_Proactor * OpenDDS::DCPS::TransportReactorTask::get_proactor | ( | ) |
Definition at line 35 of file TransportReactorTask.inl.
References DBG_ENTRY_LVL, and proactor_.
Referenced by OpenDDS::DCPS::MulticastDataLink::get_proactor().
00036 { 00037 DBG_ENTRY_LVL("TransportReactorTask","get_proactor",6); 00038 return this->proactor_; 00039 }
ACE_INLINE const ACE_Reactor * OpenDDS::DCPS::TransportReactorTask::get_reactor | ( | void | ) | const |
Definition at line 21 of file TransportReactorTask.inl.
References DBG_ENTRY_LVL, and reactor_.
00022 { 00023 DBG_ENTRY_LVL("TransportReactorTask","get_reactor",6); 00024 return this->reactor_; 00025 }
OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE ACE_Reactor * OpenDDS::DCPS::TransportReactorTask::get_reactor | ( | void | ) |
Definition at line 14 of file TransportReactorTask.inl.
References DBG_ENTRY_LVL, and reactor_.
Referenced by OpenDDS::DCPS::UdpDataLink::get_reactor(), and OpenDDS::DCPS::MulticastDataLink::get_reactor().
00015 { 00016 DBG_ENTRY_LVL("TransportReactorTask","get_reactor",6); 00017 return this->reactor_; 00018 }
ACE_INLINE ACE_thread_t OpenDDS::DCPS::TransportReactorTask::get_reactor_owner | ( | ) | const |
Definition at line 28 of file TransportReactorTask.inl.
References DBG_ENTRY_LVL, and reactor_owner_.
00029 { 00030 DBG_ENTRY_LVL("TransportReactorTask","get_reactor",6); 00031 return this->reactor_owner_; 00032 }
bool OpenDDS::DCPS::TransportReactorTask::is_shut_down | ( | ) | const [inline] |
Definition at line 52 of file TransportReactorTask.h.
00052 { return state_ == STATE_NOT_RUNNING; }
int OpenDDS::DCPS::TransportReactorTask::open | ( | void * | ) | [virtual] |
Reimplemented from ACE_Task_Base.
Definition at line 65 of file TransportReactorTask.cpp.
References ACE_Task_Base::activate(), condition_, DBG_ENTRY_LVL, LM_ERROR, lock_, state_, STATE_NOT_RUNNING, STATE_OPENING, ACE_Condition< MUTEX >::wait(), and wait_for_startup().
00066 { 00067 DBG_ENTRY_LVL("TransportReactorTask","open",6); 00068 00069 GuardType guard(this->lock_); 00070 00071 // Reset our state. 00072 this->state_ = STATE_NOT_RUNNING; 00073 00074 // For now, we only support one thread to run the reactor event loop. 00075 // Parts of the logic in this class would need to change to support 00076 // more than one thread running the reactor. 00077 00078 // Attempt to activate ourselves. If successful, a new thread will be 00079 // started and it will invoke our svc() method. Note that we still have 00080 // a hold on our lock while we do this. 00081 if (this->activate(THR_NEW_LWP | THR_JOINABLE,1) != 0) { 00082 ACE_ERROR_RETURN((LM_ERROR, 00083 "(%P|%t) ERROR: TransportReactorTask Failed to activate " 00084 "itself.\n"), 00085 -1); 00086 } 00087 00088 this->wait_for_startup(); 00089 00090 // Here we need to wait until a condition is triggered by the new thread(s) 00091 // that we created. Note that this will cause us to release the lock and 00092 // wait until the condition_ is signal()'ed. When it is signaled, the 00093 // condition will attempt to obtain the lock again, and then return to us 00094 // here. We can then go on. 00095 if (this->state_ == STATE_NOT_RUNNING) { 00096 this->state_ = STATE_OPENING; 00097 this->condition_.wait(); 00098 } 00099 00100 return 0; 00101 }
void OpenDDS::DCPS::TransportReactorTask::stop | ( | void | ) |
Definition at line 187 of file TransportReactorTask.cpp.
References DBG_ENTRY_LVL, ACE_Event_Handler::DONT_CALL, ACE_Reactor::end_reactor_event_loop(), ACE_Proactor_Impl::get_handle(), ACE_Proactor::implementation(), lock_, proactor_, reactor_, ACE_Reactor::remove_handler(), state_, STATE_NOT_RUNNING, and ACE_Task_Base::wait().
00188 { 00189 DBG_ENTRY_LVL("TransportReactorTask","stop",6); 00190 { 00191 GuardType guard(this->lock_); 00192 00193 if (this->state_ == STATE_NOT_RUNNING) { 00194 // We are already "stopped". Just return. 00195 return; 00196 } 00197 00198 this->state_ = STATE_NOT_RUNNING; 00199 } 00200 00201 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS) 00202 // Remove the proactor handler so the reactor stops forwarding messages. 00203 if (this->proactor_) { 00204 this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(), 00205 ACE_Event_Handler::DONT_CALL); 00206 } 00207 #endif 00208 00209 this->reactor_->end_reactor_event_loop(); 00210 00211 // Let's wait for the reactor task's thread to complete before we 00212 // leave this stop method. 00213 this->wait(); 00214 }
int OpenDDS::DCPS::TransportReactorTask::svc | ( | void | ) | [virtual] |
Reimplemented from ACE_Task_Base.
Definition at line 104 of file TransportReactorTask.cpp.
References OpenDDS::DCPS::RcObject::_add_ref(), condition_, DBG_ENTRY_LVL, ACE_Thread_Manager::instance(), LM_ERROR, lock_, ACE_Reactor::owner(), reactor_, reactor_owner_, ACE_Reactor::run_reactor_event_loop(), SIG_SETMASK, ACE_OS::sigfillset(), ACE_Condition< MUTEX >::signal(), sigset_t, state_, STATE_OPENING, STATE_RUNNING, ACE_Thread_Manager::thr_self(), ACE_OS::thr_sigsetmask(), and wait_for_startup().
00105 { 00106 DBG_ENTRY_LVL("TransportReactorTask","svc",6); 00107 00108 // First off - We need to obtain our own reference to ourselves such 00109 // that we don't get deleted while still running in our own thread. 00110 // In essence, our current thread "owns" a copy of our reference. 00111 // It's all done with the magic of intrusive reference counting! 00112 this->_add_ref(); 00113 00114 // Ignore all signals to avoid 00115 // ERROR: <something descriptive> Interrupted system call 00116 // The main thread will handle signals. 00117 sigset_t set; 00118 ACE_OS::sigfillset(&set); 00119 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL); 00120 00121 // VERY IMPORTANT!Tell the reactor that this task's thread will be 00122 // its "owner". 00123 if (this->reactor_->owner(ACE_Thread_Manager::instance()->thr_self()) != 0) { 00124 ACE_ERROR((LM_ERROR, 00125 "(%P|%t) ERROR: Failed to change the reactor's owner().\n")); 00126 } 00127 this->reactor_owner_ = ACE_Thread_Manager::instance()->thr_self(); 00128 this->wait_for_startup(); 00129 00130 00131 { 00132 // Obtain the lock. This should only happen once the open() has hit 00133 // the condition_.wait() line of code, and has released the lock. 00134 GuardType guard(this->lock_); 00135 00136 if (this->state_ == STATE_OPENING) { 00137 // Advance the state. 00138 this->state_ = STATE_RUNNING; 00139 00140 // Signal the condition_ that we are here. 00141 this->condition_.signal(); 00142 } 00143 } 00144 00145 //TBD: Should the reactor continue running if there are some exceptions 00146 // are caught while handling events? 00147 //MJM: Put this in a loop with a state conditional and use the state to 00148 //MJM: indicate whether or not to terminate. But I can think of no 00149 //MJM: reason to have anything in the conditional, so just expire. 00150 //MJM: Nevermind. 00151 try { 00152 // Tell the reactor to handle events. 00153 this->reactor_->run_reactor_event_loop(); 00154 } catch (const std::exception& e) { 00155 ACE_ERROR((LM_ERROR, 00156 "(%P|%t) ERROR: TransportReactorTask::svc caught exception - %C.\n", 00157 e.what())); 00158 } catch (...) { 00159 ACE_ERROR((LM_ERROR, 00160 "(%P|%t) ERROR: TransportReactorTask::svc caught exception.\n")); 00161 } 00162 00163 return 0; 00164 }
void OpenDDS::DCPS::TransportReactorTask::wait_for_startup | ( | ) | [inline] |
Definition at line 64 of file TransportReactorTask.h.
Definition at line 67 of file TransportReactorTask.h.
Definition at line 65 of file TransportReactorTask.h.
Definition at line 70 of file TransportReactorTask.h.
Referenced by get_proactor(), stop(), TransportReactorTask(), and ~TransportReactorTask().
Reimplemented from ACE_Event_Handler.
Definition at line 68 of file TransportReactorTask.h.
Referenced by get_reactor(), stop(), svc(), TransportReactorTask(), and ~TransportReactorTask().
Definition at line 69 of file TransportReactorTask.h.
Referenced by get_reactor_owner(), and svc().
Definition at line 66 of file TransportReactorTask.h.