TransportReactorTask.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "TransportReactorTask.h"
00010 
00011 #if !defined (__ACE_INLINE__)
00012 #include "TransportReactorTask.inl"
00013 #endif /* __ACE_INLINE__ */
00014 
00015 #include <ace/Select_Reactor.h>
00016 #include <ace/WFMO_Reactor.h>
00017 #include <ace/Proactor.h>
00018 #include <ace/Proactor_Impl.h>
00019 #include <ace/WIN32_Proactor.h>
00020 
00021 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00022 
00023 OpenDDS::DCPS::TransportReactorTask::TransportReactorTask(bool useAsyncSend)
00024   : barrier_(2)
00025   , state_(STATE_NOT_RUNNING)
00026   , condition_(this->lock_)
00027   , reactor_owner_(ACE_OS::NULL_thread)
00028 {
00029   DBG_ENTRY_LVL("TransportReactorTask","TransportReactorTask",6);
00030 
00031 #if defined (ACE_WIN32) && defined (ACE_HAS_WIN32_OVERLAPPED_IO)
00032   // Set our reactor and proactor pointers to a new reactor/proactor objects.
00033   if (useAsyncSend) {
00034     this->reactor_ = new ACE_Reactor(new ACE_WFMO_Reactor, 1);
00035 
00036     ACE_WIN32_Proactor* proactor_impl = new ACE_WIN32_Proactor(0, 1);
00037     this->proactor_ = new ACE_Proactor(proactor_impl, 1);
00038     this->reactor_->register_handler(proactor_impl, proactor_impl->get_handle());
00039     return;
00040   }
00041 #else
00042   ACE_UNUSED_ARG(useAsyncSend);
00043 #endif
00044 
00045   this->reactor_ = new ACE_Reactor(new ACE_Select_Reactor, 1);
00046   this->proactor_ = 0;
00047 }
00048 
00049 OpenDDS::DCPS::TransportReactorTask::~TransportReactorTask()
00050 {
00051   DBG_ENTRY_LVL("TransportReactorTask","~TransportReactorTask",6);
00052 
00053 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00054   if (this->proactor_) {
00055     this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(),
00056                                    ACE_Event_Handler::DONT_CALL);
00057     delete this->proactor_;
00058   }
00059 #endif
00060 
00061   delete this->reactor_;
00062 }
00063 
00064 int
00065 OpenDDS::DCPS::TransportReactorTask::open(void*)
00066 {
00067   DBG_ENTRY_LVL("TransportReactorTask","open",6);
00068 
00069   GuardType guard(this->lock_);
00070 
00071   // Reset our state.
00072   this->state_ = STATE_NOT_RUNNING;
00073 
00074   // For now, we only support one thread to run the reactor event loop.
00075   // Parts of the logic in this class would need to change to support
00076   // more than one thread running the reactor.
00077 
00078   // Attempt to activate ourselves.  If successful, a new thread will be
00079   // started and it will invoke our svc() method.  Note that we still have
00080   // a hold on our lock while we do this.
00081   if (this->activate(THR_NEW_LWP | THR_JOINABLE,1) != 0) {
00082     ACE_ERROR_RETURN((LM_ERROR,
00083                       "(%P|%t) ERROR: TransportReactorTask Failed to activate "
00084                       "itself.\n"),
00085                      -1);
00086   }
00087 
00088   this->wait_for_startup();
00089 
00090   // Here we need to wait until a condition is triggered by the new thread(s)
00091   // that we created.  Note that this will cause us to release the lock and
00092   // wait until the condition_ is signal()'ed.  When it is signaled, the
00093   // condition will attempt to obtain the lock again, and then return to us
00094   // here.  We can then go on.
00095   if (this->state_ == STATE_NOT_RUNNING) {
00096     this->state_ = STATE_OPENING;
00097     this->condition_.wait();
00098   }
00099 
00100   return 0;
00101 }
00102 
00103 int
00104 OpenDDS::DCPS::TransportReactorTask::svc()
00105 {
00106   DBG_ENTRY_LVL("TransportReactorTask","svc",6);
00107 
00108   // First off - We need to obtain our own reference to ourselves such
00109   // that we don't get deleted while still running in our own thread.
00110   // In essence, our current thread "owns" a copy of our reference.
00111   // It's all done with the magic of intrusive reference counting!
00112   this->_add_ref();
00113 
00114   // Ignore all signals to avoid
00115   //     ERROR: <something descriptive> Interrupted system call
00116   // The main thread will handle signals.
00117   sigset_t set;
00118   ACE_OS::sigfillset(&set);
00119   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
00120 
00121   // VERY IMPORTANT!Tell the reactor that this task's thread will be
00122   //                  its "owner".
00123   if (this->reactor_->owner(ACE_Thread_Manager::instance()->thr_self()) != 0) {
00124     ACE_ERROR((LM_ERROR,
00125                "(%P|%t) ERROR: Failed to change the reactor's owner().\n"));
00126   }
00127   this->reactor_owner_ = ACE_Thread_Manager::instance()->thr_self();
00128   this->wait_for_startup();
00129 
00130 
00131   {
00132     // Obtain the lock.  This should only happen once the open() has hit
00133     // the condition_.wait() line of code, and has released the lock.
00134     GuardType guard(this->lock_);
00135 
00136     if (this->state_ == STATE_OPENING) {
00137       // Advance the state.
00138       this->state_ = STATE_RUNNING;
00139 
00140       // Signal the condition_ that we are here.
00141       this->condition_.signal();
00142     }
00143   }
00144 
00145   //TBD: Should the reactor continue running if there are some exceptions
00146   //     are caught while handling events?
00147 //MJM: Put this in a loop with a state conditional and use the state to
00148 //MJM: indicate whether or not to terminate.  But I can think of no
00149 //MJM: reason to have anything in the conditional, so just expire.
00150 //MJM: Nevermind.
00151   try {
00152     // Tell the reactor to handle events.
00153     this->reactor_->run_reactor_event_loop();
00154   } catch (const std::exception& e) {
00155     ACE_ERROR((LM_ERROR,
00156                "(%P|%t) ERROR: TransportReactorTask::svc caught exception - %C.\n",
00157                e.what()));
00158   } catch (...) {
00159     ACE_ERROR((LM_ERROR,
00160                "(%P|%t) ERROR: TransportReactorTask::svc caught exception.\n"));
00161   }
00162 
00163   return 0;
00164 }
00165 
00166 int
00167 OpenDDS::DCPS::TransportReactorTask::close(u_long flags)
00168 {
00169   DBG_ENTRY_LVL("TransportReactorTask","close",6);
00170   ACE_UNUSED_ARG(flags);
00171   // This is called after the reactor threads exit.
00172   // We should not set state here since we are not
00173   // sure how many reactor threads we will use.
00174   // If there is one reactor thread then we should
00175   // set the state so the stop will not call
00176   // end_reactor_event_loop.
00177   // If there are multiple reactor threads, we still
00178   // need call end_reactor_event_loop in stop() while
00179   // one reactor thread already exited.
00180 //MJM: Right.
00181 
00182   this->_remove_ref();
00183   return 0;
00184 }
00185 
00186 void
00187 OpenDDS::DCPS::TransportReactorTask::stop()
00188 {
00189   DBG_ENTRY_LVL("TransportReactorTask","stop",6);
00190   {
00191     GuardType guard(this->lock_);
00192 
00193     if (this->state_ == STATE_NOT_RUNNING) {
00194       // We are already "stopped".  Just return.
00195       return;
00196     }
00197 
00198     this->state_ = STATE_NOT_RUNNING;
00199   }
00200 
00201 #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
00202   // Remove the proactor handler so the reactor stops forwarding messages.
00203   if (this->proactor_) {
00204     this->reactor_->remove_handler(this->proactor_->implementation()->get_handle(),
00205                                    ACE_Event_Handler::DONT_CALL);
00206   }
00207 #endif
00208 
00209   this->reactor_->end_reactor_event_loop();
00210 
00211   // Let's wait for the reactor task's thread to complete before we
00212   // leave this stop method.
00213   this->wait();
00214 }
00215 
00216 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1