#include <TransportReactorTask.h>
Inheritance diagram for OpenDDS::DCPS::TransportReactorTask:
Public Member Functions | |
TransportReactorTask (bool useAsyncSend) | |
virtual | ~TransportReactorTask () |
virtual int | open (void *) |
virtual int | svc () |
virtual int | close (u_long flags=0) |
void | stop () |
ACE_Reactor * | get_reactor () |
const ACE_Reactor * | get_reactor () const |
ACE_thread_t | get_reactor_owner () const |
ACE_Proactor * | get_proactor () |
const ACE_Proactor * | get_proactor () const |
void | wait_for_startup () |
bool | is_shut_down () const |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef ACE_Condition< LockType > | ConditionType |
STATE_NOT_RUNNING | |
STATE_OPENING | |
STATE_RUNNING | |
enum | State { STATE_NOT_RUNNING, STATE_OPENING, STATE_RUNNING } |
Private Attributes | |
ACE_Barrier | barrier_ |
LockType | lock_ |
State | state_ |
ConditionType | condition_ |
ACE_Reactor * | reactor_ |
ACE_thread_t | reactor_owner_ |
ACE_Proactor * | proactor_ |
Definition at line 25 of file TransportReactorTask.h.
typedef ACE_Condition<LockType> OpenDDS::DCPS::TransportReactorTask::ConditionType [private] |
Definition at line 56 of file TransportReactorTask.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::TransportReactorTask::GuardType [private] |
Definition at line 55 of file TransportReactorTask.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TransportReactorTask::LockType [private] |
Definition at line 54 of file TransportReactorTask.h.
enum OpenDDS::DCPS::TransportReactorTask::State [private] |
Definition at line 58 of file TransportReactorTask.h.
00058 { STATE_NOT_RUNNING, STATE_OPENING, STATE_RUNNING };
OpenDDS::DCPS::TransportReactorTask::TransportReactorTask | ( | bool | useAsyncSend | ) |
Definition at line 21 of file TransportReactorTask.cpp.
References DBG_ENTRY_LVL, proactor_, and reactor_.
00022 : barrier_(2) 00023 , state_(STATE_NOT_RUNNING) 00024 , condition_(this->lock_) 00025 { 00026 DBG_ENTRY_LVL("TransportReactorTask","TransportReactorTask",6); 00027 00028 #if defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO) 00029 // Set our reactor and proactor pointers to a new reactor/proactor objects. 00030 if (useAsyncSend) { 00031 this->reactor_ = new ACE_Reactor(new ACE_WFMO_Reactor, 1); 00032 00033 ACE_WIN32_Proactor* proactor_impl = new ACE_WIN32_Proactor(0, 1); 00034 this->proactor_ = new ACE_Proactor(proactor_impl, 1); 00035 this->reactor_->register_handler(proactor_impl, proactor_impl->get_handle()); 00036 return; 00037 } 00038 #else 00039 ACE_UNUSED_ARG(useAsyncSend); 00040 #endif 00041 00042 this->reactor_ = new ACE_Reactor(new ACE_Select_Reactor, 1); 00043 this->proactor_ = 0; 00044 }
OpenDDS::DCPS::TransportReactorTask::~TransportReactorTask | ( | ) | [virtual] |
Definition at line 46 of file TransportReactorTask.cpp.
References DBG_ENTRY_LVL, proactor_, and reactor_.
00047 { 00048 DBG_ENTRY_LVL("TransportReactorTask","~TransportReactorTask",6); 00049 00050 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS) 00051 if (this->proactor_) { 00052 this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(), 00053 ACE_Event_Handler::DONT_CALL); 00054 delete this->proactor_; 00055 } 00056 #endif 00057 00058 delete this->reactor_; 00059 }
int OpenDDS::DCPS::TransportReactorTask::close | ( | u_long | flags = 0 | ) | [virtual] |
Definition at line 164 of file TransportReactorTask.cpp.
References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), and DBG_ENTRY_LVL.
00165 { 00166 DBG_ENTRY_LVL("TransportReactorTask","close",6); 00167 ACE_UNUSED_ARG(flags); 00168 // This is called after the reactor threads exit. 00169 // We should not set state here since we are not 00170 // sure how many reactor threads we will use. 00171 // If there is one reactor thread then we should 00172 // set the state so the stop will not call 00173 // end_reactor_event_loop. 00174 // If there are multiple reactor threads, we still 00175 // need call end_reactor_event_loop in stop() while 00176 // one reactor thread already exited. 00177 //MJM: Right. 00178 00179 this->_remove_ref(); 00180 return 0; 00181 }
ACE_INLINE const ACE_Proactor * OpenDDS::DCPS::TransportReactorTask::get_proactor | ( | ) | const |
Definition at line 40 of file TransportReactorTask.inl.
References DBG_ENTRY_LVL, and proactor_.
00041 { 00042 DBG_ENTRY_LVL("TransportReactorTask","get_proactor",6); 00043 return this->proactor_; 00044 }
ACE_INLINE ACE_Proactor * OpenDDS::DCPS::TransportReactorTask::get_proactor | ( | ) |
Definition at line 33 of file TransportReactorTask.inl.
References DBG_ENTRY_LVL, and proactor_.
Referenced by OpenDDS::DCPS::MulticastDataLink::get_proactor().
00034 { 00035 DBG_ENTRY_LVL("TransportReactorTask","get_proactor",6); 00036 return this->proactor_; 00037 }
ACE_INLINE const ACE_Reactor * OpenDDS::DCPS::TransportReactorTask::get_reactor | ( | ) | const |
Definition at line 19 of file TransportReactorTask.inl.
References DBG_ENTRY_LVL, and reactor_.
00020 { 00021 DBG_ENTRY_LVL("TransportReactorTask","get_reactor",6); 00022 return this->reactor_; 00023 }
ACE_INLINE ACE_Reactor * OpenDDS::DCPS::TransportReactorTask::get_reactor | ( | ) |
Definition at line 12 of file TransportReactorTask.inl.
References DBG_ENTRY_LVL, and reactor_.
Referenced by OpenDDS::DCPS::UdpDataLink::get_reactor(), and OpenDDS::DCPS::MulticastDataLink::get_reactor().
00013 { 00014 DBG_ENTRY_LVL("TransportReactorTask","get_reactor",6); 00015 return this->reactor_; 00016 }
ACE_INLINE ACE_thread_t OpenDDS::DCPS::TransportReactorTask::get_reactor_owner | ( | ) | const |
Definition at line 26 of file TransportReactorTask.inl.
References DBG_ENTRY_LVL, and reactor_owner_.
00027 { 00028 DBG_ENTRY_LVL("TransportReactorTask","get_reactor",6); 00029 return this->reactor_owner_; 00030 }
bool OpenDDS::DCPS::TransportReactorTask::is_shut_down | ( | ) | const [inline] |
int OpenDDS::DCPS::TransportReactorTask::open | ( | void * | ) | [virtual] |
Definition at line 62 of file TransportReactorTask.cpp.
References condition_, DBG_ENTRY_LVL, state_, STATE_NOT_RUNNING, STATE_OPENING, and wait_for_startup().
00063 { 00064 DBG_ENTRY_LVL("TransportReactorTask","open",6); 00065 00066 GuardType guard(this->lock_); 00067 00068 // Reset our state. 00069 this->state_ = STATE_NOT_RUNNING; 00070 00071 // For now, we only support one thread to run the reactor event loop. 00072 // Parts of the logic in this class would need to change to support 00073 // more than one thread running the reactor. 00074 00075 // Attempt to activate ourselves. If successful, a new thread will be 00076 // started and it will invoke our svc() method. Note that we still have 00077 // a hold on our lock while we do this. 00078 if (this->activate(THR_NEW_LWP | THR_JOINABLE,1) != 0) { 00079 ACE_ERROR_RETURN((LM_ERROR, 00080 "(%P|%t) ERROR: TransportReactorTask Failed to activate " 00081 "itself.\n"), 00082 -1); 00083 } 00084 00085 this->wait_for_startup(); 00086 00087 // Here we need to wait until a condition is triggered by the new thread(s) 00088 // that we created. Note that this will cause us to release the lock and 00089 // wait until the condition_ is signal()'ed. When it is signaled, the 00090 // condition will attempt to obtain the lock again, and then return to us 00091 // here. We can then go on. 00092 if (this->state_ == STATE_NOT_RUNNING) { 00093 this->state_ = STATE_OPENING; 00094 this->condition_.wait(); 00095 } 00096 00097 return 0; 00098 }
void OpenDDS::DCPS::TransportReactorTask::stop | ( | ) |
Definition at line 184 of file TransportReactorTask.cpp.
References DBG_ENTRY_LVL, reactor_, state_, and STATE_NOT_RUNNING.
00185 { 00186 DBG_ENTRY_LVL("TransportReactorTask","stop",6); 00187 { 00188 GuardType guard(this->lock_); 00189 00190 if (this->state_ == STATE_NOT_RUNNING) { 00191 // We are already "stopped". Just return. 00192 return; 00193 } 00194 00195 this->state_ = STATE_NOT_RUNNING; 00196 } 00197 00198 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS) 00199 // Remove the proactor handler so the reactor stops forwarding messages. 00200 if (this->proactor_) { 00201 this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(), 00202 ACE_Event_Handler::DONT_CALL); 00203 } 00204 #endif 00205 00206 this->reactor_->end_reactor_event_loop(); 00207 00208 // Let's wait for the reactor task's thread to complete before we 00209 // leave this stop method. 00210 this->wait(); 00211 }
int OpenDDS::DCPS::TransportReactorTask::svc | ( | ) | [virtual] |
Definition at line 101 of file TransportReactorTask.cpp.
References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_add_ref(), condition_, DBG_ENTRY_LVL, reactor_, reactor_owner_, state_, STATE_OPENING, STATE_RUNNING, and wait_for_startup().
00102 { 00103 DBG_ENTRY_LVL("TransportReactorTask","svc",6); 00104 00105 // First off - We need to obtain our own reference to ourselves such 00106 // that we don't get deleted while still running in our own thread. 00107 // In essence, our current thread "owns" a copy of our reference. 00108 // It's all done with the magic of intrusive reference counting! 00109 this->_add_ref(); 00110 00111 // Ignore all signals to avoid 00112 // ERROR: <something descriptive> Interrupted system call 00113 // The main thread will handle signals. 00114 sigset_t set; 00115 ACE_OS::sigfillset(&set); 00116 ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL); 00117 00118 // VERY IMPORTANT!Tell the reactor that this task's thread will be 00119 // its "owner". 00120 if (this->reactor_->owner(ACE_Thread_Manager::instance()->thr_self()) != 0) { 00121 ACE_ERROR((LM_ERROR, 00122 "(%P|%t) ERROR: Failed to change the reactor's owner().\n")); 00123 } 00124 this->reactor_owner_ = ACE_Thread_Manager::instance()->thr_self(); 00125 this->wait_for_startup(); 00126 00127 00128 { 00129 // Obtain the lock. This should only happen once the open() has hit 00130 // the condition_.wait() line of code, and has released the lock. 00131 GuardType guard(this->lock_); 00132 00133 if (this->state_ == STATE_OPENING) { 00134 // Advance the state. 00135 this->state_ = STATE_RUNNING; 00136 00137 // Signal the condition_ that we are here. 00138 this->condition_.signal(); 00139 } 00140 } 00141 00142 //TBD: Should the reactor continue running if there are some exceptions 00143 // are caught while handling events? 00144 //MJM: Put this in a loop with a state conditional and use the state to 00145 //MJM: indicate whether or not to terminate. But I can think of no 00146 //MJM: reason to have anything in the conditional, so just expire. 00147 //MJM: Nevermind. 00148 try { 00149 // Tell the reactor to handle events. 
00150 this->reactor_->run_reactor_event_loop(); 00151 } catch (const std::exception& e) { 00152 ACE_ERROR((LM_ERROR, 00153 "(%P|%t) ERROR: TransportReactorTask::svc caught exception - %C.\n", 00154 e.what())); 00155 } catch (...) { 00156 ACE_ERROR((LM_ERROR, 00157 "(%P|%t) ERROR: TransportReactorTask::svc caught exception.\n")); 00158 } 00159 00160 return 0; 00161 }
void OpenDDS::DCPS::TransportReactorTask::wait_for_startup | ( | ) | [inline] |
Definition at line 46 of file TransportReactorTask.h.
Referenced by open(), and svc().
00046 { barrier_.wait(); }
ACE_Barrier OpenDDS::DCPS::TransportReactorTask::barrier_ [private] |
Definition at line 60 of file TransportReactorTask.h.
LockType OpenDDS::DCPS::TransportReactorTask::lock_ [private] |
Definition at line 61 of file TransportReactorTask.h.
ACE_Proactor* OpenDDS::DCPS::TransportReactorTask::proactor_ [private] |
Definition at line 66 of file TransportReactorTask.h.
Referenced by get_proactor(), TransportReactorTask(), and ~TransportReactorTask().
ACE_Reactor* OpenDDS::DCPS::TransportReactorTask::reactor_ [private] |
Definition at line 64 of file TransportReactorTask.h.
Referenced by get_reactor(), stop(), svc(), TransportReactorTask(), and ~TransportReactorTask().
ACE_thread_t OpenDDS::DCPS::TransportReactorTask::reactor_owner_ [private] |