LCOV - code coverage report
Current view: top level - DCPS - ThreadPool.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 44 44 100.0 %
Date: 2023-04-30 01:32:43 Functions: 6 6 100.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       9             : 
      10             : #include "Definitions.h"
      11             : #include "ThreadPool.h"
      12             : 
      13             : #include "Definitions.h"
      14             : 
      15             : #include <ace/Guard_T.h>
      16             : 
      17             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      18             : 
      19             : namespace OpenDDS {
      20             : namespace DCPS {
      21             : 
      22          54 : ThreadPool::ThreadPool(size_t count, FunPtr fun, void* arg)
      23          54 :  : fun_(fun)
      24          54 :  , arg_(arg)
      25          54 :  , mutex_()
      26          54 :  , cv_(mutex_)
      27          54 :  , active_threads_(0)
      28             : #ifdef OPENDDS_NO_THREAD_JOIN
      29             :  , finished_threads_(0)
      30             : #endif
      31         108 :  , ids_(count, ACE_thread_t())
      32             : {
      33             :   {
      34          54 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      35         215 :     for (size_t i = 0; i < count; ++i) {
      36         161 :       ACE_Thread::spawn(run, this, THR_NEW_LWP | THR_JOINABLE, 0, &(ids_[i]));
      37             :     }
      38          54 :   }
      39          54 :   if (count) {
      40          53 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      41         149 :     while (active_threads_ != count) {
      42          96 :       cv_.wait(tsm_);
      43             :     }
      44          53 :   }
      45          54 : }
      46             : 
      47          55 : ThreadPool::~ThreadPool()
      48             : {
      49          54 :   join_all();
      50          55 : }
      51             : 
      52         161 : ACE_THR_FUNC_RETURN ThreadPool::run(void* arg)
      53             : {
      54         161 :   ThreadPool& pool = *static_cast<ThreadPool*>(arg);
      55             :   {
      56         161 :     ACE_Guard<ACE_Thread_Mutex> guard(pool.mutex_);
      57         161 :     pool.id_set_.insert(ACE_Thread::self());
      58         161 :     ++pool.active_threads_;
      59         161 :     pool.cv_.notify_all();
      60         396 :     while (pool.active_threads_ != pool.ids_.size()) {
      61         235 :       pool.cv_.wait(pool.tsm_);
      62             :     }
      63         161 :   }
      64         161 :   (*pool.fun_)(pool.arg_);
      65             : #ifdef OPENDDS_NO_THREAD_JOIN
      66             :   ACE_Guard<ACE_Thread_Mutex> guard(pool.mutex_);
      67             :   ++pool.finished_threads_;
      68             :   pool.cv_.notify_one();
      69             : #endif
      70         161 :   return 0;
      71             : }
      72             : 
      73          85 : bool ThreadPool::contains(ACE_thread_t id) const
      74             : {
      75          85 :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      76         170 :   return id_set_.count(id);
      77          85 : }
      78             : 
      79          54 : void ThreadPool::join_all()
      80             : {
      81             : #ifdef OPENDDS_NO_THREAD_JOIN
      82             :   ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      83             :   while (finished_threads_ != ids_.size()) {
      84             :     cv_.wait(tsm_);
      85             :   }
      86             : #else
      87          54 :   OPENDDS_VECTOR(ACE_hthread_t) ids;
      88             :   {
      89          54 :     ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
      90          54 :     ids = ids_;
      91          54 :   }
      92             : 
      93         215 :   for (size_t i = 0; i < ids.size(); ++i) {
      94         161 :     const int result = ACE_Thread::join(ids[i], 0);
      95             :     ACE_UNUSED_ARG(result);
      96         161 :     OPENDDS_ASSERT(result == 0);
      97             :   }
      98             : #endif
      99          54 : }
     100             : 
     101             : } // DCPS
     102             : } // OpenDDS
     103             : 
     104             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16