OpenDDS::DCPS::TransportReactorTask Class Reference

#include <TransportReactorTask.h>

Inheritance diagram for OpenDDS::DCPS::TransportReactorTask:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TransportReactorTask:
Collaboration graph
[legend]

List of all members.

Public Member Functions

 TransportReactorTask (bool useAsyncSend)
virtual ~TransportReactorTask ()
virtual int open (void *)
virtual int svc ()
virtual int close (u_long flags=0)
void stop ()
ACE_Reactorget_reactor ()
const ACE_Reactorget_reactor () const
ACE_thread_t get_reactor_owner () const
ACE_Proactorget_proactor ()
const ACE_Proactorget_proactor () const
void wait_for_startup ()
bool is_shut_down () const

Private Types

enum  State { STATE_NOT_RUNNING, STATE_OPENING, STATE_RUNNING }
typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockTypeGuardType
typedef ACE_Condition< LockTypeConditionType

Private Attributes

ACE_Barrier barrier_
LockType lock_
State state_
ConditionType condition_
ACE_Reactorreactor_
ACE_thread_t reactor_owner_
ACE_Proactorproactor_

Detailed Description

Definition at line 29 of file TransportReactorTask.h.


Member Typedef Documentation

Definition at line 60 of file TransportReactorTask.h.

Definition at line 59 of file TransportReactorTask.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TransportReactorTask::LockType [private]

Definition at line 58 of file TransportReactorTask.h.


Member Enumeration Documentation

Enumerator:
STATE_NOT_RUNNING 
STATE_OPENING 
STATE_RUNNING 

Definition at line 62 of file TransportReactorTask.h.


Constructor & Destructor Documentation

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL OpenDDS::DCPS::TransportReactorTask::TransportReactorTask ( bool  useAsyncSend  ) 

Definition at line 23 of file TransportReactorTask.cpp.

References DBG_ENTRY_LVL, ACE_WIN32_Proactor::get_handle(), NULL_thread, proactor_, reactor_, and ACE_Reactor::register_handler().

00024   : barrier_(2)
00025   , state_(STATE_NOT_RUNNING)
00026   , condition_(this->lock_)
00027   , reactor_owner_(ACE_OS::NULL_thread)
00028 {
00029   DBG_ENTRY_LVL("TransportReactorTask","TransportReactorTask",6);
00030 
00031 #if defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00032   // Set our reactor and proactor pointers to a new reactor/proactor objects.
00033   if (useAsyncSend) {
00034     this->reactor_ = new ACE_Reactor(new ACE_WFMO_Reactor, 1);
00035 
00036     ACE_WIN32_Proactor* proactor_impl = new ACE_WIN32_Proactor(0, 1);
00037     this->proactor_ = new ACE_Proactor(proactor_impl, 1);
00038     this->reactor_->register_handler(proactor_impl, proactor_impl->get_handle());
00039     return;
00040   }
00041 #else
00042   ACE_UNUSED_ARG(useAsyncSend);
00043 #endif
00044 
00045   this->reactor_ = new ACE_Reactor(new ACE_Select_Reactor, 1);
00046   this->proactor_ = 0;
00047 }

Here is the call graph for this function:

OpenDDS::DCPS::TransportReactorTask::~TransportReactorTask (  )  [virtual]

Definition at line 49 of file TransportReactorTask.cpp.

References DBG_ENTRY_LVL, ACE_Event_Handler::DONT_CALL, ACE_Proactor_Impl::get_handle(), ACE_Proactor::implementation(), proactor_, reactor_, and ACE_Reactor::remove_handler().

00050 {
00051   DBG_ENTRY_LVL("TransportReactorTask","~TransportReactorTask",6);
00052 
00053 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00054   if (this->proactor_) {
00055     this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(),
00056                                    ACE_Event_Handler::DONT_CALL);
00057     delete this->proactor_;
00058   }
00059 #endif
00060 
00061   delete this->reactor_;
00062 }

Here is the call graph for this function:


Member Function Documentation

int OpenDDS::DCPS::TransportReactorTask::close ( u_long  flags = 0  )  [virtual]

Reimplemented from ACE_Task_Base.

Definition at line 167 of file TransportReactorTask.cpp.

References OpenDDS::DCPS::RcObject::_remove_ref(), and DBG_ENTRY_LVL.

00168 {
00169   DBG_ENTRY_LVL("TransportReactorTask","close",6);
00170   ACE_UNUSED_ARG(flags);
00171   // This is called after the reactor threads exit.
00172   // We should not set state here since we are not
00173   // sure how many reactor threads we will use.
00174   // If there is one reactor thread then we should
00175   // set the state so the stop will not call
00176   // end_reactor_event_loop.
00177   // If there are multiple reactor threads, we still
00178   // need call end_reactor_event_loop in stop() while
00179   // one reactor thread already exited.
00180 //MJM: Right.
00181 
00182   this->_remove_ref();
00183   return 0;
00184 }

Here is the call graph for this function:

ACE_INLINE const ACE_Proactor * OpenDDS::DCPS::TransportReactorTask::get_proactor (  )  const

Definition at line 42 of file TransportReactorTask.inl.

References DBG_ENTRY_LVL, and proactor_.

00043 {
00044   DBG_ENTRY_LVL("TransportReactorTask","get_proactor",6);
00045   return this->proactor_;
00046 }

ACE_INLINE ACE_Proactor * OpenDDS::DCPS::TransportReactorTask::get_proactor (  ) 

Definition at line 35 of file TransportReactorTask.inl.

References DBG_ENTRY_LVL, and proactor_.

Referenced by OpenDDS::DCPS::MulticastDataLink::get_proactor().

00036 {
00037   DBG_ENTRY_LVL("TransportReactorTask","get_proactor",6);
00038   return this->proactor_;
00039 }

Here is the caller graph for this function:

ACE_INLINE const ACE_Reactor * OpenDDS::DCPS::TransportReactorTask::get_reactor ( void   )  const

Definition at line 21 of file TransportReactorTask.inl.

References DBG_ENTRY_LVL, and reactor_.

00022 {
00023   DBG_ENTRY_LVL("TransportReactorTask","get_reactor",6);
00024   return this->reactor_;
00025 }

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE ACE_Reactor * OpenDDS::DCPS::TransportReactorTask::get_reactor ( void   ) 

Definition at line 14 of file TransportReactorTask.inl.

References DBG_ENTRY_LVL, and reactor_.

Referenced by OpenDDS::DCPS::UdpDataLink::get_reactor(), and OpenDDS::DCPS::MulticastDataLink::get_reactor().

00015 {
00016   DBG_ENTRY_LVL("TransportReactorTask","get_reactor",6);
00017   return this->reactor_;
00018 }

Here is the caller graph for this function:

ACE_INLINE ACE_thread_t OpenDDS::DCPS::TransportReactorTask::get_reactor_owner (  )  const

Definition at line 28 of file TransportReactorTask.inl.

References DBG_ENTRY_LVL, and reactor_owner_.

00029 {
00030   DBG_ENTRY_LVL("TransportReactorTask","get_reactor",6);
00031   return this->reactor_owner_;
00032 }

bool OpenDDS::DCPS::TransportReactorTask::is_shut_down (  )  const [inline]

Definition at line 52 of file TransportReactorTask.h.

00052 { return state_ == STATE_NOT_RUNNING; }

int OpenDDS::DCPS::TransportReactorTask::open ( void *   )  [virtual]

Reimplemented from ACE_Task_Base.

Definition at line 65 of file TransportReactorTask.cpp.

References ACE_Task_Base::activate(), condition_, DBG_ENTRY_LVL, LM_ERROR, lock_, state_, STATE_NOT_RUNNING, STATE_OPENING, ACE_Condition< MUTEX >::wait(), and wait_for_startup().

00066 {
00067   DBG_ENTRY_LVL("TransportReactorTask","open",6);
00068 
00069   GuardType guard(this->lock_);
00070 
00071   // Reset our state.
00072   this->state_ = STATE_NOT_RUNNING;
00073 
00074   // For now, we only support one thread to run the reactor event loop.
00075   // Parts of the logic in this class would need to change to support
00076   // more than one thread running the reactor.
00077 
00078   // Attempt to activate ourselves.  If successful, a new thread will be
00079   // started and it will invoke our svc() method.  Note that we still have
00080   // a hold on our lock while we do this.
00081   if (this->activate(THR_NEW_LWP | THR_JOINABLE,1) != 0) {
00082     ACE_ERROR_RETURN((LM_ERROR,
00083                       "(%P|%t) ERROR: TransportReactorTask Failed to activate "
00084                       "itself.\n"),
00085                      -1);
00086   }
00087 
00088   this->wait_for_startup();
00089 
00090   // Here we need to wait until a condition is triggered by the new thread(s)
00091   // that we created.  Note that this will cause us to release the lock and
00092   // wait until the condition_ is signal()'ed.  When it is signaled, the
00093   // condition will attempt to obtain the lock again, and then return to us
00094   // here.  We can then go on.
00095   if (this->state_ == STATE_NOT_RUNNING) {
00096     this->state_ = STATE_OPENING;
00097     this->condition_.wait();
00098   }
00099 
00100   return 0;
00101 }

Here is the call graph for this function:

void OpenDDS::DCPS::TransportReactorTask::stop ( void   ) 

Definition at line 187 of file TransportReactorTask.cpp.

References DBG_ENTRY_LVL, ACE_Event_Handler::DONT_CALL, ACE_Reactor::end_reactor_event_loop(), ACE_Proactor_Impl::get_handle(), ACE_Proactor::implementation(), lock_, proactor_, reactor_, ACE_Reactor::remove_handler(), state_, STATE_NOT_RUNNING, and ACE_Task_Base::wait().

00188 {
00189   DBG_ENTRY_LVL("TransportReactorTask","stop",6);
00190   {
00191     GuardType guard(this->lock_);
00192 
00193     if (this->state_ == STATE_NOT_RUNNING) {
00194       // We are already "stopped".  Just return.
00195       return;
00196     }
00197 
00198     this->state_ = STATE_NOT_RUNNING;
00199   }
00200 
00201 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00202   // Remove the proactor handler so the reactor stops forwarding messages.
00203   if (this->proactor_) {
00204     this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(),
00205                                    ACE_Event_Handler::DONT_CALL);
00206   }
00207 #endif
00208 
00209   this->reactor_->end_reactor_event_loop();
00210 
00211   // Let's wait for the reactor task's thread to complete before we
00212   // leave this stop method.
00213   this->wait();
00214 }

Here is the call graph for this function:

int OpenDDS::DCPS::TransportReactorTask::svc ( void   )  [virtual]

Reimplemented from ACE_Task_Base.

Definition at line 104 of file TransportReactorTask.cpp.

References OpenDDS::DCPS::RcObject::_add_ref(), condition_, DBG_ENTRY_LVL, ACE_Thread_Manager::instance(), LM_ERROR, lock_, ACE_Reactor::owner(), reactor_, reactor_owner_, ACE_Reactor::run_reactor_event_loop(), SIG_SETMASK, ACE_OS::sigfillset(), ACE_Condition< MUTEX >::signal(), sigset_t, state_, STATE_OPENING, STATE_RUNNING, ACE_Thread_Manager::thr_self(), ACE_OS::thr_sigsetmask(), and wait_for_startup().

00105 {
00106   DBG_ENTRY_LVL("TransportReactorTask","svc",6);
00107 
00108   // First off - We need to obtain our own reference to ourselves such
00109   // that we don't get deleted while still running in our own thread.
00110   // In essence, our current thread "owns" a copy of our reference.
00111   // It's all done with the magic of intrusive reference counting!
00112   this->_add_ref();
00113 
00114   // Ignore all signals to avoid
00115   //     ERROR: <something descriptive> Interrupted system call
00116   // The main thread will handle signals.
00117   sigset_t set;
00118   ACE_OS::sigfillset(&set);
00119   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00120 
00121   // VERY IMPORTANT!Tell the reactor that this task's thread will be
00122   //                  its "owner".
00123   if (this->reactor_->owner(ACE_Thread_Manager::instance()->thr_self()) != 0) {
00124     ACE_ERROR((LM_ERROR,
00125                "(%P|%t) ERROR: Failed to change the reactor's owner().\n"));
00126   }
00127   this->reactor_owner_ = ACE_Thread_Manager::instance()->thr_self();
00128   this->wait_for_startup();
00129 
00130 
00131   {
00132     // Obtain the lock.  This should only happen once the open() has hit
00133     // the condition_.wait() line of code, and has released the lock.
00134     GuardType guard(this->lock_);
00135 
00136     if (this->state_ == STATE_OPENING) {
00137       // Advance the state.
00138       this->state_ = STATE_RUNNING;
00139 
00140       // Signal the condition_ that we are here.
00141       this->condition_.signal();
00142     }
00143   }
00144 
00145   //TBD: Should the reactor continue running if there are some exceptions
00146   //     are caught while handling events?
00147 //MJM: Put this in a loop with a state conditional and use the state to
00148 //MJM: indicate whether or not to terminate.  But I can think of no
00149 //MJM: reason to have anything in the conditional, so just expire.
00150 //MJM: Nevermind.
00151   try {
00152     // Tell the reactor to handle events.
00153     this->reactor_->run_reactor_event_loop();
00154   } catch (const std::exception& e) {
00155     ACE_ERROR((LM_ERROR,
00156                "(%P|%t) ERROR: TransportReactorTask::svc caught exception - %C.\n",
00157                e.what()));
00158   } catch (...) {
00159     ACE_ERROR((LM_ERROR,
00160                "(%P|%t) ERROR: TransportReactorTask::svc caught exception.\n"));
00161   }
00162 
00163   return 0;
00164 }

Here is the call graph for this function:

void OpenDDS::DCPS::TransportReactorTask::wait_for_startup (  )  [inline]

Definition at line 50 of file TransportReactorTask.h.

Referenced by open(), and svc().

00050 { barrier_.wait(); }

Here is the caller graph for this function:


Member Data Documentation

Definition at line 64 of file TransportReactorTask.h.

Definition at line 67 of file TransportReactorTask.h.

Referenced by open(), and svc().

Definition at line 65 of file TransportReactorTask.h.

Referenced by open(), stop(), and svc().

Reimplemented from ACE_Event_Handler.

Definition at line 68 of file TransportReactorTask.h.

Referenced by get_reactor(), stop(), svc(), TransportReactorTask(), and ~TransportReactorTask().

Definition at line 69 of file TransportReactorTask.h.

Referenced by get_reactor_owner(), and svc().

Definition at line 66 of file TransportReactorTask.h.

Referenced by open(), stop(), and svc().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1