00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_THREADPERCONNECTIONSENDER_H 00009 #define OPENDDS_DCPS_THREADPERCONNECTIONSENDER_H 00010 00011 #include /**/ "ace/pre.h" 00012 00013 #include "dds/DCPS/dcps_export.h" 00014 #include "BasicQueue_T.h" 00015 #include "TransportDefs.h" 00016 #include "ace/Task.h" 00017 #include "ace/Synch.h" 00018 #include "ace/Condition_T.h" 00019 #include "dds/DCPS/PoolAllocationBase.h" 00020 00021 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00022 # pragma once 00023 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00024 00025 namespace OpenDDS { 00026 namespace DCPS { 00027 00028 class DataLink; 00029 class TransportQueueElement; 00030 class DataSampleElement; 00031 class TransportSendElement; 00032 00033 00034 enum SendStrategyOpType { 00035 SEND_START, 00036 SEND, 00037 SEND_STOP, 00038 REMOVE_SAMPLE, 00039 REMOVE_ALL_CONTROL_SAMPLES 00040 }; 00041 00042 struct SendRequest : public PoolAllocationBase { 00043 SendStrategyOpType op_; 00044 TransportQueueElement* element_; 00045 }; 00046 00047 /** 00048 * @class ThreadPerConnectionSendTask 00049 * 00050 * @brief Execute the requests of sending a sample or control message. 00051 * 00052 * This task implements the request execute method which handles each step 00053 * of sending a sample or control message. 00054 */ 00055 class OpenDDS_Dcps_Export ThreadPerConnectionSendTask : public ACE_Task_Base { 00056 public: 00057 00058 /// Constructor. 00059 ThreadPerConnectionSendTask(DataLink* link); 00060 00061 /// Virtual Destructor. 00062 virtual ~ThreadPerConnectionSendTask(); 00063 00064 /// Put the request to the request queue. 00065 /// Returns 0 if successful, -1 otherwise (it has been "rejected" or this 00066 /// task is shutdown). 00067 int add_request(SendStrategyOpType op, TransportQueueElement* element = 0); 00068 00069 /// Activate the worker threads 00070 virtual int open(void* = 0); 00071 00072 /// The "mainline" executed by the worker thread. 00073 virtual int svc(); 00074 00075 /// Called when the thread exits. 00076 virtual int close(u_long flag = 0); 00077 00078 /// Remove sample from the thread per connection queue. 00079 RemoveResult remove_sample(const DataSampleElement* element); 00080 00081 private: 00082 00083 /// Handle the request. 00084 virtual void execute(SendRequest& req); 00085 00086 typedef ACE_SYNCH_MUTEX LockType; 00087 typedef ACE_Guard<LockType> GuardType; 00088 typedef ACE_Condition<LockType> ConditionType; 00089 00090 typedef BasicQueue<SendRequest> QueueType; 00091 00092 /// Lock to protect the "state" (all of the data members) of this object. 00093 LockType lock_; 00094 00095 /// The request queue. 00096 QueueType queue_; 00097 00098 /// Condition used to signal the worker threads that they may be able to 00099 /// find a request in the queue_ that needs to be executed. 00100 /// This condition will be signal()'ed each time a request is 00101 /// added to the queue_, and also when this task is shutdown. 00102 ConditionType work_available_; 00103 00104 /// Flag used to initiate a shutdown request to all worker threads. 00105 bool shutdown_initiated_; 00106 00107 /// Flag used to avoid multiple open() calls. 00108 bool opened_; 00109 00110 /// The id of the thread created by this task. 00111 ACE_thread_t thr_id_; 00112 00113 /// The datalink to send the samples or control messages. 00114 DataLink* link_; 00115 }; 00116 00117 } // namespace DCPS 00118 } // namespace OpenDDS 00119 00120 #include /**/ "ace/post.h" 00121 00122 #endif /* OPENDDS_DCPS_THREADPERCONNECTIONSENDER_H */