Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/ 9 : 10 : #include "Definitions.h" 11 : #include "ThreadPool.h" 12 : 13 : #include "Definitions.h" 14 : 15 : #include <ace/Guard_T.h> 16 : 17 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 18 : 19 : namespace OpenDDS { 20 : namespace DCPS { 21 : 22 54 : ThreadPool::ThreadPool(size_t count, FunPtr fun, void* arg) 23 54 : : fun_(fun) 24 54 : , arg_(arg) 25 54 : , mutex_() 26 54 : , cv_(mutex_) 27 54 : , active_threads_(0) 28 : #ifdef OPENDDS_NO_THREAD_JOIN 29 : , finished_threads_(0) 30 : #endif 31 108 : , ids_(count, ACE_thread_t()) 32 : { 33 : { 34 54 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 35 215 : for (size_t i = 0; i < count; ++i) { 36 161 : ACE_Thread::spawn(run, this, THR_NEW_LWP | THR_JOINABLE, 0, &(ids_[i])); 37 : } 38 54 : } 39 54 : if (count) { 40 53 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 41 149 : while (active_threads_ != count) { 42 96 : cv_.wait(tsm_); 43 : } 44 53 : } 45 54 : } 46 : 47 55 : ThreadPool::~ThreadPool() 48 : { 49 54 : join_all(); 50 55 : } 51 : 52 161 : ACE_THR_FUNC_RETURN ThreadPool::run(void* arg) 53 : { 54 161 : ThreadPool& pool = *static_cast<ThreadPool*>(arg); 55 : { 56 161 : ACE_Guard<ACE_Thread_Mutex> guard(pool.mutex_); 57 161 : pool.id_set_.insert(ACE_Thread::self()); 58 161 : ++pool.active_threads_; 59 161 : pool.cv_.notify_all(); 60 396 : while (pool.active_threads_ != pool.ids_.size()) { 61 235 : pool.cv_.wait(pool.tsm_); 62 : } 63 161 : } 64 161 : (*pool.fun_)(pool.arg_); 65 : #ifdef OPENDDS_NO_THREAD_JOIN 66 : ACE_Guard<ACE_Thread_Mutex> guard(pool.mutex_); 67 : ++pool.finished_threads_; 68 : pool.cv_.notify_one(); 69 : #endif 70 161 : return 0; 71 : } 72 : 73 85 : bool ThreadPool::contains(ACE_thread_t id) const 74 : { 75 85 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 76 170 : return id_set_.count(id); 77 85 : } 78 : 79 54 : void ThreadPool::join_all() 80 : { 81 : #ifdef OPENDDS_NO_THREAD_JOIN 82 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 83 : while (finished_threads_ != ids_.size()) { 84 : cv_.wait(tsm_); 85 : } 86 : #else 87 54 : OPENDDS_VECTOR(ACE_hthread_t) ids; 88 : { 89 54 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_); 90 54 : ids = ids_; 91 54 : } 92 : 93 215 : for (size_t i = 0; i < ids.size(); ++i) { 94 161 : const int result = ACE_Thread::join(ids[i], 0); 95 : ACE_UNUSED_ARG(result); 96 161 : OPENDDS_ASSERT(result == 0); 97 : } 98 : #endif 99 54 : } 100 : 101 : } // DCPS 102 : } // OpenDDS 103 : 104 : OPENDDS_END_VERSIONED_NAMESPACE_DECL