OpenDDS  Snapshot(2023/04/28-20:55)
DispatchService.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "debug.h"
11 #include "DispatchService.h"
12 #include "Service_Participant.h"
13 #include "TimeDuration.h"
14 
15 #include <ace/Reverse_Lock_T.h>
16 
18 
19 namespace OpenDDS {
20 namespace DCPS {
21 
23  : cv_(mutex_)
24  , allow_dispatch_(true)
25  , stop_when_empty_(false)
26  , running_(true)
27  , running_threads_(0)
28  , max_timer_id_(LONG_MAX)
29  , pool_(count, run, this)
30 {
31 }
32 
34 {
35  shutdown();
36 }
37 
38 void DispatchService::shutdown(bool immediate, EventQueue* const pending)
39 {
41  allow_dispatch_ = false;
42  stop_when_empty_ = true;
43  running_ = running_ && !immediate; // && with existing state in case shutdown has already been called
44  cv_.notify_all();
45 
47  if (log_level >= LogLevel::Error) {
48  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR :: DispatchService::shutdown: Contained Thread Attempting To Call Shutdown.\n"));
49  }
50  if (pending) {
51  pending->clear();
52  }
53  return;
54  }
55 
56  while (running_threads_) {
57  cv_.wait(TheServiceParticipant->get_thread_status_manager());
58  }
59 
60  if (pending) {
61  pending->clear();
62  pending->swap(event_queue_);
63  const TimerQueueMap& cmap = timer_queue_map_;
64  for (TimerQueueMap::const_iterator it = cmap.begin(), limit = cmap.end(); it != limit; ++it) {
65  pending->push_back(it->second.first);
66  }
67  } else {
68  event_queue_.clear();
69  }
70  timer_queue_map_.clear();
71  timer_id_map_.clear();
72 }
73 
75 {
76  if (!fun) {
77  return DS_ERROR;
78  }
79 
81  if (allow_dispatch_) {
82  event_queue_.push_back(std::make_pair(fun, arg));
83  cv_.notify_one();
84  return DS_SUCCESS;
85  }
86  return DS_ERROR;
87 }
88 
90 {
91  if (!fun) {
92  return TI_FAILURE;
93  }
94 
95  TimerId id = 0;
97  if (allow_dispatch_) {
98  TimerQueueMap::iterator pos = timer_queue_map_.insert(std::make_pair(expiration, std::make_pair(std::make_pair(fun, arg), 0)));
99  // Make it a loop in case we ever recycle timer ids
100  const TimerId starting_id = max_timer_id_;
101  do {
102  id = max_timer_id_ = max_timer_id_ == LONG_MAX ? 1 : max_timer_id_ + 1;
103  if (id == starting_id) {
104  return TI_FAILURE; // all ids in use ?!
105  }
106  pos->second.second = id;
107  } while (timer_id_map_.insert(std::make_pair(id, pos)).second == false);
108  cv_.notify_one();
109  return id;
110  }
111  return TI_FAILURE;
112 }
113 
115 {
117  TimerIdMap::iterator pos = timer_id_map_.find(id);
118  if (pos != timer_id_map_.end()) {
119  if (pos->second == timer_queue_map_.begin()) {
120  cv_.notify_all();
121  }
122  if (arg) {
123  *arg = pos->second->second.first.second;
124  }
125  timer_queue_map_.erase(pos->second);
126  timer_id_map_.erase(pos);
127  return 1;
128  }
129  return 0;
130 }
131 
132 size_t DispatchService::cancel(FunPtr fun, void* arg)
133 {
134  OPENDDS_ASSERT(fun);
135  size_t count = 0;
137  for (TimerQueueMap::iterator it = timer_queue_map_.begin(); it != timer_queue_map_.end();) {
138  if (it->second.first.first == fun && it->second.first.second == arg) {
139  if (it == timer_queue_map_.begin()) {
140  cv_.notify_all();
141  }
142  timer_id_map_.erase(it->second.second);
143  timer_queue_map_.erase(it++);
144  ++count;
145  } else {
146  ++it;
147  }
148  }
149  return count;
150 }
151 
152 ACE_THR_FUNC_RETURN DispatchService::run(void* arg)
153 {
154  DispatchService& dispatcher = *static_cast<DispatchService*>(arg);
155  dispatcher.run_event_loop();
156  return 0;
157 }
158 
160 {
164  while (running_) {
165 
166  // Logical Order:
167  // - Move expired timer events into normal event queue
168  // - Wait appropriate length if there's nothing to do
169  // - Check for early exit before execution
170  // - Run first task from event queue
171 
172  if (allow_dispatch_ && !timer_queue_map_.empty()) {
174 
175  TimerQueueMap::iterator last = timer_queue_map_.upper_bound(now), pos = last;
176  while (pos != timer_queue_map_.begin()) {
177  --pos;
178  event_queue_.push_back(pos->second.first);
179  timer_id_map_.erase(pos->second.second);
180  }
181  if (last != timer_queue_map_.begin()) {
182  timer_queue_map_.erase(timer_queue_map_.begin(), last);
183  }
184  }
185 
186  if (event_queue_.empty()) {
187  if (stop_when_empty_) {
188  running_ = false;
189  cv_.notify_all();
190  } else if (allow_dispatch_ && timer_queue_map_.size()) {
191  MonotonicTimePoint deadline(timer_queue_map_.begin()->first);
192  cv_.wait_until(deadline, TheServiceParticipant->get_thread_status_manager());
193  } else {
194  cv_.wait(TheServiceParticipant->get_thread_status_manager());
195  }
196  }
197 
198  if (!running_ || event_queue_.empty()) continue;
199 
200  FunArgPair pair = event_queue_.front();
201  event_queue_.pop_front();
202  ACE_Guard<ACE_Reverse_Lock<ACE_Thread_Mutex> > rev_guard(rev_lock);
203  pair.first(pair.second);
204  }
206  cv_.notify_all();
207 }
208 
209 } // DCPS
210 } // OpenDDS
211 
ConditionVariable< ACE_Thread_Mutex > cv_
size_t cancel(TimerId id, void **arg=0)
#define ACE_ERROR(X)
long TimerId
Typedef for Schedule Return Values.
bool notify_one()
Unblock one of the threads waiting on this condition.
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
std::pair< FunPtr, void * > FunArgPair
static const long TI_FAILURE
TimerId Failure Constant.
static const bool DS_ERROR
DispatchStatus Error Constant.
static ACE_THR_FUNC_RETURN run(void *arg)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
bool contains(ACE_thread_t id) const
Definition: ThreadPool.cpp:73
DispatchStatus dispatch(FunPtr fun, void *arg=0)
static ACE_thread_t self(void)
bool DispatchStatus
Typedef for Dispatch Return Values.
TimerId schedule(FunPtr fun, void *arg=0, const MonotonicTimePoint &expiration=MonotonicTimePoint::now())
bool notify_all()
Unblock all of the threads waiting on this condition.
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
OpenDDS_Dcps_Export LogLevel log_level
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void shutdown(bool immediate=false, EventQueue *pending=0)
static const bool DS_SUCCESS
DispatchStatus Success Constant.
#define TheServiceParticipant
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28