OpenDDS  Snapshot(2023/04/07-19:43)
Public Types | Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::ThreadPool Class Reference

#include <ThreadPool.h>

Collaboration diagram for OpenDDS::DCPS::ThreadPool:
Collaboration graph
[legend]

Public Types

typedef ACE_THR_FUNC_RETURN(* FunPtr) (void *)
 A typedef for the starting point of the ThreadPool. More...
 

Public Member Functions

 ThreadPool (size_t count, FunPtr fun, void *arg=0)
 
virtual ~ThreadPool ()
 
bool contains (ACE_thread_t id) const
 

Static Public Member Functions

static ACE_THR_FUNC_RETURN run (void *arg)
 A static helper function used to redirect to requested thread start point. More...
 

Private Member Functions

void join_all ()
 
 OPENDDS_VECTOR (ACE_hthread_t) ids_
 
 OPENDDS_SET (ACE_thread_t) id_set_
 

Private Attributes

FunPtr fun_
 
void * arg_
 
ACE_Thread_Mutex mutex_
 
ConditionVariable< ACE_Thread_Mutex > cv_
 
ThreadStatusManager tsm_
 
size_t active_threads_
 

Detailed Description

ThreadPool is a light-weight utility class for starting a group of threads.

ThreadPool creates several threads at construction and attempts to join them at destruction. Users of ThreadPool are responsible for making sure the running threads are in a joinable state before the destruction of ThreadPool.

Definition at line 34 of file ThreadPool.h.

Member Typedef Documentation

◆ FunPtr

typedef ACE_THR_FUNC_RETURN(* OpenDDS::DCPS::ThreadPool::FunPtr) (void *)

A typedef for the starting point of the ThreadPool.

Definition at line 39 of file ThreadPool.h.

Constructor & Destructor Documentation

◆ ThreadPool()

OpenDDS::DCPS::ThreadPool::ThreadPool ( size_t  count,
FunPtr  fun,
void *  arg = 0 
)

Creates a ThreadPool with the specified size, starting point, and argument.

Parameters
count: number of threads
fun: starting point for the threads of the ThreadPool
arg: an optional argument to pass to the starting function

Definition at line 22 of file ThreadPool.cpp.

References active_threads_, cv_, mutex_, run(), ACE_Thread::spawn(), tsm_, and OpenDDS::DCPS::ConditionVariable< Mutex >::wait().

23  : fun_(fun)
24  , arg_(arg)
25  , mutex_()
26  , cv_(mutex_)
27  , active_threads_(0)
28 #ifdef OPENDDS_NO_THREAD_JOIN
29  , finished_threads_(0)
30 #endif
31  , ids_(count, ACE_thread_t())
32 {
33  {
35  for (size_t i = 0; i < count; ++i) {
36  ACE_Thread::spawn(run, this, THR_NEW_LWP | THR_JOINABLE, 0, &(ids_[i]));
37  }
38  }
39  if (count) {
41  while (active_threads_ != count) {
42  cv_.wait(tsm_);
43  }
44  }
45 }
ACE_Thread_Mutex mutex_
Definition: ThreadPool.h:66
static int spawn(ACE_THR_FUNC func, void *arg=0, long flags=THR_NEW_LWP|THR_JOINABLE, ACE_thread_t *t_id=0, ACE_hthread_t *t_handle=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, void *stack=0, size_t stack_size=ACE_DEFAULT_THREAD_STACKSIZE, ACE_Thread_Adapter *thread_adapter=0, const char **thr_name=0)
ConditionVariable< ACE_Thread_Mutex > cv_
Definition: ThreadPool.h:67
DWORD ACE_thread_t
ThreadStatusManager tsm_
Definition: ThreadPool.h:68
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
static ACE_THR_FUNC_RETURN run(void *arg)
A static helper function used to redirect to requested thread start point.
Definition: ThreadPool.cpp:52

◆ ~ThreadPool()

OpenDDS::DCPS::ThreadPool::~ThreadPool ( )
virtual

Definition at line 47 of file ThreadPool.cpp.

References join_all().

48 {
49  join_all();
50 }

Member Function Documentation

◆ contains()

bool OpenDDS::DCPS::ThreadPool::contains ( ACE_thread_t  id) const

Check if a specific thread id belongs to this ThreadPool

Parameters
id: thread id to check
Returns
true if the ThreadPool contains thread with specified id, false otherwise

Definition at line 73 of file ThreadPool.cpp.

References mutex_.

Referenced by OpenDDS::DCPS::DispatchService::shutdown().

74 {
76  return id_set_.count(id);
77 }
ACE_Thread_Mutex mutex_
Definition: ThreadPool.h:66

◆ join_all()

void OpenDDS::DCPS::ThreadPool::join_all ( )
private

Definition at line 79 of file ThreadPool.cpp.

References cv_, ACE_Thread::join(), mutex_, OPENDDS_ASSERT, OPENDDS_END_VERSIONED_NAMESPACE_DECL, OPENDDS_VECTOR(), tsm_, and OpenDDS::DCPS::ConditionVariable< Mutex >::wait().

Referenced by ~ThreadPool().

80 {
81 #ifdef OPENDDS_NO_THREAD_JOIN
83  while (finished_threads_ != ids_.size()) {
84  cv_.wait(tsm_);
85  }
86 #else
88  {
90  ids = ids_;
91  }
92 
93  for (size_t i = 0; i < ids.size(); ++i) {
94  const int result = ACE_Thread::join(ids[i], 0);
95  ACE_UNUSED_ARG(result);
96  OPENDDS_ASSERT(result == 0);
97  }
98 #endif
99 }
ACE_Thread_Mutex mutex_
Definition: ThreadPool.h:66
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
ConditionVariable< ACE_Thread_Mutex > cv_
Definition: ThreadPool.h:67
static int join(ACE_thread_t thread_id, ACE_thread_t *departed, ACE_THR_FUNC_RETURN *status)
HANDLE ACE_hthread_t
ThreadStatusManager tsm_
Definition: ThreadPool.h:68
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
OPENDDS_VECTOR(ACE_hthread_t) ids_

◆ OPENDDS_SET()

OpenDDS::DCPS::ThreadPool::OPENDDS_SET ( ACE_thread_t  )
private

◆ OPENDDS_VECTOR()

OpenDDS::DCPS::ThreadPool::OPENDDS_VECTOR ( ACE_hthread_t  )
private

Referenced by join_all().

◆ run()

ACE_THR_FUNC_RETURN OpenDDS::DCPS::ThreadPool::run ( void *  arg)
static

A static helper function used to redirect to requested thread start point.

Definition at line 52 of file ThreadPool.cpp.

References active_threads_, arg_, cv_, fun_, mutex_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), OpenDDS::DCPS::ConditionVariable< Mutex >::notify_one(), ACE_Thread::self(), tsm_, and OpenDDS::DCPS::ConditionVariable< Mutex >::wait().

Referenced by ThreadPool().

53 {
54  ThreadPool& pool = *static_cast<ThreadPool*>(arg);
55  {
56  ACE_Guard<ACE_Thread_Mutex> guard(pool.mutex_);
57  pool.id_set_.insert(ACE_Thread::self());
58  ++pool.active_threads_;
59  pool.cv_.notify_all();
60  while (pool.active_threads_ != pool.ids_.size()) {
61  pool.cv_.wait(pool.tsm_);
62  }
63  }
64  (*pool.fun_)(pool.arg_);
65 #ifdef OPENDDS_NO_THREAD_JOIN
66  ACE_Guard<ACE_Thread_Mutex> guard(pool.mutex_);
67  ++pool.finished_threads_;
68  pool.cv_.notify_one();
69 #endif
70  return 0;
71 }
static ACE_thread_t self(void)
ThreadPool(size_t count, FunPtr fun, void *arg=0)
Definition: ThreadPool.cpp:22

Member Data Documentation

◆ active_threads_

size_t OpenDDS::DCPS::ThreadPool::active_threads_
private

Definition at line 69 of file ThreadPool.h.

Referenced by run(), and ThreadPool().

◆ arg_

void* OpenDDS::DCPS::ThreadPool::arg_
private

Definition at line 65 of file ThreadPool.h.

Referenced by run().

◆ cv_

ConditionVariable<ACE_Thread_Mutex> OpenDDS::DCPS::ThreadPool::cv_
mutable private

Definition at line 67 of file ThreadPool.h.

Referenced by join_all(), run(), and ThreadPool().

◆ fun_

FunPtr OpenDDS::DCPS::ThreadPool::fun_
private

Definition at line 64 of file ThreadPool.h.

Referenced by run().

◆ mutex_

ACE_Thread_Mutex OpenDDS::DCPS::ThreadPool::mutex_
mutable private

Definition at line 66 of file ThreadPool.h.

Referenced by contains(), join_all(), run(), and ThreadPool().

◆ tsm_

ThreadStatusManager OpenDDS::DCPS::ThreadPool::tsm_
private

Definition at line 68 of file ThreadPool.h.

Referenced by join_all(), run(), and ThreadPool().


The documentation for this class was generated from the following files: