#include <QueueTaskBase_T.h>
Inheritance diagram for OpenDDS::DCPS::QueueTaskBase< T >:
Public Member Functions | |
QueueTaskBase () | |
Constructor. | |
virtual | ~QueueTaskBase () |
Virtual Destructor. | |
int | add (const T &req) |
virtual int | open (void *=0) |
Activate the worker threads. | |
virtual int | svc () |
The "mainline" executed by the worker thread. | |
virtual int | close (u_long flag=0) |
Called when the thread exits. | |
virtual void | execute (T &req)=0 |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef ACE_Condition< LockType > | ConditionType |
typedef ACE_Unbounded_Queue< T > | Queue |
Private Attributes | |
LockType | lock_ |
Lock to protect the "state" (all of the data members) of this object. | |
Queue | queue_ |
The request queue. | |
ConditionType | work_available_ |
bool | shutdown_initiated_ |
Flag used to initiate a shutdown request to all worker threads. | |
bool | opened_ |
Flag used to avoid multiple open() calls. | |
ACE_thread_t | thr_id_ |
The id of the thread created by this task. |
Definition at line 34 of file QueueTaskBase_T.h.
typedef ACE_Condition<LockType> OpenDDS::DCPS::QueueTaskBase< T >::ConditionType [private] |
Definition at line 175 of file QueueTaskBase_T.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::QueueTaskBase< T >::GuardType [private] |
Definition at line 174 of file QueueTaskBase_T.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::QueueTaskBase< T >::LockType [private] |
Definition at line 173 of file QueueTaskBase_T.h.
typedef ACE_Unbounded_Queue<T> OpenDDS::DCPS::QueueTaskBase< T >::Queue [private] |
Definition at line 177 of file QueueTaskBase_T.h.
OpenDDS::DCPS::QueueTaskBase< T >::QueueTaskBase() [inline]
Constructor.
Definition at line 38 of file QueueTaskBase_T.h.
00039 : work_available_(lock_), 00040 shutdown_initiated_(false), 00041 opened_(false), 00042 thr_id_(ACE_OS::NULL_thread) { 00043 DBG_ENTRY("QueueTaskBase","QueueTaskBase"); 00044 }
virtual OpenDDS::DCPS::QueueTaskBase< T >::~QueueTaskBase() [inline, virtual]
Virtual Destructor.
Definition at line 47 of file QueueTaskBase_T.h.
00047 { 00048 DBG_ENTRY("QueueTaskBase","~QueueTaskBase"); 00049 }
int OpenDDS::DCPS::QueueTaskBase< T >::add(const T & req) [inline]
Put the request on the request queue. Returns 0 if successful, -1 otherwise (the request was "rejected" by the queue, or this task has been shut down).
Definition at line 54 of file QueueTaskBase_T.h.
Referenced by OpenDDS::DCPS::TcpConnection::handle_close(), OpenDDS::DCPS::TcpTransport::passive_connection(), OpenDDS::DCPS::TransportImpl::release_link_resources(), and OpenDDS::DCPS::TcpConnection::relink_from_send().
00054 { 00055 DBG_ENTRY("QueueTaskBase","add"); 00056 GuardType guard(this->lock_); 00057 00058 if (this->shutdown_initiated_) 00059 return -1; 00060 00061 int result = this->queue_.enqueue_tail(req); 00062 00063 if (result == 0) { 00064 this->work_available_.signal(); 00065 00066 } else 00067 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: QueueTaskBase::add %p\n", 00068 ACE_TEXT("enqueue_tail"))); 00069 00070 return result; 00071 }
virtual int OpenDDS::DCPS::QueueTaskBase< T >::close(u_long flag = 0) [inline, virtual]
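Called when the thread exits. A call with flag == 0 returns 0 immediately without changing any state. A call with a non-zero flag initiates shutdown: it sets the shutdown flag and signals the worker thread, then calls wait() unless the task was never opened or the caller is the worker thread itself. If shutdown has already been initiated, the call returns without waiting. Always returns 0.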
Definition at line 144 of file QueueTaskBase_T.h.
Referenced by OpenDDS::DCPS::TransportImpl::shutdown(), OpenDDS::DCPS::TcpConnection::shutdown(), OpenDDS::DCPS::TcpTransport::shutdown_i(), OpenDDS::DCPS::TcpConnection::transfer(), OpenDDS::DCPS::TcpConnection::~TcpConnection(), and OpenDDS::DCPS::TcpTransport::~TcpTransport().
00144 { 00145 DBG_ENTRY("QueueTaskBase","close"); 00146 00147 if (flag == 0) 00148 return 0; 00149 00150 { 00151 GuardType guard(this->lock_); 00152 00153 if (this->shutdown_initiated_) 00154 return 0; 00155 00156 // Set the shutdown flag to true. 00157 this->shutdown_initiated_ = true; 00158 this->work_available_.signal(); 00159 } 00160 00161 if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self())) 00162 this->wait(); 00163 00164 return 0; 00165 }
virtual void OpenDDS::DCPS::QueueTaskBase< T >::execute(T & req) [pure virtual]
The subclass should implement this function to handle the dequeued request.
Implemented in OpenDDS::DCPS::TcpReconnectTask.
Referenced by OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::svc().
virtual int OpenDDS::DCPS::QueueTaskBase< T >::open(void * = 0) [inline, virtual]
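Activate the worker thread. A single worker thread is activated. Returns 0 on success, or -1 if the task has already been opened or if activation fails.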
Definition at line 74 of file QueueTaskBase_T.h.
00074 { 00075 DBG_ENTRY("QueueTaskBase","open"); 00076 00077 GuardType guard(this->lock_); 00078 00079 // We can assume that we are in the proper state to handle this open() 00080 // call as long as we haven't been open()'ed before. 00081 if (this->opened_) { 00082 ACE_ERROR_RETURN((LM_ERROR, 00083 "(%P|%t) QueueTaskBase failed to open. " 00084 "Task has previously been open()'ed.\n"), 00085 -1); 00086 } 00087 00088 // Activate this task object with one worker thread. 00089 if (this->activate(THR_NEW_LWP | THR_JOINABLE, 1) != 0) { 00090 // Assumes that when activate returns non-zero return code that 00091 // no threads were activated. 00092 ACE_ERROR_RETURN((LM_ERROR, 00093 "(%P|%t) QueueTaskBase failed to activate " 00094 "the worker threads.\n"), 00095 -1); 00096 } 00097 00098 // Now we have past the point where we can say we've been open()'ed before. 00099 this->opened_ = true; 00100 00101 return 0; 00102 }
virtual int OpenDDS::DCPS::QueueTaskBase< T >::svc() [inline, virtual]
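The "mainline" executed by the worker thread. It records the worker thread's id, then loops until shutdown is initiated: it waits for work when the queue is empty, dequeues one request at a time, and passes it to execute() with the lock released.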
Definition at line 105 of file QueueTaskBase_T.h.
00105 { 00106 DBG_ENTRY("QueueTaskBase","svc"); 00107 00108 this->thr_id_ = ACE_OS::thr_self(); 00109 00110 // Start the "GetWork-And-PerformWork" loop for the current worker thread. 00111 while (!this->shutdown_initiated_) { 00112 T req; 00113 { 00114 GuardType guard(this->lock_); 00115 00116 if (this->queue_.is_empty()) { 00117 this->work_available_.wait(); 00118 } 00119 00120 if (this->shutdown_initiated_) 00121 break; 00122 00123 int result = queue_.dequeue_head(req); 00124 00125 if (result != 0) { 00126 //I'm not sure why this thread got more signals than actual signals 00127 //when using thread_per_connection and the user application thread 00128 //send requests without interval. We just need ignore the dequeue 00129 //failure. 00130 //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: QueueTaskBase::svc %p\n", 00131 // ACE_TEXT("dequeue_head"))); 00132 continue; 00133 } 00134 } 00135 00136 this->execute(req); 00137 } 00138 00139 // This will never get executed. 00140 return 0; 00141 }
LockType OpenDDS::DCPS::QueueTaskBase< T >::lock_ [private] |
Lock to protect the "state" (all of the data members) of this object.
Definition at line 180 of file QueueTaskBase_T.h.
bool OpenDDS::DCPS::QueueTaskBase< T >::opened_ [private] |
Flag used to avoid multiple open() calls.
Definition at line 195 of file QueueTaskBase_T.h.
Referenced by OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::close(), and OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::open().
Queue OpenDDS::DCPS::QueueTaskBase< T >::queue_ [private] |
The request queue.
Definition at line 183 of file QueueTaskBase_T.h.
Referenced by OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::add(), and OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::svc().
bool OpenDDS::DCPS::QueueTaskBase< T >::shutdown_initiated_ [private] |
Flag used to initiate a shutdown request to all worker threads.
Definition at line 192 of file QueueTaskBase_T.h.
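Referenced by OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::add(), OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::close(), and OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::svc().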
ACE_thread_t OpenDDS::DCPS::QueueTaskBase< T >::thr_id_ [private] |
The id of the thread created by this task.
Definition at line 198 of file QueueTaskBase_T.h.
Referenced by OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::close(), and OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::svc().
ConditionType OpenDDS::DCPS::QueueTaskBase< T >::work_available_ [private] |
Condition used to signal the worker threads that they may be able to find a request in the queue_ that needs to be executed. This condition will be signal()'ed each time a request is added to the queue_, and also when this task is shut down.
Definition at line 189 of file QueueTaskBase_T.h.
Referenced by OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::add(), OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::close(), and OpenDDS::DCPS::QueueTaskBase< ReconnectOpType >::svc().