LCOV - code coverage report
Current view: top level - DCPS - DispatchService.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 119 127 93.7 %
Date: 2023-04-30 01:32:43 Functions: 10 12 83.3 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       9             : 
      10             : #include "debug.h"
      11             : #include "DispatchService.h"
      12             : #include "Service_Participant.h"
      13             : #include "TimeDuration.h"
      14             : 
      15             : #include <ace/Reverse_Lock_T.h>
      16             : 
      17             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      18             : 
      19             : namespace OpenDDS {
      20             : namespace DCPS {
      21             : 
      22          48 : DispatchService::DispatchService(size_t count)
      23          48 :  : cv_(mutex_)
      24          48 :  , allow_dispatch_(true)
      25          48 :  , stop_when_empty_(false)
      26          48 :  , running_(true)
      27          48 :  , running_threads_(0)
      28          48 :  , max_timer_id_(LONG_MAX)
      29          96 :  , pool_(count, run, this)
      30             : {
      31          48 : }
      32             : 
      33          76 : DispatchService::~DispatchService()
      34             : {
      35          48 :   shutdown();
      36          76 : }
      37             : 
      38          84 : void DispatchService::shutdown(bool immediate, EventQueue* const pending)
      39             : {
      40          84 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      41          84 :   allow_dispatch_ = false;
      42          84 :   stop_when_empty_ = true;
      43          84 :   running_ = running_ && !immediate; // && with existing state in case shutdown has already been called
      44          84 :   cv_.notify_all();
      45             : 
      46          84 :   if (pool_.contains(ACE_Thread::self())) {
      47           0 :     if (log_level >= LogLevel::Error) {
      48           0 :       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR :: DispatchService::shutdown: Contained Thread Attempting To Call Shutdown.\n"));
      49             :     }
      50           0 :     if (pending) {
      51           0 :       pending->clear();
      52             :     }
      53           0 :     return;
      54             :   }
      55             : 
      56         180 :   while (running_threads_) {
      57          96 :     cv_.wait(TheServiceParticipant->get_thread_status_manager());
      58             :   }
      59             : 
      60          84 :   if (pending) {
      61          31 :     pending->clear();
      62          31 :     pending->swap(event_queue_);
      63          31 :     const TimerQueueMap& cmap = timer_queue_map_;
      64          35 :     for (TimerQueueMap::const_iterator it = cmap.begin(), limit = cmap.end(); it != limit; ++it) {
      65           4 :       pending->push_back(it->second.first);
      66             :     }
      67             :   } else {
      68          53 :     event_queue_.clear();
      69             :   }
      70          84 :   timer_queue_map_.clear();
      71          84 :   timer_id_map_.clear();
      72          84 : }
      73             : 
      74        4147 : DispatchService::DispatchStatus DispatchService::dispatch(FunPtr fun, void* arg)
      75             : {
      76        4147 :   if (!fun) {
      77           0 :     return DS_ERROR;
      78             :   }
      79             : 
      80        4147 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      81        4147 :   if (allow_dispatch_) {
      82        4146 :     event_queue_.push_back(std::make_pair(fun, arg));
      83        4146 :     cv_.notify_one();
      84        4146 :     return DS_SUCCESS;
      85             :   }
      86           1 :   return DS_ERROR;
      87        4147 : }
      88             : 
      89          84 : DispatchService::TimerId DispatchService::schedule(FunPtr fun, void* arg, const MonotonicTimePoint& expiration)
      90             : {
      91          84 :   if (!fun) {
      92           0 :     return TI_FAILURE;
      93             :   }
      94             : 
      95          84 :   TimerId id = 0;
      96          84 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      97          84 :   if (allow_dispatch_) {
      98          83 :     TimerQueueMap::iterator pos = timer_queue_map_.insert(std::make_pair(expiration, std::make_pair(std::make_pair(fun, arg), 0)));
      99             :     // Make it a loop in case we ever recycle timer ids
     100          83 :     const TimerId starting_id = max_timer_id_;
     101             :     do {
     102          83 :       id = max_timer_id_ = max_timer_id_ == LONG_MAX ? 1 : max_timer_id_ + 1;
     103          83 :       if (id == starting_id) {
     104           0 :         return TI_FAILURE; // all ids in use ?!
     105             :       }
     106          83 :       pos->second.second = id;
     107          83 :     } while (timer_id_map_.insert(std::make_pair(id, pos)).second == false);
     108          83 :     cv_.notify_one();
     109          83 :     return id;
     110             :   }
     111           1 :   return TI_FAILURE;
     112          84 : }
     113             : 
     114          25 : size_t DispatchService::cancel(DispatchService::TimerId id, void** arg)
     115             : {
     116          25 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     117          25 :   TimerIdMap::iterator pos = timer_id_map_.find(id);
     118          25 :   if (pos != timer_id_map_.end()) {
     119          23 :     if (pos->second == timer_queue_map_.begin()) {
     120          13 :       cv_.notify_all();
     121             :     }
     122          23 :     if (arg) {
     123          17 :       *arg = pos->second->second.first.second;
     124             :     }
     125          23 :     timer_queue_map_.erase(pos->second);
     126          23 :     timer_id_map_.erase(pos);
     127          23 :     return 1;
     128             :   }
     129           2 :   return 0;
     130          25 : }
     131             : 
     132           4 : size_t DispatchService::cancel(FunPtr fun, void* arg)
     133             : {
     134           4 :   OPENDDS_ASSERT(fun);
     135           4 :   size_t count = 0;
     136           4 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     137          22 :   for (TimerQueueMap::iterator it = timer_queue_map_.begin(); it != timer_queue_map_.end();) {
     138          18 :     if (it->second.first.first == fun && it->second.first.second == arg) {
     139          12 :       if (it == timer_queue_map_.begin()) {
     140           6 :         cv_.notify_all();
     141             :       }
     142          12 :       timer_id_map_.erase(it->second.second);
     143          12 :       timer_queue_map_.erase(it++);
     144          12 :       ++count;
     145             :     } else {
     146           6 :       ++it;
     147             :     }
     148             :   }
     149           4 :   return count;
     150           4 : }
     151             : 
     152         134 : ACE_THR_FUNC_RETURN DispatchService::run(void* arg)
     153             : {
     154         134 :   DispatchService& dispatcher = *static_cast<DispatchService*>(arg);
     155         134 :   dispatcher.run_event_loop();
     156         134 :   return 0;
     157             : }
     158             : 
     159         134 : void DispatchService::run_event_loop()
     160             : {
     161         134 :   ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(mutex_);
     162         134 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
     163         134 :   ++running_threads_;
     164        4645 :   while (running_) {
     165             : 
     166             :     // Logical Order:
     167             :     // - Move expired timer events into normal event queue
     168             :     // - Wait appropriate length if there's nothing to do
     169             :     // - Check for early exit before execution
     170             :     // - Run first task from event queue
     171             : 
     172        4511 :     if (allow_dispatch_ && !timer_queue_map_.empty()) {
     173         185 :       const MonotonicTimePoint now = MonotonicTimePoint::now();
     174             : 
     175         185 :       TimerQueueMap::iterator last = timer_queue_map_.upper_bound(now), pos = last;
     176         229 :       while (pos != timer_queue_map_.begin()) {
     177          44 :         --pos;
     178          44 :         event_queue_.push_back(pos->second.first);
     179          44 :         timer_id_map_.erase(pos->second.second);
     180             :       }
     181         185 :       if (last != timer_queue_map_.begin()) {
     182          44 :         timer_queue_map_.erase(timer_queue_map_.begin(), last);
     183             :       }
     184         185 :     }
     185             : 
     186        4511 :     if (event_queue_.empty()) {
     187         386 :       if (stop_when_empty_) {
     188          38 :         running_ = false;
     189          38 :         cv_.notify_all();
     190         348 :       } else if (allow_dispatch_ && timer_queue_map_.size()) {
     191         126 :         MonotonicTimePoint deadline(timer_queue_map_.begin()->first);
     192         126 :         cv_.wait_until(deadline, TheServiceParticipant->get_thread_status_manager());
     193         126 :       } else {
     194         222 :         cv_.wait(TheServiceParticipant->get_thread_status_manager());
     195             :       }
     196             :     }
     197             : 
     198        4511 :     if (!running_ || event_queue_.empty()) continue;
     199             : 
     200        4189 :     FunArgPair pair = event_queue_.front();
     201        4189 :     event_queue_.pop_front();
     202        4189 :     ACE_Guard<ACE_Reverse_Lock<ACE_Thread_Mutex> > rev_guard(rev_lock);
     203        4189 :     pair.first(pair.second);
     204        4189 :   }
     205         134 :   --running_threads_;
     206         134 :   cv_.notify_all();
     207         134 : }
     208             : 
     209             : } // DCPS
     210             : } // OpenDDS
     211             : 
     212             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16