8 #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_QUEUETASKBASE_T_H 9 #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_QUEUETASKBASE_T_H 20 #if !defined (ACE_LACKS_PRAGMA_ONCE) 48 DBG_ENTRY(
"QueueTaskBase",
"QueueTaskBase");
52 DBG_ENTRY(
"QueueTaskBase",
"~QueueTaskBase");
58 int add(
const T& req) {
78 virtual int open(
void* = 0) {
87 "(%P|%t) QueueTaskBase failed to open. " 88 "Task has previously been open()'ed.\n"),
93 if (this->
activate(THR_NEW_LWP | THR_JOINABLE, 1) != 0) {
97 "(%P|%t) QueueTaskBase failed to activate " 98 "the worker threads.\n"),
134 expire = now + thread_status_interval;
166 virtual int close(u_long flag = 0) {
199 virtual void execute(T& req) = 0;
void thread_status_interval(const TimeDuration &thread_status_interval)
LockType lock_
Lock to protect the "state" (all of the data members) of this object.
virtual int close(u_long flag=0)
Called when the thread exits.
Queue queue_
The request queue.
A simple ACE task that manages a queue of request.
bool is_shutdown_initiated() const
int dequeue_head(T &item)
bool notify_one()
Unblock one of the threads waiting on this condition.
ACE_thread_t thr_self(void)
virtual int svc()
The "mainline" executed by the worker thread.
int enqueue_tail(const T &new_item)
bool update_thread_status() const
ACE_thread_t thr_id_
The id of the thread created by this task.
static TimePoint_T< MonotonicClock > now()
ConditionVariableType work_available_
virtual void execute(T &req)=0
bool is_empty(void) const
ConditionVariable< LockType > ConditionVariableType
int thr_equal(ACE_thread_t t1, ACE_thread_t t2)
bool opened_
Flag used to avoid multiple open() calls.
ACE_Guard< LockType > GuardType
#define DBG_ENTRY(CNAME, MNAME)
bool shutdown_initiated_
Flag used to initiate a shutdown request to all worker threads.
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
ACE_Unbounded_Queue< T > Queue
CvStatus wait_until(const MonotonicTimePoint &expire_at, ThreadStatusManager &thread_status_manager)
Block until woken up or until expire_at. Same as wait() if expire_at is zero.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
#define ACE_ERROR_RETURN(X, Y)
virtual int activate(long flags=THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED, int n_threads=1, int force_active=0, long priority=ACE_DEFAULT_THREAD_PRIORITY, int grp_id=-1, ACE_Task_Base *task=0, ACE_hthread_t thread_handles[]=0, void *stack[]=0, size_t stack_size[]=0, ACE_thread_t thread_ids[]=0, const char *thr_name[]=0)
#define TheServiceParticipant
The Internal API and Implementation of OpenDDS.
virtual int open(void *=0)
Activate the worker threads.