QueueTaskBase_T.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_QUEUE_TASK_BASE_T_H
00009 #define OPENDDS_DCPS_QUEUE_TASK_BASE_T_H
00010 
00011 #include /**/ "ace/pre.h"
00012 
00013 #include "EntryExit.h"
00014 
00015 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00016 # pragma once
00017 #endif /* ACE_LACKS_PRAGMA_ONCE */
00018 
00019 #include "ace/Task.h"
00020 #include "ace/Synch.h"
00021 #include "ace/Unbounded_Queue.h"
00022 #include "ace/INET_Addr.h"
00023 #include "ace/Condition_T.h"
00024 
00025 namespace OpenDDS {
00026 namespace DCPS {
00027 
00028 /**
00029  * @class QueueTaskBase
00030  *
00031  * @brief A simple ACE task that manages a queue of requests.
00032  */
00033 template <typename T>
00034 class QueueTaskBase : public ACE_Task_Base {
00035 public:
00036 
00037   /// Constructor.
00038   QueueTaskBase()
00039   : work_available_(lock_),
00040       shutdown_initiated_(false),
00041       opened_(false),
00042       thr_id_(ACE_OS::NULL_thread) {
00043     DBG_ENTRY("QueueTaskBase","QueueTaskBase");
00044   }
00045 
00046   /// Virtual Destructor.
00047   virtual ~QueueTaskBase() {
00048     DBG_ENTRY("QueueTaskBase","~QueueTaskBase");
00049   }
00050 
00051   /// Put the request to the request queue.
00052   /// Returns 0 if successful, -1 otherwise (it has been "rejected" or this
00053   /// task is shutdown).
00054   int add(const T& req) {
00055     DBG_ENTRY("QueueTaskBase","add");
00056     GuardType guard(this->lock_);
00057 
00058     if (this->shutdown_initiated_)
00059       return -1;
00060 
00061     int result = this->queue_.enqueue_tail(req);
00062 
00063     if (result == 0) {
00064       this->work_available_.signal();
00065 
00066     } else
00067       ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: QueueTaskBase::add %p\n",
00068        ACE_TEXT("enqueue_tail")));
00069 
00070     return result;
00071   }
00072 
00073   /// Activate the worker threads
00074   virtual int open(void* = 0) {
00075     DBG_ENTRY("QueueTaskBase","open");
00076 
00077     GuardType guard(this->lock_);
00078 
00079     // We can assume that we are in the proper state to handle this open()
00080     // call as long as we haven't been open()'ed before.
00081     if (this->opened_) {
00082       ACE_ERROR_RETURN((LM_ERROR,
00083                         "(%P|%t) QueueTaskBase failed to open.  "
00084                         "Task has previously been open()'ed.\n"),
00085                        -1);
00086     }
00087 
00088     // Activate this task object with one worker thread.
00089     if (this->activate(THR_NEW_LWP | THR_JOINABLE, 1) != 0) {
00090       // Assumes that when activate returns non-zero return code that
00091       // no threads were activated.
00092       ACE_ERROR_RETURN((LM_ERROR,
00093                         "(%P|%t) QueueTaskBase failed to activate "
00094                         "the worker threads.\n"),
00095                        -1);
00096     }
00097 
00098     // Now we have past the point where we can say we've been open()'ed before.
00099     this->opened_ = true;
00100 
00101     return 0;
00102   }
00103 
00104   /// The "mainline" executed by the worker thread.
00105   virtual int svc() {
00106     DBG_ENTRY("QueueTaskBase","svc");
00107 
00108     this->thr_id_ = ACE_OS::thr_self();
00109 
00110     // Start the "GetWork-And-PerformWork" loop for the current worker thread.
00111     while (!this->shutdown_initiated_) {
00112       T req;
00113       {
00114         GuardType guard(this->lock_);
00115 
00116         if (this->queue_.is_empty()) {
00117           this->work_available_.wait();
00118         }
00119 
00120         if (this->shutdown_initiated_)
00121           break;
00122 
00123         int result = queue_.dequeue_head(req);
00124 
00125         if (result != 0) {
00126           //I'm not sure why this thread got more signals than actual signals
00127           //when using thread_per_connection and the user application thread
00128           //send requests without interval. We just need ignore the dequeue
00129           //failure.
00130           //ACE_ERROR ((LM_ERROR, "(%P|%t) ERROR: QueueTaskBase::svc  %p\n",
00131           //  ACE_TEXT("dequeue_head")));
00132           continue;
00133         }
00134       }
00135 
00136       this->execute(req);
00137     }
00138 
00139     // This will never get executed.
00140     return 0;
00141   }
00142 
00143   /// Called when the thread exits.
00144   virtual int close(u_long flag = 0) {
00145     DBG_ENTRY("QueueTaskBase","close");
00146 
00147     if (flag == 0)
00148       return 0;
00149 
00150     {
00151       GuardType guard(this->lock_);
00152 
00153       if (this->shutdown_initiated_)
00154         return 0;
00155 
00156       // Set the shutdown flag to true.
00157       this->shutdown_initiated_ = true;
00158       this->work_available_.signal();
00159     }
00160 
00161     if (this->opened_ && !ACE_OS::thr_equal(this->thr_id_, ACE_OS::thr_self()))
00162       this->wait();
00163 
00164     return 0;
00165   }
00166 
00167   /// The subclass should implement this function to handle the
00168   /// dequeued request.
00169   virtual void execute(T& req) = 0;
00170 
00171 private:
00172 
00173   typedef ACE_SYNCH_MUTEX         LockType;
00174   typedef ACE_Guard<LockType>     GuardType;
00175   typedef ACE_Condition<LockType> ConditionType;
00176 
00177   typedef ACE_Unbounded_Queue<T>  Queue;
00178 
00179   /// Lock to protect the "state" (all of the data members) of this object.
00180   LockType lock_;
00181 
00182   /// The request queue.
00183   Queue queue_;
00184 
00185   /// Condition used to signal the worker threads that they may be able to
00186   /// find a request in the queue_ that needs to be executed.
00187   /// This condition will be signal()'ed each time a request is
00188   /// added to the queue_, and also when this task is shutdown.
00189   ConditionType work_available_;
00190 
00191   /// Flag used to initiate a shutdown request to all worker threads.
00192   bool shutdown_initiated_;
00193 
00194   /// Flag used to avoid multiple open() calls.
00195   bool opened_;
00196 
00197   /// The id of the thread created by this task.
00198   ACE_thread_t thr_id_;
00199 };
00200 
00201 } // namespace DCPS
00202 } // namespace OpenDDS
00203 
00204 #include /**/ "ace/post.h"
00205 
00206 #endif /* OPENDDS_DCPS_QUEUE_TASK_BASE_T_H */

Generated on Fri Feb 12 20:05:25 2016 for OpenDDS by  doxygen 1.4.7