00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_QUEUE_TASK_BASE_T_H
00009 #define OPENDDS_DCPS_QUEUE_TASK_BASE_T_H
00010
00011 #include "ace/pre.h"
00012
00013 #include "EntryExit.h"
00014
00015 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00016 # pragma once
00017 #endif
00018
00019 #include "ace/Task.h"
00020 #include "ace/Synch.h"
00021 #include "ace/Unbounded_Queue.h"
00022 #include "ace/INET_Addr.h"
00023 #include "ace/Condition_T.h"
00024
00025 namespace OpenDDS {
00026 namespace DCPS {
00027
00028
00029
00030
00031
00032
00033 template <typename T>
00034 class QueueTaskBase : public ACE_Task_Base {
00035 public:
00036
00037
00038 QueueTaskBase()
00039 : work_available_(lock_),
00040 shutdown_initiated_(false),
00041 opened_(false),
00042 thr_id_(ACE_OS::NULL_thread) {
00043 DBG_ENTRY("QueueTaskBase","QueueTaskBase");
00044 }
00045
00046
00047 virtual ~QueueTaskBase() {
00048 DBG_ENTRY("QueueTaskBase","~QueueTaskBase");
00049 }
00050
00051
00052
00053
00054 int add(const T& req) {
00055 DBG_ENTRY("QueueTaskBase","add");
00056 GuardType guard(this->lock_);
00057
00058 if (this->shutdown_initiated_)
00059 return -1;
00060
00061 int result = this->queue_.enqueue_tail(req);
00062
00063 if (result == 0) {
00064 this->work_available_.signal();
00065
00066 } else
00067 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: QueueTaskBase::add %p\n",
00068 ACE_TEXT("enqueue_tail")));
00069
00070 return result;
00071 }
00072
00073
00074 virtual int open(void* = 0) {
00075 DBG_ENTRY("QueueTaskBase","open");
00076
00077 GuardType guard(this->lock_);
00078
00079
00080
00081 if (this->opened_) {
00082 ACE_ERROR_RETURN((LM_ERROR,
00083 "(%P|%t) QueueTaskBase failed to open. "
00084 "Task has previously been open()'ed.\n"),
00085 -1);
00086 }
00087
00088
00089 if (this->activate(THR_NEW_LWP | THR_JOINABLE, 1) != 0) {
00090
00091
00092 ACE_ERROR_RETURN((LM_ERROR,
00093 "(%P|%t) QueueTaskBase failed to activate "
00094 "the worker threads.\n"),
00095 -1);
00096 }
00097
00098
00099 this->opened_ = true;
00100
00101 return 0;
00102 }
00103
00104
00105 virtual int svc() {
00106 DBG_ENTRY("QueueTaskBase","svc");
00107
00108 this->thr_id_ = ACE_OS::thr_self();
00109
00110
00111 while (!this->shutdown_initiated_) {
00112 T req;
00113 {
00114 GuardType guard(this->lock_);
00115
00116 if (this->queue_.is_empty()) {
00117 this->work_available_.wait();
00118 }
00119
00120 if (this->shutdown_initiated_)
00121 break;
00122
00123 int result = queue_.dequeue_head(req);
00124
00125 if (result != 0) {
00126
00127
00128
00129
00130
00131
00132 continue;
00133 }
00134 }
00135
00136 this->execute(req);
00137 }
00138
00139
00140 return 0;
00141 }
00142
00143
00144 virtual int close(u_long flag = 0) {
00145 DBG_ENTRY("QueueTaskBase","close");
00146
00147 if (flag == 0)
00148 return 0;
00149
00150 {
00151 GuardType guard(this->lock_);
00152
00153 if (this->shutdown_initiated_)
00154 return 0;
00155
00156
00157 this->shutdown_initiated_ = true;
00158 this->work_available_.signal();
00159 }
00160
00161 if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self()))
00162 this->wait();
00163
00164 return 0;
00165 }
00166
00167
00168
00169 virtual void execute(T& req) = 0;
00170
00171 private:
00172
00173 typedef ACE_SYNCH_MUTEX LockType;
00174 typedef ACE_Guard<LockType> GuardType;
00175 typedef ACE_Condition<LockType> ConditionType;
00176
00177 typedef ACE_Unbounded_Queue<T> Queue;
00178
00179
00180 LockType lock_;
00181
00182
00183 Queue queue_;
00184
00185
00186
00187
00188
00189 ConditionType work_available_;
00190
00191
00192 bool shutdown_initiated_;
00193
00194
00195 bool opened_;
00196
00197
00198 ACE_thread_t thr_id_;
00199 };
00200
00201 }
00202 }
00203
00204 #include "ace/post.h"
00205
00206 #endif