OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Private Types | Private Attributes | List of all members
OpenDDS::DCPS::QueueTaskBase< T > Class Template Referenceabstract

A simple ACE task that manages a queue of request. More...

#include <QueueTaskBase_T.h>

Inheritance diagram for OpenDDS::DCPS::QueueTaskBase< T >:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::QueueTaskBase< T >:
Collaboration graph
[legend]

Public Member Functions

 QueueTaskBase ()
 
virtual ~QueueTaskBase ()
 
int add (const T &req)
 
virtual int open (void *=0)
 Activate the worker threads. More...
 
virtual int svc ()
 The "mainline" executed by the worker thread. More...
 
virtual int close (u_long flag=0)
 Called when the thread exits. More...
 
bool is_shutdown_initiated () const
 
virtual void execute (T &req)=0
 
- Public Member Functions inherited from ACE_Task_Base
 ACE_Task_Base (ACE_Thread_Manager *=0)
 
virtual ~ACE_Task_Base (void)
 
virtual int module_closed (void)
 
virtual int put (ACE_Message_Block *, ACE_Time_Value *=0)
 
virtual int activate (long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
 
virtual int wait (void)
 
virtual int suspend (void)
 
virtual int resume (void)
 
int grp_id (void) const
 
void grp_id (int)
 
ACE_Thread_Managerthr_mgr (void) const
 
void thr_mgr (ACE_Thread_Manager *)
 
int is_reader (void) const
 
int is_writer (void) const
 
size_t thr_count (void) const
 
ACE_thread_t last_thread (void) const
 
- Public Member Functions inherited from ACE_Service_Object
 ACE_Service_Object (ACE_Reactor *=0)
 
virtual ~ACE_Service_Object (void)
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactorreactor (void) const
 
virtual ACE_Reactor_Timer_Interfacereactor_timer_interface (void) const
 
virtual Reference_Count add_reference (void)
 
virtual Reference_Count remove_reference (void)
 
Reference_Counting_Policyreference_counting_policy (void)
 
- Public Member Functions inherited from ACE_Shared_Object
 ACE_Shared_Object (void)
 
virtual ~ACE_Shared_Object (void)
 
virtual int init (int argc, ACE_TCHAR *argv[])
 
virtual int fini (void)
 
virtual int info (ACE_TCHAR **info_string, size_t length=0) const
 

Private Types

typedef ACE_SYNCH_MUTEX LockType
 
typedef ACE_Guard< LockTypeGuardType
 
typedef ConditionVariable< LockTypeConditionVariableType
 
typedef ACE_Unbounded_Queue< T > Queue
 

Private Attributes

LockType lock_
 Lock to protect the "state" (all of the data members) of this object. More...
 
Queue queue_
 The request queue. More...
 
ConditionVariableType work_available_
 
bool shutdown_initiated_
 Flag used to initiate a shutdown request to all worker threads. More...
 
bool opened_
 Flag used to avoid multiple open() calls. More...
 
ACE_thread_t thr_id_
 The id of the thread created by this task. More...
 

Additional Inherited Members

- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Task_Base
static ACE_THR_FUNC_RETURN svc_run (void *)
 
static void cleanup (void *object, void *params)
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Service_Object
 ACE_ALLOC_HOOK_DECLARE
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_CountAtomic_Reference_Count
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Attributes inherited from ACE_Task_Base
size_t thr_count_
 
ACE_Thread_Managerthr_mgr_
 
u_long flags_
 
int grp_id_
 
ACE_thread_t last_thread_id_
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

template<typename T>
class OpenDDS::DCPS::QueueTaskBase< T >

A simple ACE task that manages a queue of request.

Definition at line 40 of file QueueTaskBase_T.h.

Member Typedef Documentation

◆ ConditionVariableType

Definition at line 205 of file QueueTaskBase_T.h.

◆ GuardType

template<typename T>
typedef ACE_Guard<LockType> OpenDDS::DCPS::QueueTaskBase< T >::GuardType
private

Definition at line 204 of file QueueTaskBase_T.h.

◆ LockType

template<typename T>
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::QueueTaskBase< T >::LockType
private

Definition at line 203 of file QueueTaskBase_T.h.

◆ Queue

template<typename T>
typedef ACE_Unbounded_Queue<T> OpenDDS::DCPS::QueueTaskBase< T >::Queue
private

Definition at line 207 of file QueueTaskBase_T.h.

Constructor & Destructor Documentation

◆ QueueTaskBase()

template<typename T>
OpenDDS::DCPS::QueueTaskBase< T >::QueueTaskBase ( )
inline

Definition at line 42 of file QueueTaskBase_T.h.

44  , shutdown_initiated_(false)
45  , opened_(false)
47  {
48  DBG_ENTRY("QueueTaskBase","QueueTaskBase");
49  }
ACE_thread_t thr_id_
The id of the thread created by this task.
ACE_thread_t NULL_thread
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
#define DBG_ENTRY(CNAME, MNAME)
Definition: EntryExit.h:72
ConditionVariableType work_available_
bool opened_
Flag used to avoid multiple open() calls.
LockType lock_
Lock to protect the "state" (all of the data members) of this object.

◆ ~QueueTaskBase()

template<typename T>
virtual OpenDDS::DCPS::QueueTaskBase< T >::~QueueTaskBase ( )
inlinevirtual

Definition at line 51 of file QueueTaskBase_T.h.

51  {
52  DBG_ENTRY("QueueTaskBase","~QueueTaskBase");
53  }
#define DBG_ENTRY(CNAME, MNAME)
Definition: EntryExit.h:72

Member Function Documentation

◆ add()

template<typename T>
int OpenDDS::DCPS::QueueTaskBase< T >::add ( const T &  req)
inline

Put the request to the request queue. Returns 0 if successful, -1 otherwise (it has been "rejected" or this task is shutdown).

Definition at line 58 of file QueueTaskBase_T.h.

58  {
59  DBG_ENTRY("QueueTaskBase","add");
60  GuardType guard(this->lock_);
61 
62  if (this->shutdown_initiated_)
63  return -1;
64 
65  int result = this->queue_.enqueue_tail(req);
66 
67  if (result == 0) {
69 
70  } else
71  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: QueueTaskBase::add %p\n",
72  ACE_TEXT("enqueue_tail")));
73 
74  return result;
75  }
ACE_Guard< LockType > GuardType
#define ACE_ERROR(X)
int enqueue_tail(const T &new_item)
Queue queue_
The request queue.
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
#define DBG_ENTRY(CNAME, MNAME)
Definition: EntryExit.h:72
ConditionVariableType work_available_
ACE_TEXT("TCP_Factory")
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
bool notify_one()
Unblock one of the threads waiting on this condition.

◆ close()

template<typename T>
virtual int OpenDDS::DCPS::QueueTaskBase< T >::close ( u_long  flag = 0)
inlinevirtual

Called when the thread exits.

Reimplemented from ACE_Task_Base.

Definition at line 166 of file QueueTaskBase_T.h.

166  {
167  DBG_ENTRY("QueueTaskBase","close");
168 
169  if (flag == 0)
170  return 0;
171 
172  {
173  GuardType guard(this->lock_);
174 
175  if (this->shutdown_initiated_)
176  return 0;
177 
178  // Set the shutdown flag to true.
179  this->shutdown_initiated_ = true;
181  }
182 
183  if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self())) {
184  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
185  ThreadStatusManager::Sleeper sleeper(thread_status_manager);
186  this->wait();
187  }
188 
189  return 0;
190  }
ACE_Guard< LockType > GuardType
ACE_thread_t thr_id_
The id of the thread created by this task.
ACE_thread_t thr_self(void)
int thr_equal(ACE_thread_t t1, ACE_thread_t t2)
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
#define DBG_ENTRY(CNAME, MNAME)
Definition: EntryExit.h:72
ConditionVariableType work_available_
virtual int wait(void)
bool opened_
Flag used to avoid multiple open() calls.
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
bool notify_one()
Unblock one of the threads waiting on this condition.
#define TheServiceParticipant

◆ execute()

template<typename T>
virtual void OpenDDS::DCPS::QueueTaskBase< T >::execute ( T &  req)
pure virtual

The subclass should implement this function to handle the dequeued request.

Implemented in OpenDDS::DCPS::DataLinkCleanupTask.

Referenced by OpenDDS::DCPS::QueueTaskBase< DataLink_rch >::is_shutdown_initiated(), and OpenDDS::DCPS::QueueTaskBase< DataLink_rch >::svc().

◆ is_shutdown_initiated()

template<typename T>
bool OpenDDS::DCPS::QueueTaskBase< T >::is_shutdown_initiated ( ) const
inline

Definition at line 192 of file QueueTaskBase_T.h.

192  {
193  GuardType guard(lock_);
194  return shutdown_initiated_;
195  }
ACE_Guard< LockType > GuardType
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
LockType lock_
Lock to protect the "state" (all of the data members) of this object.

◆ open()

template<typename T>
virtual int OpenDDS::DCPS::QueueTaskBase< T >::open ( void *  = 0)
inlinevirtual

Activate the worker threads.

Reimplemented from ACE_Task_Base.

Definition at line 78 of file QueueTaskBase_T.h.

78  {
79  DBG_ENTRY("QueueTaskBase","open");
80 
81  GuardType guard(this->lock_);
82 
83  // We can assume that we are in the proper state to handle this open()
84  // call as long as we haven't been open()'ed before.
85  if (this->opened_) {
86  ACE_ERROR_RETURN((LM_ERROR,
87  "(%P|%t) QueueTaskBase failed to open. "
88  "Task has previously been open()'ed.\n"),
89  -1);
90  }
91 
92  // Activate this task object with one worker thread.
93  if (this->activate(THR_NEW_LWP | THR_JOINABLE, 1) != 0) {
94  // Assumes that when activate returns non-zero return code that
95  // no threads were activated.
96  ACE_ERROR_RETURN((LM_ERROR,
97  "(%P|%t) QueueTaskBase failed to activate "
98  "the worker threads.\n"),
99  -1);
100  }
101 
102  // Now we have past the point where we can say we've been open()'ed before.
103  this->opened_ = true;
104 
105  return 0;
106  }
ACE_Guard< LockType > GuardType
#define DBG_ENTRY(CNAME, MNAME)
Definition: EntryExit.h:72
bool opened_
Flag used to avoid multiple open() calls.
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
#define ACE_ERROR_RETURN(X, Y)
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)

◆ svc()

template<typename T>
virtual int OpenDDS::DCPS::QueueTaskBase< T >::svc ( void  )
inlinevirtual

The "mainline" executed by the worker thread.

Reimplemented from ACE_Task_Base.

Definition at line 109 of file QueueTaskBase_T.h.

109  {
110  DBG_ENTRY("QueueTaskBase","svc");
111 
112  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
113  const TimeDuration thread_status_interval = thread_status_manager.thread_status_interval();
114 
115  ThreadStatusManager::Start s(thread_status_manager, "QueueTaskBase");
116 
118 
119  // Start the "GetWork-And-PerformWork" loop for the current worker thread.
120  while (!this->shutdown_initiated_) {
121  T req;
122  {
123  GuardType guard(this->lock_);
124 
125  if (this->queue_.is_empty() && !shutdown_initiated_) {
126  if (thread_status_manager.update_thread_status()) {
127  MonotonicTimePoint expire = MonotonicTimePoint::now() + thread_status_interval;
128 
129  do {
130  work_available_.wait_until(expire, thread_status_manager);
131 
133  if (now > expire) {
134  expire = now + thread_status_interval;
135  }
136  } while (this->queue_.is_empty() && !shutdown_initiated_);
137  } else {
138  this->work_available_.wait(thread_status_manager);
139  }
140  }
141 
142  if (this->shutdown_initiated_)
143  break;
144 
145  int result = queue_.dequeue_head(req);
146 
147  if (result != 0) {
148  //I'm not sure why this thread got more signals than actual signals
149  //when using thread_per_connection and the user application thread
150  //send requests without interval. We just need ignore the dequeue
151  //failure.
152  //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: QueueTaskBase::svc %p\n",
153  // ACE_TEXT("dequeue_head")));
154  continue;
155  }
156  }
157 
158  this->execute(req);
159  }
160 
161  // This will never get executed.
162  return 0;
163  }
ACE_Guard< LockType > GuardType
ACE_thread_t thr_id_
The id of the thread created by this task.
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
int dequeue_head(T &item)
ACE_thread_t thr_self(void)
virtual void execute(T &req)=0
Queue queue_
The request queue.
bool is_empty(void) const
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
#define DBG_ENTRY(CNAME, MNAME)
Definition: EntryExit.h:72
ConditionVariableType work_available_
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
#define TheServiceParticipant
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.

Member Data Documentation

◆ lock_

template<typename T>
LockType OpenDDS::DCPS::QueueTaskBase< T >::lock_
mutableprivate

◆ opened_

template<typename T>
bool OpenDDS::DCPS::QueueTaskBase< T >::opened_
private

Flag used to avoid multiple open() calls.

Definition at line 225 of file QueueTaskBase_T.h.

Referenced by OpenDDS::DCPS::QueueTaskBase< DataLink_rch >::close(), and OpenDDS::DCPS::QueueTaskBase< DataLink_rch >::open().

◆ queue_

template<typename T>
Queue OpenDDS::DCPS::QueueTaskBase< T >::queue_
private

◆ shutdown_initiated_

template<typename T>
bool OpenDDS::DCPS::QueueTaskBase< T >::shutdown_initiated_
private

◆ thr_id_

template<typename T>
ACE_thread_t OpenDDS::DCPS::QueueTaskBase< T >::thr_id_
private

The id of the thread created by this task.

Definition at line 228 of file QueueTaskBase_T.h.

Referenced by OpenDDS::DCPS::QueueTaskBase< DataLink_rch >::close(), and OpenDDS::DCPS::QueueTaskBase< DataLink_rch >::svc().

◆ work_available_

template<typename T>
ConditionVariableType OpenDDS::DCPS::QueueTaskBase< T >::work_available_
private

Condition used to signal the worker threads that they may be able to find a request in the queue_ that needs to be executed. This condition will be signal()'ed each time a request is added to the queue_, and also when this task is shutdown.

Definition at line 219 of file QueueTaskBase_T.h.

Referenced by OpenDDS::DCPS::QueueTaskBase< DataLink_rch >::add(), OpenDDS::DCPS::QueueTaskBase< DataLink_rch >::close(), and OpenDDS::DCPS::QueueTaskBase< DataLink_rch >::svc().


The documentation for this class was generated from the following file: