ThreadPerConnectionSendTask.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_THREADPERCONNECTIONSENDER_H
00009 #define OPENDDS_DCPS_THREADPERCONNECTIONSENDER_H
00010 
00011 #include /**/ "ace/pre.h"
00012 
00013 #include "dds/DCPS/dcps_export.h"
00014 #include "BasicQueue_T.h"
00015 #include "TransportDefs.h"
00016 #include "ace/Task.h"
00017 #include "ace/Synch.h"
00018 #include "ace/Condition_T.h"
00019 #include "dds/DCPS/PoolAllocationBase.h"
00020 
00021 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00022 # pragma once
00023 #endif /* ACE_LACKS_PRAGMA_ONCE */
00024 
00025 namespace OpenDDS {
00026 namespace DCPS {
00027 
00028 class DataLink;
00029 class TransportQueueElement;
00030 class DataSampleElement;
00031 class TransportSendElement;
00032 
00033 
00034 enum SendStrategyOpType { // Operation selector: the type of SendRequest::op_ and of add_request()'s op argument.
00035   SEND_START,                 // == 0 (implicit); presumably begins a send -- confirm in the implementation.
00036   SEND,                       // == 1; presumably sends one element -- confirm in the implementation.
00037   SEND_STOP,                  // == 2; presumably ends a send -- confirm in the implementation.
00038   REMOVE_SAMPLE,              // == 3; presumably pairs with remove_sample() -- confirm in the implementation.
00039   REMOVE_ALL_CONTROL_SAMPLES  // == 4; presumably drops queued control samples -- confirm in the implementation.
00040 };
00041 
00042 struct SendRequest : public PoolAllocationBase { // Element type of the task's request queue (BasicQueue<SendRequest>); no constructor or member initializers are declared here.
00043   SendStrategyOpType op_;           ///< Operation requested.
00044   TransportQueueElement* element_;  ///< Element the operation applies to. NOTE(review): may be 0, since add_request() defaults its element argument to 0 -- presumably copied here; confirm.
00045 };
00046 
00047 /**
00048  * @class ThreadPerConnectionSendTask
00049  *
00050  * @brief Executes the requests to send a sample or control message.
00051  *
00052  *  add_request() queues a request; the execute() method implemented by this
00053  *  task then handles each step of sending the sample or control message.
00054  */
00055 class OpenDDS_Dcps_Export ThreadPerConnectionSendTask : public ACE_Task_Base {
00056 public:
00057 
00058   /// Constructor. Explicit, so a DataLink* is never implicitly converted to a task.
00059   explicit ThreadPerConnectionSendTask(DataLink* link);
00060 
00061   /// Virtual destructor.
00062   virtual ~ThreadPerConnectionSendTask();
00063 
00064   /// Put a request for operation @a op (on @a element, default 0) onto the request queue.
00065   /// @return 0 if successful, -1 otherwise (the request has been "rejected" or this
00066   /// task is shutdown).
00067   int add_request(SendStrategyOpType op, TransportQueueElement* element = 0);
00068 
00069   /// Activate the worker threads (see opened_, which guards against repeated open() calls).
00070   virtual int open(void* = 0);
00071 
00072   /// The "mainline" executed by the worker thread.
00073   virtual int svc();
00074 
00075   /// Called when the thread exits; @a flag defaults to 0.
00076   virtual int close(u_long flag = 0);
00077 
00078   /// Remove the sample @a element from the thread per connection queue; the outcome is reported as a RemoveResult.
00079   RemoveResult remove_sample(const DataSampleElement* element);
00080 
00081 private:
00082 
00083   /// Handle the request @a req. Private and virtual.
00084   virtual void execute(SendRequest& req);
00085 
00086   typedef ACE_SYNCH_MUTEX         LockType;       ///< Mutex type used for lock_.
00087   typedef ACE_Guard<LockType>     GuardType;      ///< Guard type over LockType.
00088   typedef ACE_Condition<LockType> ConditionType;  ///< Condition type over LockType, used for work_available_.
00089 
00090   typedef BasicQueue<SendRequest> QueueType;      ///< Container type used for queue_.
00091 
00092   /// Lock protecting the "state" (all of the data members) of this object.
00093   LockType lock_;
00094 
00095   /// The queue of pending requests.
00096   QueueType queue_;
00097 
00098   /// Condition used to tell the worker threads that there may be a
00099   /// request in queue_ waiting to be executed.
00100   /// It is signal()'ed each time a request is added to queue_,
00101   /// and also when this task is shutdown.
00102   ConditionType work_available_;
00103 
00104   /// Flag used to initiate a shutdown request to all worker threads.
00105   bool shutdown_initiated_;
00106 
00107   /// Flag used to avoid multiple open() calls.
00108   bool opened_;
00109 
00110   /// The id of the thread created by this task.
00111   ACE_thread_t thr_id_;
00112 
00113   /// The datalink used to send the samples or control messages (raw pointer; ownership is not expressed in this header).
00114   DataLink* link_;
00115 };
00116 
00117 } // namespace DCPS
00118 } // namespace OpenDDS
00119 
00120 #include /**/ "ace/post.h"
00121 
00122 #endif /* OPENDDS_DCPS_THREADPERCONNECTIONSENDER_H */

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7