OpenDDS  Snapshot(2023/04/28-20:55)
QueueTaskBase_T.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_QUEUETASKBASE_T_H
9 #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_QUEUETASKBASE_T_H
10 
11 #include "EntryExit.h"
12 
13 #include <dds/DCPS/PoolAllocator.h>
17 #include <dds/DCPS/TimeTypes.h>
19 
20 #if !defined (ACE_LACKS_PRAGMA_ONCE)
21 # pragma once
22 #endif /* ACE_LACKS_PRAGMA_ONCE */
23 
24 #include <ace/Task.h>
25 #include <ace/Unbounded_Queue.h>
26 #include <ace/INET_Addr.h>
27 #include <ace/Synch_Traits.h>
28 
30 
31 namespace OpenDDS {
32 namespace DCPS {
33 
34 /**
35  * @class QueueTaskBase
36  *
37  * @brief A simple ACE task that manages a queue of request.
38  */
39 template <typename T>
40 class QueueTaskBase : public ACE_Task_Base {
41 public:
44  , shutdown_initiated_(false)
45  , opened_(false)
47  {
48  DBG_ENTRY("QueueTaskBase","QueueTaskBase");
49  }
50 
51  virtual ~QueueTaskBase() {
52  DBG_ENTRY("QueueTaskBase","~QueueTaskBase");
53  }
54 
55  /// Put the request to the request queue.
56  /// Returns 0 if successful, -1 otherwise (it has been "rejected" or this
57  /// task is shutdown).
58  int add(const T& req) {
59  DBG_ENTRY("QueueTaskBase","add");
60  GuardType guard(this->lock_);
61 
62  if (this->shutdown_initiated_)
63  return -1;
64 
65  int result = this->queue_.enqueue_tail(req);
66 
67  if (result == 0) {
69 
70  } else
71  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: QueueTaskBase::add %p\n",
72  ACE_TEXT("enqueue_tail")));
73 
74  return result;
75  }
76 
77  /// Activate the worker threads
78  virtual int open(void* = 0) {
79  DBG_ENTRY("QueueTaskBase","open");
80 
81  GuardType guard(this->lock_);
82 
83  // We can assume that we are in the proper state to handle this open()
84  // call as long as we haven't been open()'ed before.
85  if (this->opened_) {
87  "(%P|%t) QueueTaskBase failed to open. "
88  "Task has previously been open()'ed.\n"),
89  -1);
90  }
91 
92  // Activate this task object with one worker thread.
93  if (this->activate(THR_NEW_LWP | THR_JOINABLE, 1) != 0) {
94  // Assumes that when activate returns non-zero return code that
95  // no threads were activated.
97  "(%P|%t) QueueTaskBase failed to activate "
98  "the worker threads.\n"),
99  -1);
100  }
101 
102  // Now we have past the point where we can say we've been open()'ed before.
103  this->opened_ = true;
104 
105  return 0;
106  }
107 
108  /// The "mainline" executed by the worker thread.
109  virtual int svc() {
110  DBG_ENTRY("QueueTaskBase","svc");
111 
112  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
113  const TimeDuration thread_status_interval = thread_status_manager.thread_status_interval();
114 
115  ThreadStatusManager::Start s(thread_status_manager, "QueueTaskBase");
116 
118 
119  // Start the "GetWork-And-PerformWork" loop for the current worker thread.
120  while (!this->shutdown_initiated_) {
121  T req;
122  {
123  GuardType guard(this->lock_);
124 
125  if (this->queue_.is_empty() && !shutdown_initiated_) {
126  if (thread_status_manager.update_thread_status()) {
127  MonotonicTimePoint expire = MonotonicTimePoint::now() + thread_status_interval;
128 
129  do {
130  work_available_.wait_until(expire, thread_status_manager);
131 
133  if (now > expire) {
134  expire = now + thread_status_interval;
135  }
136  } while (this->queue_.is_empty() && !shutdown_initiated_);
137  } else {
138  this->work_available_.wait(thread_status_manager);
139  }
140  }
141 
142  if (this->shutdown_initiated_)
143  break;
144 
145  int result = queue_.dequeue_head(req);
146 
147  if (result != 0) {
148  //I'm not sure why this thread got more signals than actual signals
149  //when using thread_per_connection and the user application thread
150  //send requests without interval. We just need ignore the dequeue
151  //failure.
152  //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: QueueTaskBase::svc %p\n",
153  // ACE_TEXT("dequeue_head")));
154  continue;
155  }
156  }
157 
158  this->execute(req);
159  }
160 
161  // This will never get executed.
162  return 0;
163  }
164 
165  /// Called when the thread exits.
166  virtual int close(u_long flag = 0) {
167  DBG_ENTRY("QueueTaskBase","close");
168 
169  if (flag == 0)
170  return 0;
171 
172  {
173  GuardType guard(this->lock_);
174 
175  if (this->shutdown_initiated_)
176  return 0;
177 
178  // Set the shutdown flag to true.
179  this->shutdown_initiated_ = true;
181  }
182 
183  if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self())) {
184  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
185  ThreadStatusManager::Sleeper sleeper(thread_status_manager);
186  this->wait();
187  }
188 
189  return 0;
190  }
191 
192  bool is_shutdown_initiated() const {
193  GuardType guard(lock_);
194  return shutdown_initiated_;
195  }
196 
197  /// The subclass should implement this function to handle the
198  /// dequeued request.
199  virtual void execute(T& req) = 0;
200 
201 private:
202 
206 
208 
209  /// Lock to protect the "state" (all of the data members) of this object.
210  mutable LockType lock_;
211 
212  /// The request queue.
213  Queue queue_;
214 
215  /// Condition used to signal the worker threads that they may be able to
216  /// find a request in the queue_ that needs to be executed.
217  /// This condition will be signal()'ed each time a request is
218  /// added to the queue_, and also when this task is shutdown.
219  ConditionVariableType work_available_;
220 
221  /// Flag used to initiate a shutdown request to all worker threads.
223 
224  /// Flag used to avoid multiple open() calls.
225  bool opened_;
226 
227  /// The id of the thread created by this task.
229 };
230 
231 } // namespace DCPS
232 } // namespace OpenDDS
233 
235 
236 #endif /* OPENDDS_DCPS_QUEUE_TASK_BASE_T_H */
void thread_status_interval(const TimeDuration &thread_status_interval)
#define ACE_ERROR(X)
#define ACE_SYNCH_MUTEX
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
virtual int close(u_long flag=0)
Called when the thread exits.
Queue queue_
The request queue.
A simple ACE task that manages a queue of request.
int dequeue_head(T &item)
bool notify_one()
Unblock one of the threads waiting on this condition.
ACE_thread_t thr_self(void)
virtual int svc()
The "mainline" executed by the worker thread.
int enqueue_tail(const T &new_item)
ACE_thread_t thr_id_
The id of the thread created by this task.
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
ACE_thread_t NULL_thread
ConditionVariableType work_available_
virtual void execute(T &req)=0
DWORD ACE_thread_t
bool is_empty(void) const
ConditionVariable< LockType > ConditionVariableType
int thr_equal(ACE_thread_t t1, ACE_thread_t t2)
bool opened_
Flag used to avoid multiple open() calls.
ACE_Guard< LockType > GuardType
#define DBG_ENTRY(CNAME, MNAME)
Definition: EntryExit.h:72
virtual int wait(void)
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
ACE_TEXT("TCP_Factory")
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
ACE_Unbounded_Queue< T > Queue
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
#define ACE_ERROR_RETURN(X, Y)
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
#define TheServiceParticipant
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual int open(void *=0)
Activate the worker threads.