LCOV - code coverage report
Current view: top level - DCPS - ReactorInterceptor.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 52 71 73.2 %
Date: 2023-04-30 01:32:43 Functions: 7 14 50.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       9             : 
      10             : #include "ace/Log_Msg.h"
      11             : #include "ace/Reverse_Lock_T.h"
      12             : #include "ace/Synch.h"
      13             : 
      14             : #include "ReactorInterceptor.h"
      15             : #include "Service_Participant.h"
      16             : 
      17             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      18             : 
      19             : namespace OpenDDS {
      20             : namespace DCPS {
      21             : 
      22          32 : ReactorInterceptor::Command::Command()
      23          32 :   : reactor_(0)
      24          32 : {}
      25             : 
      26          21 : ReactorInterceptor::ReactorInterceptor(ACE_Reactor* reactor,
      27           0 :                                        ACE_thread_t owner)
      28          21 :   : owner_(owner)
      29          21 :   , state_(RS_NONE)
      30             : {
      31          21 :   RcEventHandler::reactor(reactor);
      32          21 : }
      33             : 
      34          21 : ReactorInterceptor::~ReactorInterceptor()
      35             : {
      36          21 : }
      37             : 
      38          32 : ReactorInterceptor::CommandPtr ReactorInterceptor::execute_or_enqueue(CommandPtr command)
      39             : {
      40          32 :   OPENDDS_ASSERT(command);
      41             : 
      42          32 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      43             : 
      44             :   // Only allow immediate execution if running on the reactor thread, otherwise we risk deadlock
      45             :   // when calling into the reactor object.
      46          32 :   const bool is_owner = ACE_OS::thr_equal(owner_, ACE_Thread::self());
      47             : 
      48             :   // If state is set to processing, the contents of command_queue_ have been swapped out
      49             :   // so immediate execution may run jobs out of the expected order.
      50          32 :   const bool is_not_processing = state_ != RS_PROCESSING;
      51             : 
      52             :   // If the command_queue_ is not empty, allowing execution will potentially run unexpected code
      53             :   // which is problematic since we may be holding locks used by the unexpected code.
      54          32 :   const bool is_empty = command_queue_.empty();
      55             : 
      56             :   // If all three of these conditions are met, it should be safe to execute
      57          32 :   const bool is_safe_to_execute = is_owner && is_not_processing && is_empty;
      58             : 
      59             :   // Even if it's not normally safe to execute, allow immediate execution if the reactor is shut down
      60          32 :   const bool immediate = is_safe_to_execute || reactor_is_shut_down();
      61             : 
      62             :   // Always set reactor and push to the queue
      63          32 :   ACE_Reactor* local_reactor = ACE_Event_Handler::reactor();
      64          32 :   command->set_reactor(local_reactor);
      65          32 :   command_queue_.push_back(command);
      66             : 
      67             :   // But depending on whether we're running it immediately or not, we either process or notify
      68          32 :   if (immediate) {
      69           9 :     process_command_queue_i(guard);
      70          23 :   } else if (state_ == RS_NONE) {
      71          22 :     state_ = RS_NOTIFIED;
      72          22 :     guard.release();
      73          22 :     local_reactor->notify(this);
      74             :   }
      75          64 :   return command;
      76          32 : }
      77             : 
      78          22 : int ReactorInterceptor::handle_exception(ACE_HANDLE /*fd*/)
      79             : {
      80          22 :   ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
      81             : 
      82          22 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      83          22 :   process_command_queue_i(guard);
      84          22 :   return 0;
      85          22 : }
      86             : 
      87          31 : void ReactorInterceptor::process_command_queue_i(ACE_Guard<ACE_Thread_Mutex>& guard)
      88             : {
      89          31 :   Queue cq;
      90          31 :   ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(mutex_);
      91             : 
      92          31 :   state_ = RS_PROCESSING;
      93          31 :   if (!command_queue_.empty()) {
      94          31 :     cq.swap(command_queue_);
      95          31 :     ACE_Guard<ACE_Reverse_Lock<ACE_Thread_Mutex> > rev_guard(rev_lock);
      96          63 :     for (Queue::const_iterator pos = cq.begin(), limit = cq.end(); pos != limit; ++pos) {
      97          32 :       (*pos)->execute();
      98             :     }
      99          31 :   }
     100          31 :   if (!command_queue_.empty()) {
     101           0 :     state_ = RS_NOTIFIED;
     102           0 :     ACE_Reactor* const reactor = ACE_Event_Handler::reactor();
     103           0 :     guard.release();
     104           0 :     reactor->notify(this);
     105             :   } else {
     106          31 :     state_ = RS_NONE;
     107             :   }
     108          31 : }
     109             : 
     110           0 : void ReactorInterceptor::reactor(ACE_Reactor *reactor)
     111             : {
     112           0 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     113           0 :   ACE_Event_Handler::reactor(reactor);
     114           0 : }
     115             : 
     116          45 : ACE_Reactor* ReactorInterceptor::reactor() const
     117             : {
     118          45 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     119          90 :   return ACE_Event_Handler::reactor();
     120          45 : }
     121             : 
     122           0 : void RegisterHandler::execute()
     123             : {
     124           0 :   if (reactor()->register_handler(io_handle_, event_handler_, mask_) != 0) {
     125           0 :     if (log_level >= LogLevel::Error) {
     126           0 :       ACE_ERROR((LM_ERROR,
     127             :                  "(%P|%t) ERROR: RegisterHandler::execute: failed to register handler for socket %d\n",
     128             :                  io_handle_));
     129             :     }
     130             :   }
     131           0 : }
     132             : 
     133           0 : void RemoveHandler::execute()
     134             : {
     135           0 :   if (reactor()->remove_handler(io_handle_, mask_) != 0) {
     136           0 :     if (log_level >= LogLevel::Error) {
     137           0 :       ACE_ERROR((LM_ERROR,
     138             :                  "(%P|%t) ERROR: UnregisterHandler::execute: failed to remove handler for socket %d\n",
     139             :                  io_handle_));
     140             :     }
     141             :   }
     142           0 : }
     143             : 
     144             : }
     145             : }
     146             : 
     147             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16