OpenDDS  Snapshot(2023/04/28-20:55)
ThreadPool.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "Definitions.h"
11 #include "ThreadPool.h"
12 
13 #include "Definitions.h"
14 
15 #include <ace/Guard_T.h>
16 
18 
19 namespace OpenDDS {
20 namespace DCPS {
21 
22 ThreadPool::ThreadPool(size_t count, FunPtr fun, void* arg)
23  : fun_(fun)
24  , arg_(arg)
25  , mutex_()
26  , cv_(mutex_)
27  , active_threads_(0)
28 #ifdef OPENDDS_NO_THREAD_JOIN
29  , finished_threads_(0)
30 #endif
31  , ids_(count, ACE_thread_t())
32 {
33  {
35  for (size_t i = 0; i < count; ++i) {
36  ACE_Thread::spawn(run, this, THR_NEW_LWP | THR_JOINABLE, 0, &(ids_[i]));
37  }
38  }
39  if (count) {
41  while (active_threads_ != count) {
42  cv_.wait(tsm_);
43  }
44  }
45 }
46 
// Calls join_all() and does nothing else.
// NOTE(review): the line holding this function's header (listing line 47) is
// absent here. Given its position directly after the constructor this is
// presumably ThreadPool::~ThreadPool() -- confirm against ThreadPool.h.
48 {
49  join_all();
50 }
51 
52 ACE_THR_FUNC_RETURN ThreadPool::run(void* arg)
53 {
54  ThreadPool& pool = *static_cast<ThreadPool*>(arg);
55  {
57  pool.id_set_.insert(ACE_Thread::self());
58  ++pool.active_threads_;
59  pool.cv_.notify_all();
60  while (pool.active_threads_ != pool.ids_.size()) {
61  pool.cv_.wait(pool.tsm_);
62  }
63  }
64  (*pool.fun_)(pool.arg_);
65 #ifdef OPENDDS_NO_THREAD_JOIN
67  ++pool.finished_threads_;
68  pool.cv_.notify_one();
69 #endif
70  return 0;
71 }
72 
74 {
76  return id_set_.count(id);
77 }
78 
// Waits for the pool's threads to finish.
// NOTE(review): the line holding this function's header (listing line 79) is
// absent here; presumably this is the join_all() called at listing line 49 --
// confirm the exact signature against ThreadPool.h.
80 {
81 #ifdef OPENDDS_NO_THREAD_JOIN
// Without thread-join support: block until finished_threads_, which each
// pool thread increments at the end of run(), equals the number of ids_.
// NOTE(review): listing line 82 is absent; cv_ is bound to mutex_, so a
// guard on mutex_ is presumably taken here before waiting -- confirm.
83  while (finished_threads_ != ids_.size()) {
84  cv_.wait(tsm_);
85  }
86 #else
// NOTE(review): listing line 87 is absent; `ids` is not declared in the
// visible code -- presumably a local container of the same type as ids_.
88  {
// Copy ids_ into the local `ids` so the joins below work on the copy.
// NOTE(review): listing line 89 is absent; presumably a guard on mutex_
// protects this copy -- confirm.
90  ids = ids_;
91  }
92 
93  for (size_t i = 0; i < ids.size(); ++i) {
// Join each thread in turn; a non-zero result trips OPENDDS_ASSERT.
94  const int result = ACE_Thread::join(ids[i], 0);
// Keeps `result` referenced -- presumably for builds in which
// OPENDDS_ASSERT expands to nothing.
95  ACE_UNUSED_ARG(result);
96  OPENDDS_ASSERT(result == 0);
97  }
98 #endif
99 }
100 
101 } // DCPS
102 } // OpenDDS
103 
ThreadPool(size_t count, FunPtr fun, void *arg=0)
Definition: ThreadPool.cpp:22
ConditionVariable< ACE_Thread_Mutex > cv_
Definition: ThreadPool.h:67
bool notify_one()
Unblock one of the threads waiting on this condition.
static int spawn(ACE_THR_FUNC func, void *arg=0, long flags=THR_NEW_LWP|THR_JOINABLE, ACE_thread_t *t_id=0, ACE_hthread_t *t_handle=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, void *stack=0, size_t stack_size=ACE_DEFAULT_THREAD_STACKSIZE, ACE_Thread_Adapter *thread_adapter=0, const char **thr_name=0)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
static ACE_THR_FUNC_RETURN run(void *arg)
A static helper function used to redirect to requested thread start point.
Definition: ThreadPool.cpp:52
static int join(ACE_thread_t thread_id, ACE_thread_t *departed, ACE_THR_FUNC_RETURN *status)
ThreadStatusManager tsm_
Definition: ThreadPool.h:68
bool contains(ACE_thread_t id) const
Definition: ThreadPool.cpp:73
HANDLE ACE_hthread_t
DWORD ACE_thread_t
static ACE_thread_t self(void)
bool notify_all()
Unblock all of the threads waiting on this condition.
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
ACE_Thread_Mutex mutex_
Definition: ThreadPool.h:66
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
OPENDDS_VECTOR(ACE_hthread_t) ids_