TransportReactorTask.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "TransportReactorTask.h"
00010 
00011 #if !defined (__ACE_INLINE__)
00012 #include "TransportReactorTask.inl"
00013 #endif /* __ACE_INLINE__ */
00014 
00015 #include <ace/Select_Reactor.h>
00016 #include <ace/WFMO_Reactor.h>
00017 #include <ace/Proactor.h>
00018 #include <ace/Proactor_Impl.h>
00019 #include <ace/WIN32_Proactor.h>
00020 
00021 OpenDDS::DCPS::TransportReactorTask::TransportReactorTask(bool useAsyncSend)
00022   : barrier_(2)
00023   , state_(STATE_NOT_RUNNING)
00024   , condition_(this->lock_)
00025 {
00026   DBG_ENTRY_LVL("TransportReactorTask","TransportReactorTask",6);
00027 
00028 #if defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00029   // Set our reactor and proactor pointers to a new reactor/proactor objects.
00030   if (useAsyncSend) {
00031     this->reactor_ = new ACE_Reactor(new ACE_WFMO_Reactor, 1);
00032 
00033     ACE_WIN32_Proactor* proactor_impl = new ACE_WIN32_Proactor(0, 1);
00034     this->proactor_ = new ACE_Proactor(proactor_impl, 1);
00035     this->reactor_->register_handler(proactor_impl, proactor_impl->get_handle());
00036     return;
00037   }
00038 #else
00039   ACE_UNUSED_ARG(useAsyncSend);
00040 #endif
00041 
00042   this->reactor_ = new ACE_Reactor(new ACE_Select_Reactor, 1);
00043   this->proactor_ = 0;
00044 }
00045 
00046 OpenDDS::DCPS::TransportReactorTask::~TransportReactorTask()
00047 {
00048   DBG_ENTRY_LVL("TransportReactorTask","~TransportReactorTask",6);
00049 
00050 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00051   if (this->proactor_) {
00052     this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(),
00053                                    ACE_Event_Handler::DONT_CALL);
00054     delete this->proactor_;
00055   }
00056 #endif
00057 
00058   delete this->reactor_;
00059 }
00060 
00061 int
00062 OpenDDS::DCPS::TransportReactorTask::open(void*)
00063 {
00064   DBG_ENTRY_LVL("TransportReactorTask","open",6);
00065 
00066   GuardType guard(this->lock_);
00067 
00068   // Reset our state.
00069   this->state_ = STATE_NOT_RUNNING;
00070 
00071   // For now, we only support one thread to run the reactor event loop.
00072   // Parts of the logic in this class would need to change to support
00073   // more than one thread running the reactor.
00074 
00075   // Attempt to activate ourselves.  If successful, a new thread will be
00076   // started and it will invoke our svc() method.  Note that we still have
00077   // a hold on our lock while we do this.
00078   if (this->activate(THR_NEW_LWP | THR_JOINABLE,1) != 0) {
00079     ACE_ERROR_RETURN((LM_ERROR,
00080                       "(%P|%t) ERROR: TransportReactorTask Failed to activate "
00081                       "itself.\n"),
00082                      -1);
00083   }
00084 
00085   this->wait_for_startup();
00086 
00087   // Here we need to wait until a condition is triggered by the new thread(s)
00088   // that we created.  Note that this will cause us to release the lock and
00089   // wait until the condition_ is signal()'ed.  When it is signaled, the
00090   // condition will attempt to obtain the lock again, and then return to us
00091   // here.  We can then go on.
00092   if (this->state_ == STATE_NOT_RUNNING) {
00093     this->state_ = STATE_OPENING;
00094     this->condition_.wait();
00095   }
00096 
00097   return 0;
00098 }
00099 
00100 int
00101 OpenDDS::DCPS::TransportReactorTask::svc()
00102 {
00103   DBG_ENTRY_LVL("TransportReactorTask","svc",6);
00104 
00105   // First off - We need to obtain our own reference to ourselves such
00106   // that we don't get deleted while still running in our own thread.
00107   // In essence, our current thread "owns" a copy of our reference.
00108   // It's all done with the magic of intrusive reference counting!
00109   this->_add_ref();
00110 
00111   // Ignore all signals to avoid
00112   //     ERROR: <something descriptive> Interrupted system call
00113   // The main thread will handle signals.
00114   sigset_t set;
00115   ACE_OS::sigfillset(&set);
00116   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00117 
00118   // VERY IMPORTANT!Tell the reactor that this task's thread will be
00119   //                  its "owner".
00120   if (this->reactor_->owner(ACE_Thread_Manager::instance()->thr_self()) != 0) {
00121     ACE_ERROR((LM_ERROR,
00122                "(%P|%t) ERROR: Failed to change the reactor's owner().\n"));
00123   }
00124   this->reactor_owner_ = ACE_Thread_Manager::instance()->thr_self();
00125   this->wait_for_startup();
00126 
00127 
00128   {
00129     // Obtain the lock.  This should only happen once the open() has hit
00130     // the condition_.wait() line of code, and has released the lock.
00131     GuardType guard(this->lock_);
00132 
00133     if (this->state_ == STATE_OPENING) {
00134       // Advance the state.
00135       this->state_ = STATE_RUNNING;
00136 
00137       // Signal the condition_ that we are here.
00138       this->condition_.signal();
00139     }
00140   }
00141 
00142   //TBD: Should the reactor continue running if there are some exceptions
00143   //     are caught while handling events?
00144 //MJM: Put this in a loop with a state conditional and use the state to
00145 //MJM: indicate whether or not to terminate.  But I can think of no
00146 //MJM: reason to have anything in the conditional, so just expire.
00147 //MJM: Nevermind.
00148   try {
00149     // Tell the reactor to handle events.
00150     this->reactor_->run_reactor_event_loop();
00151   } catch (const std::exception& e) {
00152     ACE_ERROR((LM_ERROR,
00153                "(%P|%t) ERROR: TransportReactorTask::svc caught exception - %C.\n",
00154                e.what()));
00155   } catch (...) {
00156     ACE_ERROR((LM_ERROR,
00157                "(%P|%t) ERROR: TransportReactorTask::svc caught exception.\n"));
00158   }
00159 
00160   return 0;
00161 }
00162 
00163 int
00164 OpenDDS::DCPS::TransportReactorTask::close(u_long flags)
00165 {
00166   DBG_ENTRY_LVL("TransportReactorTask","close",6);
00167   ACE_UNUSED_ARG(flags);
00168   // This is called after the reactor threads exit.
00169   // We should not set state here since we are not
00170   // sure how many reactor threads we will use.
00171   // If there is one reactor thread then we should
00172   // set the state so the stop will not call
00173   // end_reactor_event_loop.
00174   // If there are multiple reactor threads, we still
00175   // need call end_reactor_event_loop in stop() while
00176   // one reactor thread already exited.
00177 //MJM: Right.
00178 
00179   this->_remove_ref();
00180   return 0;
00181 }
00182 
00183 void
00184 OpenDDS::DCPS::TransportReactorTask::stop()
00185 {
00186   DBG_ENTRY_LVL("TransportReactorTask","stop",6);
00187   {
00188     GuardType guard(this->lock_);
00189 
00190     if (this->state_ == STATE_NOT_RUNNING) {
00191       // We are already "stopped".  Just return.
00192       return;
00193     }
00194 
00195     this->state_ = STATE_NOT_RUNNING;
00196   }
00197 
00198 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00199   // Remove the proactor handler so the reactor stops forwarding messages.
00200   if (this->proactor_) {
00201     this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(),
00202                                    ACE_Event_Handler::DONT_CALL);
00203   }
00204 #endif
00205 
00206   this->reactor_->end_reactor_event_loop();
00207 
00208   // Let's wait for the reactor task's thread to complete before we
00209   // leave this stop method.
00210   this->wait();
00211 }

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7