OpenDDS::DCPS::TransportReactorTask Class Reference

#include <TransportReactorTask.h>

Inheritance diagram for OpenDDS::DCPS::TransportReactorTask:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TransportReactorTask:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 TransportReactorTask (bool useAsyncSend)
virtual ~TransportReactorTask ()
virtual int open (void *)
virtual int svc ()
virtual int close (u_long flags=0)
void stop ()
ACE_Reactor * get_reactor ()
const ACE_Reactor * get_reactor () const
ACE_thread_t get_reactor_owner () const
ACE_Proactor * get_proactor ()
const ACE_Proactor * get_proactor () const
void wait_for_startup ()
bool is_shut_down () const

Private Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockTypeGuardType
typedef ACE_Condition< LockTypeConditionType
 STATE_NOT_RUNNING
 STATE_OPENING
 STATE_RUNNING
enum  State { STATE_NOT_RUNNING, STATE_OPENING, STATE_RUNNING }

Private Attributes

ACE_Barrier barrier_
LockType lock_
State state_
ConditionType condition_
ACE_Reactor * reactor_
ACE_thread_t reactor_owner_
ACE_Proactor * proactor_

Detailed Description

Definition at line 25 of file TransportReactorTask.h.


Member Typedef Documentation

typedef ACE_Condition<LockType> OpenDDS::DCPS::TransportReactorTask::ConditionType [private]

Definition at line 56 of file TransportReactorTask.h.

typedef ACE_Guard<LockType> OpenDDS::DCPS::TransportReactorTask::GuardType [private]

Definition at line 55 of file TransportReactorTask.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TransportReactorTask::LockType [private]

Definition at line 54 of file TransportReactorTask.h.


Member Enumeration Documentation

enum OpenDDS::DCPS::TransportReactorTask::State [private]

Enumerator:
STATE_NOT_RUNNING 
STATE_OPENING 
STATE_RUNNING 

Definition at line 58 of file TransportReactorTask.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::TransportReactorTask::TransportReactorTask ( bool  useAsyncSend  ) 

Definition at line 21 of file TransportReactorTask.cpp.

References DBG_ENTRY_LVL, proactor_, and reactor_.

00022   : barrier_(2)
00023   , state_(STATE_NOT_RUNNING)
00024   , condition_(this->lock_)
00025 {
00026   DBG_ENTRY_LVL("TransportReactorTask","TransportReactorTask",6);
00027 
00028 #if defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00029   // Set our reactor and proactor pointers to a new reactor/proactor objects.
00030   if (useAsyncSend) {
00031     this->reactor_ = new ACE_Reactor(new ACE_WFMO_Reactor, 1);
00032 
00033     ACE_WIN32_Proactor* proactor_impl = new ACE_WIN32_Proactor(0, 1);
00034     this->proactor_ = new ACE_Proactor(proactor_impl, 1);
00035     this->reactor_->register_handler(proactor_impl, proactor_impl->get_handle());
00036     return;
00037   }
00038 #else
00039   ACE_UNUSED_ARG(useAsyncSend);
00040 #endif
00041 
00042   this->reactor_ = new ACE_Reactor(new ACE_Select_Reactor, 1);
00043   this->proactor_ = 0;
00044 }

OpenDDS::DCPS::TransportReactorTask::~TransportReactorTask (  )  [virtual]

Definition at line 46 of file TransportReactorTask.cpp.

References DBG_ENTRY_LVL, proactor_, and reactor_.

00047 {
00048   DBG_ENTRY_LVL("TransportReactorTask","~TransportReactorTask",6);
00049 
00050 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00051   if (this->proactor_) {
00052     this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(),
00053                                    ACE_Event_Handler::DONT_CALL);
00054     delete this->proactor_;
00055   }
00056 #endif
00057 
00058   delete this->reactor_;
00059 }


Member Function Documentation

int OpenDDS::DCPS::TransportReactorTask::close ( u_long  flags = 0  )  [virtual]

Definition at line 164 of file TransportReactorTask.cpp.

References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), and DBG_ENTRY_LVL.

00165 {
00166   DBG_ENTRY_LVL("TransportReactorTask","close",6);
00167   ACE_UNUSED_ARG(flags);
00168   // This is called after the reactor threads exit.
00169   // We should not set state here since we are not
00170   // sure how many reactor threads we will use.
00171   // If there is one reactor thread then we should
00172   // set the state so the stop will not call
00173   // end_reactor_event_loop.
00174   // If there are multiple reactor threads, we still
00175   // need call end_reactor_event_loop in stop() while
00176   // one reactor thread already exited.
00177 //MJM: Right.
00178 
00179   this->_remove_ref();
00180   return 0;
00181 }

ACE_INLINE const ACE_Proactor * OpenDDS::DCPS::TransportReactorTask::get_proactor (  )  const

Definition at line 40 of file TransportReactorTask.inl.

References DBG_ENTRY_LVL, and proactor_.

00041 {
00042   DBG_ENTRY_LVL("TransportReactorTask","get_proactor",6);
00043   return this->proactor_;
00044 }

ACE_INLINE ACE_Proactor * OpenDDS::DCPS::TransportReactorTask::get_proactor (  ) 

Definition at line 33 of file TransportReactorTask.inl.

References DBG_ENTRY_LVL, and proactor_.

Referenced by OpenDDS::DCPS::MulticastDataLink::get_proactor().

00034 {
00035   DBG_ENTRY_LVL("TransportReactorTask","get_proactor",6);
00036   return this->proactor_;
00037 }

ACE_INLINE const ACE_Reactor * OpenDDS::DCPS::TransportReactorTask::get_reactor (  )  const

Definition at line 19 of file TransportReactorTask.inl.

References DBG_ENTRY_LVL, and reactor_.

00020 {
00021   DBG_ENTRY_LVL("TransportReactorTask","get_reactor",6);
00022   return this->reactor_;
00023 }

ACE_INLINE ACE_Reactor * OpenDDS::DCPS::TransportReactorTask::get_reactor (  ) 

Definition at line 12 of file TransportReactorTask.inl.

References DBG_ENTRY_LVL, and reactor_.

Referenced by OpenDDS::DCPS::UdpDataLink::get_reactor(), and OpenDDS::DCPS::MulticastDataLink::get_reactor().

00013 {
00014   DBG_ENTRY_LVL("TransportReactorTask","get_reactor",6);
00015   return this->reactor_;
00016 }

ACE_INLINE ACE_thread_t OpenDDS::DCPS::TransportReactorTask::get_reactor_owner (  )  const

Definition at line 26 of file TransportReactorTask.inl.

References DBG_ENTRY_LVL, and reactor_owner_.

00027 {
00028   DBG_ENTRY_LVL("TransportReactorTask","get_reactor",6);
00029   return this->reactor_owner_;
00030 }

bool OpenDDS::DCPS::TransportReactorTask::is_shut_down (  )  const [inline]

Definition at line 48 of file TransportReactorTask.h.

00048 { return state_ == STATE_NOT_RUNNING; }

int OpenDDS::DCPS::TransportReactorTask::open ( void *   )  [virtual]

Definition at line 62 of file TransportReactorTask.cpp.

References condition_, DBG_ENTRY_LVL, state_, STATE_NOT_RUNNING, STATE_OPENING, and wait_for_startup().

00063 {
00064   DBG_ENTRY_LVL("TransportReactorTask","open",6);
00065 
00066   GuardType guard(this->lock_);
00067 
00068   // Reset our state.
00069   this->state_ = STATE_NOT_RUNNING;
00070 
00071   // For now, we only support one thread to run the reactor event loop.
00072   // Parts of the logic in this class would need to change to support
00073   // more than one thread running the reactor.
00074 
00075   // Attempt to activate ourselves.  If successful, a new thread will be
00076   // started and it will invoke our svc() method.  Note that we still have
00077   // a hold on our lock while we do this.
00078   if (this->activate(THR_NEW_LWP | THR_JOINABLE,1) != 0) {
00079     ACE_ERROR_RETURN((LM_ERROR,
00080                       "(%P|%t) ERROR: TransportReactorTask Failed to activate "
00081                       "itself.\n"),
00082                      -1);
00083   }
00084 
00085   this->wait_for_startup();
00086 
00087   // Here we need to wait until a condition is triggered by the new thread(s)
00088   // that we created.  Note that this will cause us to release the lock and
00089   // wait until the condition_ is signal()'ed.  When it is signaled, the
00090   // condition will attempt to obtain the lock again, and then return to us
00091   // here.  We can then go on.
00092   if (this->state_ == STATE_NOT_RUNNING) {
00093     this->state_ = STATE_OPENING;
00094     this->condition_.wait();
00095   }
00096 
00097   return 0;
00098 }

void OpenDDS::DCPS::TransportReactorTask::stop (  ) 

Definition at line 184 of file TransportReactorTask.cpp.

References DBG_ENTRY_LVL, reactor_, state_, and STATE_NOT_RUNNING.

00185 {
00186   DBG_ENTRY_LVL("TransportReactorTask","stop",6);
00187   {
00188     GuardType guard(this->lock_);
00189 
00190     if (this->state_ == STATE_NOT_RUNNING) {
00191       // We are already "stopped".  Just return.
00192       return;
00193     }
00194 
00195     this->state_ = STATE_NOT_RUNNING;
00196   }
00197 
00198 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00199   // Remove the proactor handler so the reactor stops forwarding messages.
00200   if (this->proactor_) {
00201     this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(),
00202                                    ACE_Event_Handler::DONT_CALL);
00203   }
00204 #endif
00205 
00206   this->reactor_->end_reactor_event_loop();
00207 
00208   // Let's wait for the reactor task's thread to complete before we
00209   // leave this stop method.
00210   this->wait();
00211 }

int OpenDDS::DCPS::TransportReactorTask::svc (  )  [virtual]

Definition at line 101 of file TransportReactorTask.cpp.

References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_add_ref(), condition_, DBG_ENTRY_LVL, reactor_, reactor_owner_, state_, STATE_OPENING, STATE_RUNNING, and wait_for_startup().

00102 {
00103   DBG_ENTRY_LVL("TransportReactorTask","svc",6);
00104 
00105   // First off - We need to obtain our own reference to ourselves such
00106   // that we don't get deleted while still running in our own thread.
00107   // In essence, our current thread "owns" a copy of our reference.
00108   // It's all done with the magic of intrusive reference counting!
00109   this->_add_ref();
00110 
00111   // Ignore all signals to avoid
00112   //     ERROR: <something descriptive> Interrupted system call
00113   // The main thread will handle signals.
00114   sigset_t set;
00115   ACE_OS::sigfillset(&set);
00116   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00117 
00118   // VERY IMPORTANT!Tell the reactor that this task's thread will be
00119   //                  its "owner".
00120   if (this->reactor_->owner(ACE_Thread_Manager::instance()->thr_self()) != 0) {
00121     ACE_ERROR((LM_ERROR,
00122                "(%P|%t) ERROR: Failed to change the reactor's owner().\n"));
00123   }
00124   this->reactor_owner_ = ACE_Thread_Manager::instance()->thr_self();
00125   this->wait_for_startup();
00126 
00127 
00128   {
00129     // Obtain the lock.  This should only happen once the open() has hit
00130     // the condition_.wait() line of code, and has released the lock.
00131     GuardType guard(this->lock_);
00132 
00133     if (this->state_ == STATE_OPENING) {
00134       // Advance the state.
00135       this->state_ = STATE_RUNNING;
00136 
00137       // Signal the condition_ that we are here.
00138       this->condition_.signal();
00139     }
00140   }
00141 
00142   //TBD: Should the reactor continue running if there are some exceptions
00143   //     are caught while handling events?
00144 //MJM: Put this in a loop with a state conditional and use the state to
00145 //MJM: indicate whether or not to terminate.  But I can think of no
00146 //MJM: reason to have anything in the conditional, so just expire.
00147 //MJM: Nevermind.
00148   try {
00149     // Tell the reactor to handle events.
00150     this->reactor_->run_reactor_event_loop();
00151   } catch (const std::exception& e) {
00152     ACE_ERROR((LM_ERROR,
00153                "(%P|%t) ERROR: TransportReactorTask::svc caught exception - %C.\n",
00154                e.what()));
00155   } catch (...) {
00156     ACE_ERROR((LM_ERROR,
00157                "(%P|%t) ERROR: TransportReactorTask::svc caught exception.\n"));
00158   }
00159 
00160   return 0;
00161 }

void OpenDDS::DCPS::TransportReactorTask::wait_for_startup (  )  [inline]

Definition at line 46 of file TransportReactorTask.h.

Referenced by open(), and svc().

00046 { barrier_.wait(); }


Member Data Documentation

ACE_Barrier OpenDDS::DCPS::TransportReactorTask::barrier_ [private]

Definition at line 60 of file TransportReactorTask.h.

ConditionType OpenDDS::DCPS::TransportReactorTask::condition_ [private]

Definition at line 63 of file TransportReactorTask.h.

Referenced by open(), and svc().

LockType OpenDDS::DCPS::TransportReactorTask::lock_ [private]

Definition at line 61 of file TransportReactorTask.h.

ACE_Proactor* OpenDDS::DCPS::TransportReactorTask::proactor_ [private]

Definition at line 66 of file TransportReactorTask.h.

Referenced by get_proactor(), TransportReactorTask(), and ~TransportReactorTask().

ACE_Reactor* OpenDDS::DCPS::TransportReactorTask::reactor_ [private]

Definition at line 64 of file TransportReactorTask.h.

Referenced by get_reactor(), stop(), svc(), TransportReactorTask(), and ~TransportReactorTask().

ACE_thread_t OpenDDS::DCPS::TransportReactorTask::reactor_owner_ [private]

Definition at line 65 of file TransportReactorTask.h.

Referenced by get_reactor_owner(), and svc().

State OpenDDS::DCPS::TransportReactorTask::state_ [private]

Definition at line 62 of file TransportReactorTask.h.

Referenced by open(), stop(), and svc().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:31 2016 for OpenDDS by  doxygen 1.4.7