ThreadPerConnectionSendTask.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_THREADPERCONNECTIONSENDER_H
00009 #define OPENDDS_DCPS_THREADPERCONNECTIONSENDER_H
00010 
00011 #include /**/ "ace/pre.h"
00012 
00013 #include "dds/DCPS/dcps_export.h"
00014 #include "dds/DCPS/PoolAllocationBase.h"
00015 #include "BasicQueue_T.h"
00016 #include "TransportDefs.h"
00017 
00018 #include "ace/Condition_T.h"
00019 #include "ace/Synch_Traits.h"
00020 #include "ace/Task.h"
00021 
00022 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00023 # pragma once
00024 #endif /* ACE_LACKS_PRAGMA_ONCE */
00025 
00026 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 namespace OpenDDS {
00029 namespace DCPS {
00030 
00031 class DataLink;
00032 class TransportQueueElement;
00033 class DataSampleElement;
00034 class TransportSendElement;
00035 
00036 
00037 enum SendStrategyOpType {
00038   SEND_START,
00039   SEND,
00040   SEND_STOP,
00041   REMOVE_SAMPLE,
00042   REMOVE_ALL_CONTROL_SAMPLES
00043 };
00044 
00045 struct SendRequest : public PoolAllocationBase {
00046   SendStrategyOpType op_;
00047   TransportQueueElement* element_;
00048 };
00049 
00050 /**
00051  * @class ThreadPerConnectionSendTask
00052  *
00053  * @brief Execute the requests of sending a sample or control message.
00054  *
00055  *  This task implements the request execute method which handles each step
00056  *  of sending a sample or control message.
00057  */
00058 class OpenDDS_Dcps_Export ThreadPerConnectionSendTask : public ACE_Task_Base {
00059 public:
00060   ThreadPerConnectionSendTask(DataLink* link);
00061 
00062   virtual ~ThreadPerConnectionSendTask();
00063 
00064   /// Put the request to the request queue.
00065   /// Returns 0 if successful, -1 otherwise (it has been "rejected" or this
00066   /// task is shutdown).
00067   int add_request(SendStrategyOpType op, TransportQueueElement* element = 0);
00068 
00069   /// Activate the worker threads
00070   virtual int open(void* = 0);
00071 
00072   /// The "mainline" executed by the worker thread.
00073   virtual int svc();
00074 
00075   /// Called when the thread exits.
00076   virtual int close(u_long flag = 0);
00077 
00078   /// Remove sample from the thread per connection queue.
00079   RemoveResult remove_sample(const DataSampleElement* element);
00080 
00081 private:
00082 
00083   /// Handle the request.
00084   virtual void execute(SendRequest& req);
00085 
00086   typedef ACE_SYNCH_MUTEX         LockType;
00087   typedef ACE_Guard<LockType>     GuardType;
00088   typedef ACE_Condition<LockType> ConditionType;
00089 
00090   typedef BasicQueue<SendRequest> QueueType;
00091 
00092   /// Lock to protect the "state" (all of the data members) of this object.
00093   LockType lock_;
00094 
00095   /// The request queue.
00096   QueueType queue_;
00097 
00098   /// Condition used to signal the worker threads that they may be able to
00099   /// find a request in the queue_ that needs to be executed.
00100   /// This condition will be signal()'ed each time a request is
00101   /// added to the queue_, and also when this task is shutdown.
00102   ConditionType work_available_;
00103 
00104   /// Flag used to initiate a shutdown request to all worker threads.
00105   bool shutdown_initiated_;
00106 
00107   /// Flag used to avoid multiple open() calls.
00108   bool opened_;
00109 
00110   /// The id of the thread created by this task.
00111   ACE_thread_t thr_id_;
00112 
00113   /// The datalink to send the samples or control messages.
00114   DataLink* link_;
00115 };
00116 
00117 } // namespace DCPS
00118 } // namespace OpenDDS
00119 
00120 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00121 
00122 #include /**/ "ace/post.h"
00123 
00124 #endif /* OPENDDS_DCPS_THREADPERCONNECTIONSENDER_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1