OpenDDS::DCPS::QueueTaskBase< T > Class Template Reference

A simple ACE task that manages a queue of request. More...

#include <QueueTaskBase_T.h>

Inheritance diagram for OpenDDS::DCPS::QueueTaskBase< T >:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::QueueTaskBase< T >:

Collaboration graph
[legend]
List of all members.

Public Member Functions

 QueueTaskBase ()
 Constructor.
virtual ~QueueTaskBase ()
 Virtual Destructor.
int add (const T &req)
virtual int open (void *=0)
 Activate the worker threads.
virtual int svc ()
 The "mainline" executed by the worker thread.
virtual int close (u_long flag=0)
 Called when the thread exits.
virtual void execute (T &req)=0

Private Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockTypeGuardType
typedef ACE_Condition< LockTypeConditionType
typedef ACE_Unbounded_Queue<
T > 
Queue

Private Attributes

LockType lock_
 Lock to protect the "state" (all of the data members) of this object.
Queue queue_
 The request queue.
ConditionType work_available_
bool shutdown_initiated_
 Flag used to initiate a shutdown request to all worker threads.
bool opened_
 Flag used to avoid multiple open() calls.
ACE_thread_t thr_id_
 The id of the thread created by this task.

Detailed Description

template<typename T>
class OpenDDS::DCPS::QueueTaskBase< T >

A simple ACE task that manages a queue of request.

Definition at line 34 of file QueueTaskBase_T.h.


Member Typedef Documentation

template<typename T>
typedef ACE_Condition<LockType> OpenDDS::DCPS::QueueTaskBase< T >::ConditionType [private]

Definition at line 175 of file QueueTaskBase_T.h.

template<typename T>
typedef ACE_Guard<LockType> OpenDDS::DCPS::QueueTaskBase< T >::GuardType [private]

Definition at line 174 of file QueueTaskBase_T.h.

template<typename T>
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::QueueTaskBase< T >::LockType [private]

Definition at line 173 of file QueueTaskBase_T.h.

template<typename T>
typedef ACE_Unbounded_Queue<T> OpenDDS::DCPS::QueueTaskBase< T >::Queue [private]

Definition at line 177 of file QueueTaskBase_T.h.


Constructor & Destructor Documentation

template<typename T>
OpenDDS::DCPS::QueueTaskBase< T >::QueueTaskBase (  )  [inline]

Constructor.

Definition at line 38 of file QueueTaskBase_T.h.

00039   : work_available_(lock_),
00040       shutdown_initiated_(false),
00041       opened_(false),
00042       thr_id_(ACE_OS::NULL_thread) {
00043     DBG_ENTRY("QueueTaskBase","QueueTaskBase");
00044   }

template<typename T>
virtual OpenDDS::DCPS::QueueTaskBase< T >::~QueueTaskBase (  )  [inline, virtual]

Virtual Destructor.

Definition at line 47 of file QueueTaskBase_T.h.

00047                            {
00048     DBG_ENTRY("QueueTaskBase","~QueueTaskBase");
00049   }


Member Function Documentation

template<typename T>
int OpenDDS::DCPS::QueueTaskBase< T >::add ( const T &  req  )  [inline]

Put the request to the request queue. Returns 0 if successful, -1 otherwise (it has been "rejected" or this task is shutdown).

Definition at line 54 of file QueueTaskBase_T.h.

Referenced by OpenDDS::DCPS::TcpConnection::handle_close(), OpenDDS::DCPS::TcpTransport::passive_connection(), OpenDDS::DCPS::TransportImpl::release_link_resources(), and OpenDDS::DCPS::TcpConnection::relink_from_send().

00054                         {
00055     DBG_ENTRY("QueueTaskBase","add");
00056     GuardType guard(this->lock_);
00057 
00058     if (this->shutdown_initiated_)
00059       return -1;
00060 
00061     int result = this->queue_.enqueue_tail(req);
00062 
00063     if (result == 0) {
00064       this->work_available_.signal();
00065 
00066     } else
00067       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: QueueTaskBase::add %p\n",
00068        ACE_TEXT("enqueue_tail")));
00069 
00070     return result;
00071   }

template<typename T>
virtual int OpenDDS::DCPS::QueueTaskBase< T >::close ( u_long  flag = 0  )  [inline, virtual]

Called when the thread exits.

Definition at line 144 of file QueueTaskBase_T.h.

Referenced by OpenDDS::DCPS::TransportImpl::shutdown(), OpenDDS::DCPS::TcpConnection::shutdown(), OpenDDS::DCPS::TcpTransport::shutdown_i(), OpenDDS::DCPS::TcpConnection::transfer(), OpenDDS::DCPS::TcpConnection::~TcpConnection(), and OpenDDS::DCPS::TcpTransport::~TcpTransport().

00144                                      {
00145     DBG_ENTRY("QueueTaskBase","close");
00146 
00147     if (flag == 0)
00148       return 0;
00149 
00150     {
00151       GuardType guard(this->lock_);
00152 
00153       if (this->shutdown_initiated_)
00154         return 0;
00155 
00156       // Set the shutdown flag to true.
00157       this->shutdown_initiated_ = true;
00158       this->work_available_.signal();
00159     }
00160 
00161     if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self()))
00162       this->wait();
00163 
00164     return 0;
00165   }

template<typename T>
virtual void OpenDDS::DCPS::QueueTaskBase< T >::execute ( T &  req  )  [pure virtual]

The subclass should implement this function to handle the dequeued request.

Implemented in OpenDDS::DCPS::TcpReconnectTask.

Referenced by OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::svc().

template<typename T>
virtual int OpenDDS::DCPS::QueueTaskBase< T >::open ( void *  = 0  )  [inline, virtual]

Activate the worker threads.

Definition at line 74 of file QueueTaskBase_T.h.

00074                               {
00075     DBG_ENTRY("QueueTaskBase","open");
00076 
00077     GuardType guard(this->lock_);
00078 
00079     // We can assume that we are in the proper state to handle this open()
00080     // call as long as we haven't been open()'ed before.
00081     if (this->opened_) {
00082       ACE_ERROR_RETURN((LM_ERROR,
00083                         "(%P|%t) QueueTaskBase failed to open.  "
00084                         "Task has previously been open()'ed.\n"),
00085                        -1);
00086     }
00087 
00088     // Activate this task object with one worker thread.
00089     if (this->activate(THR_NEW_LWP | THR_JOINABLE, 1) != 0) {
00090       // Assumes that when activate returns non-zero return code that
00091       // no threads were activated.
00092       ACE_ERROR_RETURN((LM_ERROR,
00093                         "(%P|%t) QueueTaskBase failed to activate "
00094                         "the worker threads.\n"),
00095                        -1);
00096     }
00097 
00098     // Now we have past the point where we can say we've been open()'ed before.
00099     this->opened_ = true;
00100 
00101     return 0;
00102   }

template<typename T>
virtual int OpenDDS::DCPS::QueueTaskBase< T >::svc (  )  [inline, virtual]

The "mainline" executed by the worker thread.

Definition at line 105 of file QueueTaskBase_T.h.

00105                     {
00106     DBG_ENTRY("QueueTaskBase","svc");
00107 
00108     this->thr_id_ = ACE_OS::thr_self();
00109 
00110     // Start the "GetWork-And-PerformWork" loop for the current worker thread.
00111     while (!this->shutdown_initiated_) {
00112       T req;
00113       {
00114         GuardType guard(this->lock_);
00115 
00116         if (this->queue_.is_empty()) {
00117           this->work_available_.wait();
00118         }
00119 
00120         if (this->shutdown_initiated_)
00121           break;
00122 
00123         int result = queue_.dequeue_head(req);
00124 
00125         if (result != 0) {
00126           //I'm not sure why this thread got more signals than actual signals
00127           //when using thread_per_connection and the user application thread
00128           //send requests without interval. We just need ignore the dequeue
00129           //failure.
00130           //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: QueueTaskBase::svc  %p\n",
00131           //  ACE_TEXT("dequeue_head")));
00132           continue;
00133         }
00134       }
00135 
00136       this->execute(req);
00137     }
00138 
00139     // This will never get executed.
00140     return 0;
00141   }


Member Data Documentation

template<typename T>
LockType OpenDDS::DCPS::QueueTaskBase< T >::lock_ [private]

Lock to protect the "state" (all of the data members) of this object.

Definition at line 180 of file QueueTaskBase_T.h.

template<typename T>
bool OpenDDS::DCPS::QueueTaskBase< T >::opened_ [private]

Flag used to avoid multiple open() calls.

Definition at line 195 of file QueueTaskBase_T.h.

Referenced by OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::open().

template<typename T>
Queue OpenDDS::DCPS::QueueTaskBase< T >::queue_ [private]

The request queue.

Definition at line 183 of file QueueTaskBase_T.h.

Referenced by OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::add(), and OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::svc().

template<typename T>
bool OpenDDS::DCPS::QueueTaskBase< T >::shutdown_initiated_ [private]

Flag used to initiate a shutdown request to all worker threads.

Definition at line 192 of file QueueTaskBase_T.h.

Referenced by OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::close().

template<typename T>
ACE_thread_t OpenDDS::DCPS::QueueTaskBase< T >::thr_id_ [private]

The id of the thread created by this task.

Definition at line 198 of file QueueTaskBase_T.h.

Referenced by OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::svc().

template<typename T>
ConditionType OpenDDS::DCPS::QueueTaskBase< T >::work_available_ [private]

Condition used to signal the worker threads that they may be able to find a request in the queue_ that needs to be executed. This condition will be signal()'ed each time a request is added to the queue_, and also when this task is shutdown.

Definition at line 189 of file QueueTaskBase_T.h.

Referenced by OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::add(), and OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::close().


The documentation for this class was generated from the following file:
Generated on Fri Feb 12 20:06:28 2016 for OpenDDS by  doxygen 1.4.7