OpenDDS  Snapshot(2023/04/07-19:43)
OpenDDS::DCPS::DispatchService Class Reference

#include <DispatchService.h>

[Inheritance diagram for OpenDDS::DCPS::DispatchService]
[Collaboration diagram for OpenDDS::DCPS::DispatchService]

Public Types

typedef bool DispatchStatus
 Typedef for Dispatch Return Values.
 
typedef long TimerId
 Typedef for Schedule Return Values.
 
typedef void(* FunPtr) (void *)
 
typedef std::pair< FunPtr, void * > FunArgPair
 

Public Member Functions

typedef OPENDDS_DEQUE (FunArgPair) EventQueue
 
 DispatchService (size_t count=1)
 
virtual ~DispatchService ()
 
void shutdown (bool immediate=false, EventQueue *pending=0)
 
DispatchStatus dispatch (FunPtr fun, void *arg=0)
 
template<typename T >
DispatchStatus dispatch (T &ref)
 Helper function to dispatch arbitrary function objects (see fun_ptr_proxy)
 
TimerId schedule (FunPtr fun, void *arg=0, const MonotonicTimePoint &expiration=MonotonicTimePoint::now())
 
template<typename T >
TimerId schedule (T &ref, const MonotonicTimePoint &expiration=MonotonicTimePoint::now())
 Helper function to schedule arbitrary function objects (see fun_ptr_proxy)
 
size_t cancel (TimerId id, void **arg=0)
 
size_t cancel (FunPtr fun, void *arg=0)
 
template<typename T >
size_t cancel (T &ref)
 Helper function to cancel arbitrary function objects (see fun_ptr_proxy)
 
Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Static Public Member Functions

template<typename T >
static void fun_ptr_proxy (void *arg)
 Helper function for adapting arbitrary function objects (with operator())
 

Static Public Attributes

static const bool DS_SUCCESS = true
 DispatchStatus Success Constant.
 
static const bool DS_ERROR = false
 DispatchStatus Error Constant.
 
static const long TI_FAILURE = -1
 TimerId Failure Constant.
 

Private Types

typedef std::pair< FunArgPair, TimerId > TimerPair
 

Private Member Functions

void run_event_loop ()
 
typedef OPENDDS_MULTIMAP (MonotonicTimePoint, TimerPair) TimerQueueMap
 
typedef OPENDDS_MAP (TimerId, TimerQueueMap::iterator) TimerIdMap
 

Static Private Member Functions

static ACE_THR_FUNC_RETURN run (void *arg)
 

Private Attributes

ACE_Thread_Mutex mutex_
 
ConditionVariable< ACE_Thread_Mutex > cv_
 
bool allow_dispatch_
 
bool stop_when_empty_
 
bool running_
 
size_t running_threads_
 
EventQueue event_queue_
 
TimerQueueMap timer_queue_map_
 
TimerIdMap timer_id_map_
 
TimerId max_timer_id_
 
ThreadPool pool_
 

Additional Inherited Members

Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Detailed Description

Definition at line 24 of file DispatchService.h.

Member Typedef Documentation

◆ DispatchStatus

Typedef for Dispatch Return Values.

Definition at line 36 of file DispatchService.h.

◆ FunArgPair

Definition at line 51 of file DispatchService.h.

◆ FunPtr

typedef void(* OpenDDS::DCPS::DispatchService::FunPtr) (void *)

Definition at line 50 of file DispatchService.h.

◆ TimerId

Typedef for Schedule Return Values.

Definition at line 45 of file DispatchService.h.

◆ TimerPair

Definition at line 129 of file DispatchService.h.

Constructor & Destructor Documentation

◆ DispatchService()

OpenDDS::DCPS::DispatchService::DispatchService ( size_t  count = 1)
explicit

Create a DispatchService.

Parameters
count  the requested size of the internal thread pool

Definition at line 22 of file DispatchService.cpp.

DispatchService::DispatchService(size_t count)
  : cv_(mutex_)
  , allow_dispatch_(true)
  , stop_when_empty_(false)
  , running_(true)
  , running_threads_(0)
  , max_timer_id_(LONG_MAX)
  , pool_(count, run, this)
{
}

◆ ~DispatchService()

OpenDDS::DCPS::DispatchService::~DispatchService ( )
virtual

Definition at line 33 of file DispatchService.cpp.

References shutdown().

DispatchService::~DispatchService()
{
  shutdown();
}

Member Function Documentation

◆ cancel() [1/3]

size_t OpenDDS::DCPS::DispatchService::cancel ( DispatchService::TimerId  id,
void **  arg = 0 
)

Cancel a scheduled event by id

Parameters
id   the scheduled timer id to cancel
arg  if specified, *arg will be set to the arg value passed in at time of scheduling
Returns
the number of timers successfully canceled (0 or 1)

Definition at line 114 of file DispatchService.cpp.

References cv_, mutex_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), timer_id_map_, and timer_queue_map_.

Referenced by OpenDDS::DCPS::ServiceEventDispatcher::cancel().

size_t DispatchService::cancel(DispatchService::TimerId id, void** arg)
{
  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
  TimerIdMap::iterator pos = timer_id_map_.find(id);
  if (pos != timer_id_map_.end()) {
    if (pos->second == timer_queue_map_.begin()) {
      cv_.notify_all();
    }
    if (arg) {
      *arg = pos->second->second.first.second;
    }
    timer_queue_map_.erase(pos->second);
    timer_id_map_.erase(pos);
    return 1;
  }
  return 0;
}
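
The listing above relies on two containers: a time-ordered multimap of timers and a map from timer id to an iterator into that multimap, so cancel-by-id needs no linear scan. A minimal standalone sketch of that idea follows. It uses only the standard library; `cancel_by_id`, `add_timer`, the `int` expiration key, and the simplified value type are assumptions made for illustration, not OpenDDS code, and locking is omitted.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <utility>

// Simplified stand-ins: expiration tick -> (arg, timer id).
typedef std::multimap<int, std::pair<void*, long> > TimerQueue;
// Timer id -> position of that timer in the queue.
typedef std::map<long, TimerQueue::iterator> TimerIdMap;

// Hypothetical helper: register a timer in both containers.
void add_timer(TimerQueue& timers, TimerIdMap& ids, int expiration, void* arg, long id)
{
  TimerQueue::iterator pos = timers.insert(std::make_pair(expiration, std::make_pair(arg, id)));
  ids.insert(std::make_pair(id, pos));
}

// Hypothetical helper: cancel one timer by id, optionally returning its arg.
size_t cancel_by_id(TimerQueue& timers, TimerIdMap& ids, long id, void** arg = 0)
{
  TimerIdMap::iterator pos = ids.find(id);
  if (pos == ids.end()) {
    return 0; // unknown id, or the timer already fired or was canceled
  }
  if (arg) {
    *arg = pos->second->second.first; // hand the scheduled arg back to the caller
  }
  timers.erase(pos->second); // erase via the stored iterator
  ids.erase(pos);
  return 1;
}
```

Storing multimap iterators is safe here because inserting or erasing other elements of a `std::multimap` does not invalidate them.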

◆ cancel() [2/3]

size_t OpenDDS::DCPS::DispatchService::cancel ( FunPtr  fun,
void *  arg = 0 
)

Cancel a scheduled event by function pointer

Parameters
fun  the function pointer of the event(s) to cancel
arg  the argument passed at time of scheduling
Returns
the number of timers successfully canceled (potentially several)

Definition at line 132 of file DispatchService.cpp.

References cv_, mutex_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), OPENDDS_ASSERT, timer_id_map_, and timer_queue_map_.

size_t DispatchService::cancel(FunPtr fun, void* arg)
{
  OPENDDS_ASSERT(fun);
  size_t count = 0;
  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
  for (TimerQueueMap::iterator it = timer_queue_map_.begin(); it != timer_queue_map_.end();) {
    if (it->second.first.first == fun && it->second.first.second == arg) {
      if (it == timer_queue_map_.begin()) {
        cv_.notify_all();
      }
      timer_id_map_.erase(it->second.second);
      timer_queue_map_.erase(it++);
      ++count;
    } else {
      ++it;
    }
  }
  return count;
}
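
Canceling by function pointer scans the whole timer queue and erases while iterating, using the `erase(it++)` idiom so the loop iterator is advanced before the erased element is destroyed. The sketch below isolates that idiom with standard containers only. `cancel_matching`, the `int` key standing in for MonotonicTimePoint, and the two no-op functions are illustrative assumptions; the id map, the condition variable, and locking are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <utility>

typedef void (*FunPtr)(void*);
typedef std::pair<FunPtr, void*> FunArgPair;
typedef std::multimap<int, FunArgPair> TimerQueue; // int stands in for MonotonicTimePoint

// Hypothetical helper: erase every timer whose (fun, arg) pair matches.
size_t cancel_matching(TimerQueue& timers, FunPtr fun, void* arg)
{
  size_t count = 0;
  for (TimerQueue::iterator it = timers.begin(); it != timers.end();) {
    if (it->second.first == fun && it->second.second == arg) {
      timers.erase(it++); // post-increment: 'it' moves on before the node is erased
      ++count;
    } else {
      ++it;
    }
  }
  return count;
}

// Hypothetical callbacks used only to give the timers distinct identities.
void noop_a(void*) {}
void noop_b(void*) {}
```

Both the function pointer and the argument must match, so two timers that share a callback but carry different arguments are canceled independently.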

◆ cancel() [3/3]

template<typename T >
size_t OpenDDS::DCPS::DispatchService::cancel ( T &  ref)
inline

Helper function to cancel arbitrary function objects (see fun_ptr_proxy)

Definition at line 119 of file DispatchService.h.

size_t cancel(T& ref)
{
  return cancel(fun_ptr_proxy<T>, &ref);
}

◆ dispatch() [1/2]

DispatchService::DispatchStatus OpenDDS::DCPS::DispatchService::dispatch ( FunPtr  fun,
void *  arg = 0 
)

Dispatch an event

Parameters
fun  the function pointer to dispatch
arg  the argument to pass during dispatch
Returns
true if the event was successfully enqueued, otherwise false

Definition at line 74 of file DispatchService.cpp.

References allow_dispatch_, cv_, DS_ERROR, DS_SUCCESS, event_queue_, mutex_, and OpenDDS::DCPS::ConditionVariable< Mutex >::notify_one().

Referenced by OpenDDS::DCPS::ServiceEventDispatcher::dispatch().

DispatchService::DispatchStatus DispatchService::dispatch(FunPtr fun, void* arg)
{
  if (!fun) {
    return DS_ERROR;
  }

  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
  if (allow_dispatch_) {
    event_queue_.push_back(std::make_pair(fun, arg));
    cv_.notify_one();
    return DS_SUCCESS;
  }
  return DS_ERROR;
}
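
The enqueue path above rejects null function pointers and rejects everything once dispatching has been disabled. The following standalone model shows that contract. `MiniQueue`, `close()`, `drain()`, and `add_one` are hypothetical names invented for this sketch, it uses `std::mutex` instead of the ACE types, and it runs events on the calling thread rather than on a thread pool.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

typedef void (*FunPtr)(void*);
typedef std::pair<FunPtr, void*> FunArgPair;

// Hypothetical, simplified model of the enqueue path; not the OpenDDS class.
class MiniQueue {
public:
  MiniQueue() : allow_dispatch_(true) {}

  // Returns true if the event was queued, false if fun is null or the queue is closed.
  bool dispatch(FunPtr fun, void* arg = 0)
  {
    if (!fun) {
      return false;
    }
    std::lock_guard<std::mutex> guard(mutex_);
    if (!allow_dispatch_) {
      return false;
    }
    event_queue_.push_back(std::make_pair(fun, arg));
    return true;
  }

  // Stop accepting new events (models the effect of a shutdown request).
  void close()
  {
    std::lock_guard<std::mutex> guard(mutex_);
    allow_dispatch_ = false;
  }

  // Run queued events in FIFO order on the calling thread; returns how many ran.
  size_t drain()
  {
    size_t ran = 0;
    for (;;) {
      FunArgPair next;
      {
        std::lock_guard<std::mutex> guard(mutex_);
        if (event_queue_.empty()) {
          break;
        }
        next = event_queue_.front();
        event_queue_.pop_front();
      }
      next.first(next.second); // invoke without holding the lock
      ++ran;
    }
    return ran;
  }

private:
  std::mutex mutex_;
  bool allow_dispatch_;
  std::deque<FunArgPair> event_queue_;
};

// Hypothetical event: increments the int that arg points to.
void add_one(void* arg) { ++*static_cast<int*>(arg); }
```

Callers should check the return value, since an event rejected after shutdown is never run.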

◆ dispatch() [2/2]

template<typename T >
DispatchStatus OpenDDS::DCPS::DispatchService::dispatch ( T &  ref)
inline

Helper function to dispatch arbitrary function objects (see fun_ptr_proxy)

Definition at line 80 of file DispatchService.h.

References OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now().

DispatchStatus dispatch(T& ref)
{
  return dispatch(fun_ptr_proxy<T>, &ref);
}

◆ fun_ptr_proxy()

template<typename T >
static void OpenDDS::DCPS::DispatchService::fun_ptr_proxy ( void *  arg)
inlinestatic

Helper function for adapting arbitrary function objects (with operator())

Definition at line 29 of file DispatchService.h.

References OpenDDS::DCPS::ref().

static void fun_ptr_proxy(void* arg)
{
  T& ref = *static_cast<T*>(arg);
  ref();
}
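
fun_ptr_proxy lets a function object travel through the plain `void (*)(void*)` interface: the template instantiation supplies the function pointer, and the object's address is the argument. A minimal standalone sketch follows. The local `FunPtr` and `fun_ptr_proxy` mirror the declarations above, while `Counter` and `invoke_twice` are hypothetical and exist only for illustration.

```cpp
#include <cassert>

// Local copies of the typedef and helper so the sketch compiles on its own.
typedef void (*FunPtr)(void*);

template <typename T>
void fun_ptr_proxy(void* arg)
{
  T& ref = *static_cast<T*>(arg);
  ref();
}

// Hypothetical function object: counts how often it is invoked.
struct Counter {
  int calls;
  Counter() : calls(0) {}
  void operator()() { ++calls; }
};

// Erase the functor's type, then invoke it twice through the erased pair.
int invoke_twice(Counter& c)
{
  FunPtr fun = &fun_ptr_proxy<Counter>;
  void* arg = &c; // only the address is stored; the object is not copied
  fun(arg);
  fun(arg);
  return c.calls;
}
```

Because only a pointer to the object is kept, the object must stay alive until the event has run or has been canceled.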

◆ OPENDDS_DEQUE()

typedef OpenDDS::DCPS::DispatchService::OPENDDS_DEQUE ( FunArgPair  )

◆ OPENDDS_MAP()

typedef OpenDDS::DCPS::DispatchService::OPENDDS_MAP ( TimerId  ,
TimerQueueMap::iterator   
)
private

◆ OPENDDS_MULTIMAP()

typedef OpenDDS::DCPS::DispatchService::OPENDDS_MULTIMAP ( MonotonicTimePoint  ,
TimerPair   
)
private

◆ run()

ACE_THR_FUNC_RETURN OpenDDS::DCPS::DispatchService::run ( void *  arg)
staticprivate

Definition at line 152 of file DispatchService.cpp.

References run_event_loop().

ACE_THR_FUNC_RETURN DispatchService::run(void* arg)
{
  DispatchService& dispatcher = *static_cast<DispatchService*>(arg);
  dispatcher.run_event_loop();
  return 0;
}

◆ run_event_loop()

void OpenDDS::DCPS::DispatchService::run_event_loop ( void  )
private

Definition at line 159 of file DispatchService.cpp.

References allow_dispatch_, cv_, event_queue_, mutex_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OPENDDS_END_VERSIONED_NAMESPACE_DECL, running_, running_threads_, stop_when_empty_, TheServiceParticipant, timer_id_map_, timer_queue_map_, OpenDDS::DCPS::ConditionVariable< Mutex >::wait(), and OpenDDS::DCPS::ConditionVariable< Mutex >::wait_until().

Referenced by run().

void DispatchService::run_event_loop()
{
  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
  ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(mutex_);
  ++running_threads_;
  while (running_) {

    // Logical Order:
    // - Move expired timer events into normal event queue
    // - Wait appropriate length if there's nothing to do
    // - Check for early exit before execution
    // - Run first task from event queue

    if (allow_dispatch_ && !timer_queue_map_.empty()) {
      const MonotonicTimePoint now = MonotonicTimePoint::now();

      TimerQueueMap::iterator last = timer_queue_map_.upper_bound(now), pos = last;
      while (pos != timer_queue_map_.begin()) {
        --pos;
        event_queue_.push_back(pos->second.first);
        timer_id_map_.erase(pos->second.second);
      }
      if (last != timer_queue_map_.begin()) {
        timer_queue_map_.erase(timer_queue_map_.begin(), last);
      }
    }

    if (event_queue_.empty()) {
      if (stop_when_empty_) {
        running_ = false;
        cv_.notify_all();
      } else if (allow_dispatch_ && timer_queue_map_.size()) {
        MonotonicTimePoint deadline(timer_queue_map_.begin()->first);
        cv_.wait_until(deadline, TheServiceParticipant->get_thread_status_manager());
      } else {
        cv_.wait(TheServiceParticipant->get_thread_status_manager());
      }
    }

    if (!running_ || event_queue_.empty()) continue;

    FunArgPair pair = event_queue_.front();
    event_queue_.pop_front();
    ACE_Guard<ACE_Reverse_Lock<ACE_Thread_Mutex> > rev_guard(rev_lock);
    pair.first(pair.second);
  }
  --running_threads_;
  cv_.notify_all();
}
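
The first step of the loop, moving expired timers into the event queue, hinges on `upper_bound(now)`: everything before that iterator has an expiration less than or equal to `now`. The sketch below reproduces just that step with standard containers. `promote_expired`, the `int` tick used as the time type, and the `char` event labels are assumptions made for illustration. As in the listing, the walk goes backward from `upper_bound`, so expired timers are appended latest-first.

```cpp
#include <cassert>
#include <deque>
#include <map>
#include <utility>

typedef std::multimap<int, char> TimerQueue; // expiration tick -> event label
typedef std::deque<char> EventQueue;

// Hypothetical helper: move every timer with expiration <= now into the event queue.
void promote_expired(TimerQueue& timers, EventQueue& events, int now)
{
  // 'last' is the first timer that has NOT expired yet.
  TimerQueue::iterator last = timers.upper_bound(now), pos = last;
  while (pos != timers.begin()) {
    --pos;
    events.push_back(pos->second); // walks backward, so the latest expiry is queued first
  }
  timers.erase(timers.begin(), last); // no-op when nothing has expired
}
```

After the call, the first remaining timer (if any) gives the deadline a worker would wait for.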

◆ schedule() [1/2]

DispatchService::TimerId OpenDDS::DCPS::DispatchService::schedule ( FunPtr  fun,
void *  arg = 0,
const MonotonicTimePoint &  expiration = MonotonicTimePoint::now() 
)

Schedule the future dispatch of an event

Parameters
fun         the function pointer to schedule
arg         the argument to pass during dispatch
expiration  the requested dispatch time (no earlier than)
Returns
-1 on a failure, otherwise the timer id for the future dispatch

Definition at line 89 of file DispatchService.cpp.

References allow_dispatch_, cv_, max_timer_id_, mutex_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_one(), TI_FAILURE, timer_id_map_, and timer_queue_map_.

Referenced by OpenDDS::DCPS::ServiceEventDispatcher::schedule().

DispatchService::TimerId DispatchService::schedule(FunPtr fun, void* arg, const MonotonicTimePoint& expiration)
{
  if (!fun) {
    return TI_FAILURE;
  }

  TimerId id = 0;
  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
  if (allow_dispatch_) {
    TimerQueueMap::iterator pos = timer_queue_map_.insert(std::make_pair(expiration, std::make_pair(std::make_pair(fun, arg), 0)));
    // Make it a loop in case we ever recycle timer ids
    const TimerId starting_id = max_timer_id_;
    do {
      id = max_timer_id_ = max_timer_id_ == LONG_MAX ? 1 : max_timer_id_ + 1;
      if (id == starting_id) {
        return TI_FAILURE; // all ids in use ?!
      }
      pos->second.second = id;
    } while (timer_id_map_.insert(std::make_pair(id, pos)).second == false);
    cv_.notify_one();
    return id;
  }
  return TI_FAILURE;
}
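
The id-allocation loop in the listing wraps from LONG_MAX back to 1 and keeps stepping until it finds an id that is not already in the id map. The standalone sketch below isolates that loop. `next_timer_id` is a hypothetical helper, and `in_use` stands in for timer_id_map_ with a dummy mapped value.

```cpp
#include <cassert>
#include <climits>
#include <map>
#include <utility>

typedef long TimerId;

// Hypothetical helper mirroring the allocation loop: returns a free id, or -1 if none is left.
TimerId next_timer_id(TimerId& max_timer_id, std::map<TimerId, int>& in_use)
{
  const TimerId starting_id = max_timer_id;
  TimerId id = 0;
  do {
    // Advance, wrapping from LONG_MAX to 1 so that 0 and negative ids are never issued.
    id = max_timer_id = (max_timer_id == LONG_MAX) ? 1 : max_timer_id + 1;
    if (id == starting_id) {
      return -1; // went all the way around: every id is in use
    }
  } while (!in_use.insert(std::make_pair(id, 0)).second); // retry while the id is taken
  return id;
}
```

Starting the counter at LONG_MAX, as the constructor does, makes the first issued id 1.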

◆ schedule() [2/2]

template<typename T >
TimerId OpenDDS::DCPS::DispatchService::schedule ( T &  ref,
const MonotonicTimePoint &  expiration = MonotonicTimePoint::now() 
)
inline

Helper function to schedule arbitrary function objects (see fun_ptr_proxy)

Definition at line 96 of file DispatchService.h.

TimerId schedule(T& ref, const MonotonicTimePoint& expiration = MonotonicTimePoint::now())
{
  return schedule(fun_ptr_proxy<T>, &ref, expiration);
}

◆ shutdown()

void OpenDDS::DCPS::DispatchService::shutdown ( bool  immediate = false,
EventQueue *  pending = 0 
)

Request shutdown of this DispatchService, which prevents successful future calls to either dispatch or schedule and cancels all scheduled events.

Parameters
immediate  prevent any further dispatches from the event queue; otherwise allow the current queue to empty
pending    an EventQueue object in which to store canceled events in case extra processing is required

Definition at line 38 of file DispatchService.cpp.

References ACE_ERROR, allow_dispatch_, OpenDDS::DCPS::ThreadPool::contains(), cv_, OpenDDS::DCPS::LogLevel::Error, event_queue_, LM_ERROR, OpenDDS::DCPS::log_level, mutex_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), pool_, running_, running_threads_, ACE_Thread::self(), stop_when_empty_, TheServiceParticipant, timer_id_map_, timer_queue_map_, and OpenDDS::DCPS::ConditionVariable< Mutex >::wait().

Referenced by OpenDDS::DCPS::ServiceEventDispatcher::shutdown(), and ~DispatchService().

void DispatchService::shutdown(bool immediate, EventQueue* pending)
{
  ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
  allow_dispatch_ = false;
  stop_when_empty_ = true;
  running_ = running_ && !immediate; // && with existing state in case shutdown has already been called
  cv_.notify_all();

  if (pool_.contains(ACE_Thread::self())) {
    if (log_level >= LogLevel::Error) {
      ACE_ERROR((LM_ERROR, "(%P|%t) ERROR :: DispatchService::shutdown: Contained Thread Attempting To Call Shutdown.\n"));
    }
    if (pending) {
      pending->clear();
    }
    return;
  }

  while (running_threads_) {
    cv_.wait(TheServiceParticipant->get_thread_status_manager());
  }

  if (pending) {
    pending->clear();
    pending->swap(event_queue_);
    const TimerQueueMap& cmap = timer_queue_map_;
    for (TimerQueueMap::const_iterator it = cmap.begin(), limit = cmap.end(); it != limit; ++it) {
      pending->push_back(it->second.first);
    }
  } else {
    event_queue_.clear();
  }
  timer_queue_map_.clear();
  timer_id_map_.clear();
}
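
One plausible use of the pending queue is releasing resources owned by events that never ran. The sketch below assumes, purely for illustration, that every canceled event carries an argument allocated with `new int`; `release_pending` and `noop_job` are hypothetical names, and the queue type is a plain `std::deque` standing in for EventQueue.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <utility>

typedef void (*FunPtr)(void*);
typedef std::pair<FunPtr, void*> FunArgPair;
typedef std::deque<FunArgPair> EventQueue; // stand-in for the OPENDDS_DEQUE typedef

// Hypothetical event body; it is never invoked in this sketch.
void noop_job(void*) {}

// Hypothetical cleanup: assumes every arg in 'pending' was allocated with 'new int'.
size_t release_pending(EventQueue& pending)
{
  size_t released = 0;
  for (EventQueue::iterator it = pending.begin(); it != pending.end(); ++it) {
    delete static_cast<int*>(it->second); // the event never ran, so free its argument here
    ++released;
  }
  pending.clear();
  return released;
}
```

In real code the cleanup depends on what each argument actually points to, which only the code that dispatched or scheduled the event knows.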

Member Data Documentation

◆ allow_dispatch_

bool OpenDDS::DCPS::DispatchService::allow_dispatch_
private

Definition at line 135 of file DispatchService.h.

Referenced by dispatch(), run_event_loop(), schedule(), and shutdown().

◆ cv_

ConditionVariable<ACE_Thread_Mutex> OpenDDS::DCPS::DispatchService::cv_
mutableprivate

Definition at line 134 of file DispatchService.h.

Referenced by cancel(), dispatch(), run_event_loop(), schedule(), and shutdown().

◆ DS_ERROR

const bool OpenDDS::DCPS::DispatchService::DS_ERROR = false
static

DispatchStatus Error Constant.

Definition at line 42 of file DispatchService.h.

Referenced by dispatch().

◆ DS_SUCCESS

const bool OpenDDS::DCPS::DispatchService::DS_SUCCESS = true
static

DispatchStatus Success Constant.

Definition at line 39 of file DispatchService.h.

Referenced by dispatch().

◆ event_queue_

EventQueue OpenDDS::DCPS::DispatchService::event_queue_
private

Definition at line 139 of file DispatchService.h.

Referenced by dispatch(), run_event_loop(), and shutdown().

◆ max_timer_id_

TimerId OpenDDS::DCPS::DispatchService::max_timer_id_
private

Definition at line 142 of file DispatchService.h.

Referenced by schedule().

◆ mutex_

ACE_Thread_Mutex OpenDDS::DCPS::DispatchService::mutex_
mutableprivate

Definition at line 133 of file DispatchService.h.

Referenced by cancel(), dispatch(), run_event_loop(), schedule(), and shutdown().

◆ pool_

ThreadPool OpenDDS::DCPS::DispatchService::pool_
private

Definition at line 143 of file DispatchService.h.

Referenced by shutdown().

◆ running_

bool OpenDDS::DCPS::DispatchService::running_
private

Definition at line 137 of file DispatchService.h.

Referenced by run_event_loop(), and shutdown().

◆ running_threads_

size_t OpenDDS::DCPS::DispatchService::running_threads_
private

Definition at line 138 of file DispatchService.h.

Referenced by run_event_loop(), and shutdown().

◆ stop_when_empty_

bool OpenDDS::DCPS::DispatchService::stop_when_empty_
private

Definition at line 136 of file DispatchService.h.

Referenced by run_event_loop(), and shutdown().

◆ TI_FAILURE

const long OpenDDS::DCPS::DispatchService::TI_FAILURE = -1
static

TimerId Failure Constant.

Definition at line 48 of file DispatchService.h.

Referenced by schedule().

◆ timer_id_map_

TimerIdMap OpenDDS::DCPS::DispatchService::timer_id_map_
private

Definition at line 141 of file DispatchService.h.

Referenced by cancel(), run_event_loop(), schedule(), and shutdown().

◆ timer_queue_map_

TimerQueueMap OpenDDS::DCPS::DispatchService::timer_queue_map_
private

Definition at line 140 of file DispatchService.h.

Referenced by cancel(), run_event_loop(), schedule(), and shutdown().


The documentation for this class was generated from the following files: