LCOV - code coverage report
Current view: top level - DCPS - ReactorTask.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 77 98 78.6 %
Date: 2023-04-30 01:32:43 Functions: 8 13 61.5 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include <DCPS/DdsDcps_pch.h> // Only the _pch include should start with DCPS/
       9             : 
      10             : #include "ReactorTask.h"
      11             : #if !defined (__ACE_INLINE__)
      12             : #include "ReactorTask.inl"
      13             : #endif /* __ACE_INLINE__ */
      14             : 
      15             : #include "Service_Participant.h"
      16             : 
      17             : #include <ace/Select_Reactor.h>
      18             : #include <ace/WFMO_Reactor.h>
      19             : #include <ace/Proactor.h>
      20             : #include <ace/Proactor_Impl.h>
      21             : #include <ace/WIN32_Proactor.h>
      22             : #include <ace/OS_NS_Thread.h>
      23             : 
      24             : #include <exception>
      25             : #include <cstring>
      26             : 
      27             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      28             : 
      29             : namespace OpenDDS {
      30             : namespace DCPS {
      31             : 
      32           1 : ReactorTask::ReactorTask(bool useAsyncSend) // Builds an unopened task: STATE_UNINITIALIZED, no reactor/proactor/timer queue yet.
      33           1 :   : condition_(lock_) // condition_ is built on lock_
      34           1 :   , state_(STATE_UNINITIALIZED)
      35           1 :   , reactor_(0)
      36           1 :   , reactor_owner_(ACE_OS::NULL_thread)
      37           1 :   , proactor_(0)
      38             : #ifdef OPENDDS_REACTOR_TASK_ASYNC
      39             :   , use_async_send_(useAsyncSend) // only stored when OPENDDS_REACTOR_TASK_ASYNC is defined
      40             : #endif
      41           1 :   , timer_queue_(0)
      42           2 :   , thread_status_manager_(0) // set later by open_reactor_task()
      43             : {
      44             :   ACE_UNUSED_ARG(useAsyncSend); // the parameter is otherwise unused when OPENDDS_REACTOR_TASK_ASYNC is not defined
      45           1 : }
      46             : 
      47           1 : ReactorTask::~ReactorTask() // Releases the reactor, proactor and timer queue via cleanup().
      48             : { // NOTE(review): stop() is not called here; presumably the owner must stop the thread first - confirm.
      49           1 :   cleanup();
      50           1 : }
      51             : 
      52          17 : void ReactorTask::wait_for_startup_i() const // Blocks on condition_ until state_ is neither UNINITIALIZED nor OPENING.
      53             : { // NOTE(review): the _i suffix suggests the caller already holds lock_ (condition_ is built on it) - confirm.
      54          17 :   while (state_ == STATE_UNINITIALIZED || state_ == STATE_OPENING) {
      55           0 :     condition_.wait(thread_status_manager_ ? // use the manager stored by open_reactor_task() when there is one
      56             :                       *thread_status_manager_ :
      57           0 :                       TheServiceParticipant->get_thread_status_manager()); // otherwise the one from TheServiceParticipant
      58             :   }
      59          17 : }
      60             : 
      61          10 : void ReactorTask::cleanup() // Deletes proactor_, reactor_ and timer_queue_ (in that order) and nulls each pointer.
      62             : {
      63             : #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
      64          10 :   if (proactor_) { // NOTE(review): reactor_ is dereferenced below without a null check; assumes proactor_ is only set together with reactor_ - confirm.
      65           0 :     reactor_->remove_handler( // unregister the proactor's handle before the proactor is deleted
      66           0 :       proactor_->implementation()->get_handle(),
      67             :       ACE_Event_Handler::DONT_CALL);
      68           0 :     delete proactor_;
      69           0 :     proactor_ = 0;
      70             :   }
      71             : #endif
      72             : 
      73          10 :   delete reactor_; // the reactor goes before the timer queue it was handed in open_reactor_task()
      74          10 :   reactor_ = 0;
      75          10 :   delete timer_queue_;
      76          10 :   timer_queue_ = 0;
      77          10 : }
      78             : 
      79           9 : int ReactorTask::open_reactor_task(void*, // Rebuilds reactor + timer queue, starts the reactor thread, waits for STATE_RUNNING.
      80             :                                    ThreadStatusManager* thread_status_manager, // stored in thread_status_manager_ and dereferenced below
      81             :                                    const String& name) // copied into name_, which svc() passes to ThreadStatusManager::Start
      82             : { // Returns 0 once svc() has published STATE_RUNNING, or -1 when activate() fails.
      83           9 :   GuardType guard(lock_);
      84             : 
      85             :   // If we've already been opened, let's clean up the old stuff
      86           9 :   cleanup();
      87             : 
      88             :   // thread status reporting support
      89           9 :   thread_status_manager_ = thread_status_manager;
      90           9 :   name_ = name;
      91             : 
      92             :   // Set our reactor and proactor pointers to new reactor/proactor objects.
      93             : #ifdef OPENDDS_REACTOR_TASK_ASYNC
      94             :   if (use_async_send_ && !reactor_) {
      95             :     reactor_ = new ACE_Reactor(new ACE_WFMO_Reactor, 1);
      96             : 
      97             :     ACE_WIN32_Proactor* proactor_impl = new ACE_WIN32_Proactor(0, 1);
      98             :     proactor_ = new ACE_Proactor(proactor_impl, 1);
      99             :     reactor_->register_handler(proactor_impl, proactor_impl->get_handle()); // cleanup()/stop() remove this handle again
     100             :   } else
     101             : #endif
     102           9 :   if (!reactor_) { // NOTE(review): cleanup() above has just nulled reactor_, so this looks always true - confirm.
     103           9 :     reactor_ = new ACE_Reactor(new ACE_Select_Reactor, true);
     104           9 :     proactor_ = 0;
     105             :   }
     106             : 
     107           9 :   if (!timer_queue_) { // NOTE(review): likewise always true after cleanup() - confirm.
     108           9 :     timer_queue_ = new TimerQueueType();
     109           9 :     reactor_->timer_queue(timer_queue_); // timer_queue_ is still deleted by cleanup(), after the reactor
     110             :   }
     111             : 
     112           9 :   state_ = STATE_OPENING;
     113           9 :   condition_.notify_all();
     114             : 
     115           9 :   if (activate(THR_NEW_LWP | THR_JOINABLE, 1) != 0) { // NOTE(review): on failure state_ stays STATE_OPENING, so wait_for_startup_i() would keep waiting - confirm intended.
     116           0 :     ACE_ERROR_RETURN((LM_ERROR,
     117             :                       "(%P|%t) ERROR: ReactorTask Failed to activate "
     118             :                       "itself.\n"),
     119             :                      -1);
     120             :   }
     121             : 
     122          18 :   while (state_ != STATE_RUNNING) { // svc() sets STATE_RUNNING and notifies condition_
     123           9 :     condition_.wait(*thread_status_manager); // NOTE(review): unchecked dereference of the parameter, although wait_for_startup_i() and stop() tolerate a null manager - confirm callers never pass null.
     124             :   }
     125             : 
     126           9 :   return 0;
     127           9 : }
     128             : 
     129           9 : int ReactorTask::svc() // Thread body: takes reactor ownership, publishes STATE_RUNNING, runs the event loop; always returns 0.
     130             : {
     131           9 :   ThreadStatusManager::Start s(*thread_status_manager_, name_); // NOTE(review): unchecked dereference; relies on open_reactor_task() having stored a non-null manager - confirm.
     132             : 
     133             :   {
     134           9 :     GuardType guard(lock_);
     135             : 
     136             :     // First off - We need to obtain our own reference to ourselves such
     137             :     // that we don't get deleted while still running in our own thread.
     138             :     // In essence, our current thread "owns" a copy of our reference.
     139             :     // It's all done with the magic of intrusive reference counting!
     140           9 :     _add_ref(); // dropped again by _remove_ref() in close()
     141             : 
     142             :     // Ignore all signals to avoid
     143             :     //     ERROR: <something descriptive> Interrupted system call
     144             :     // The main thread will handle signals.
     145             :     sigset_t set;
     146           9 :     ACE_OS::sigfillset(&set);
     147           9 :     ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
     148             : 
     149             :     // Tell the reactor that this thread will be its owner
     150           9 :     if (reactor_->owner(ACE_Thread_Manager::instance()->thr_self()) != 0) {
     151           0 :       ACE_ERROR((LM_ERROR, // failure is only logged; execution continues
     152             :                  "(%P|%t) ERROR: Failed to change the reactor's owner().\n"));
     153             :     }
     154           9 :     reactor_owner_ = ACE_Thread_Manager::instance()->thr_self();
     155             : 
     156           9 :     interceptor_ = make_rch<Interceptor>(this, reactor_, reactor_owner_);
     157             : 
     158             :     // Advance the state.
     159           9 :     state_ = STATE_RUNNING;
     160           9 :     condition_.notify_all(); // wakes open_reactor_task() and any wait_for_startup_i() callers
     161           9 :   }
     162             : 
     163             :   // Tell the reactor to handle events.
     164           9 :   if (thread_status_manager_->update_thread_status()) {
     165           0 :     while (state_ == STATE_RUNNING) { // NOTE(review): state_ is read here without holding lock_ - confirm this is safe.
     166           0 :       ACE_Time_Value t = thread_status_manager_->thread_status_interval().value();
     167           0 :       ThreadStatusManager::Sleeper sleeper(*thread_status_manager_); // a fresh Sleeper for every pass of the loop
     168           0 :       reactor_->run_reactor_event_loop(t, 0); // presumably returns once t elapses so state_ is re-checked - confirm against ACE docs
     169           0 :     }
     170             : 
     171             :   } else {
     172           9 :     reactor_->run_reactor_event_loop(); // no time limit passed; stop() calls end_reactor_event_loop() on this reactor
     173             :   }
     174             : 
     175           9 :   return 0;
     176           9 : }
     177             : 
     178           9 : int ReactorTask::close(u_long flags) // Drops the reference taken in svc(); flags is ignored; always returns 0.
     179             : {
     180             :   ACE_UNUSED_ARG(flags);
     181             :   // This is called after the reactor threads exit.
     182             :   // We should not set state here since we are not
     183             :   // sure how many reactor threads we will use.
     184             :   // If there is one reactor thread then we should
     185             :   // set the state so the stop will not call
     186             :   // end_reactor_event_loop.
     187             :   // If there are multiple reactor threads, we still
     188             :   // need to call end_reactor_event_loop in stop() while
     189             :   // one reactor thread already exited.
     190             : //MJM: Right.
     191             : 
     192           9 :   _remove_ref(); // balances the _add_ref() made at the start of svc()
     193           9 :   return 0;
     194             : }
     195             : 
     196           9 : void ReactorTask::stop() // Sets STATE_SHUT_DOWN, ends the reactor event loop, then waits for the reactor thread.
     197             : {
     198           9 :   ACE_Reactor* reactor = 0; // copy of reactor_ taken under the lock, used after the lock is released
     199             :   {
     200           9 :     GuardType guard(lock_);
     201             : 
     202           9 :     if (state_ == STATE_UNINITIALIZED || state_ == STATE_SHUT_DOWN) {
     203             :       // We are already "stopped".  Just return.
     204           0 :       return;
     205             :     }
     206             : 
     207           9 :     state_ = STATE_SHUT_DOWN; // also ends the interval loop in svc(), which spins on STATE_RUNNING
     208             : 
     209             : #if defined (ACE_HAS_WIN32_OVERLAPPED_IO) || defined (ACE_HAS_AIO_CALLS)
     210             :     // Remove the proactor handler so the reactor stops forwarding messages.
     211           9 :     if (proactor_) {
     212           0 :       reactor_->remove_handler(
     213           0 :         proactor_->implementation()->get_handle(),
     214             :         ACE_Event_Handler::DONT_CALL);
     215             :     }
     216             : #endif
     217           9 :     reactor = reactor_;
     218           9 :   }
     219             : 
     220           9 :   if (reactor) {
     221             :     // We can't hold the lock when we call this, because the reactor threads may need to
     222             :     // access the lock as part of normal execution before they return to the reactor control loop
     223           9 :     reactor->end_reactor_event_loop();
     224             :   }
     225             : 
     226             :   {
     227           9 :     GuardType guard(lock_);
     228             : 
     229             :     // In the future, we will likely want to replace this assert with a new "SHUTTING_DOWN" state
     230             :     // which can be used to delay any potential new calls to open_reactor_task()
     231           9 :     OPENDDS_ASSERT(state_ == STATE_SHUT_DOWN);
     232             : 
     233             :     // Let's wait for the reactor task's thread to complete before we
     234             :     // leave this stop method.
     235           9 :     if (thread_status_manager_) { // NOTE(review): with a null manager the thread is never waited for - confirm intended.
     236           9 :       ThreadStatusManager::Sleeper sleeper(*thread_status_manager_);
     237           9 :       wait(); // NOTE(review): called with lock_ held, although the comment above says reactor threads may need lock_ - confirm no deadlock.
     238             : 
     239             :       // Reset the thread manager in case it goes away before the next open.
     240           9 :       thr_mgr(0);
     241           9 :     }
     242           9 :   }
     243             : }
     244             : 
     245           0 : void ReactorTask::reactor(ACE_Reactor* reactor) // Setter: forwards to ACE_Event_Handler::reactor(); the reactor_ member is not touched.
     246             : {
     247           0 :   ACE_Event_Handler::reactor(reactor);
     248           0 : }
     249             : 
     250           0 : ACE_Reactor* ReactorTask::reactor() const // Getter: returns ACE_Event_Handler::reactor(), not the reactor_ member.
     251             : {
     252           0 :   return ACE_Event_Handler::reactor();
     253             : }
     254             : 
     255             : } // namespace DCPS
     256             : } // namespace OpenDDS
     257             : 
     258             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16