00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_THREADPERCONNECTIONSENDER_H 00009 #define OPENDDS_DCPS_THREADPERCONNECTIONSENDER_H 00010 00011 #include /**/ "ace/pre.h" 00012 00013 #include "dds/DCPS/dcps_export.h" 00014 #include "dds/DCPS/PoolAllocationBase.h" 00015 #include "BasicQueue_T.h" 00016 #include "TransportDefs.h" 00017 00018 #include "ace/Condition_T.h" 00019 #include "ace/Synch_Traits.h" 00020 #include "ace/Task.h" 00021 00022 #if !defined (ACE_LACKS_PRAGMA_ONCE) 00023 # pragma once 00024 #endif /* ACE_LACKS_PRAGMA_ONCE */ 00025 00026 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 00027 00028 namespace OpenDDS { 00029 namespace DCPS { 00030 00031 class DataLink; 00032 class TransportQueueElement; 00033 class DataSampleElement; 00034 class TransportSendElement; 00035 00036 00037 enum SendStrategyOpType { 00038 SEND_START, 00039 SEND, 00040 SEND_STOP, 00041 REMOVE_SAMPLE, 00042 REMOVE_ALL_CONTROL_SAMPLES 00043 }; 00044 00045 struct SendRequest : public PoolAllocationBase { 00046 SendStrategyOpType op_; 00047 TransportQueueElement* element_; 00048 }; 00049 00050 /** 00051 * @class ThreadPerConnectionSendTask 00052 * 00053 * @brief Execute the requests of sending a sample or control message. 00054 * 00055 * This task implements the request execute method which handles each step 00056 * of sending a sample or control message. 00057 */ 00058 class OpenDDS_Dcps_Export ThreadPerConnectionSendTask : public ACE_Task_Base { 00059 public: 00060 ThreadPerConnectionSendTask(DataLink* link); 00061 00062 virtual ~ThreadPerConnectionSendTask(); 00063 00064 /// Put the request to the request queue. 00065 /// Returns 0 if successful, -1 otherwise (it has been "rejected" or this 00066 /// task is shutdown). 00067 int add_request(SendStrategyOpType op, TransportQueueElement* element = 0); 00068 00069 /// Activate the worker threads 00070 virtual int open(void* = 0); 00071 00072 /// The "mainline" executed by the worker thread. 00073 virtual int svc(); 00074 00075 /// Called when the thread exits. 00076 virtual int close(u_long flag = 0); 00077 00078 /// Remove sample from the thread per connection queue. 00079 RemoveResult remove_sample(const DataSampleElement* element); 00080 00081 private: 00082 00083 /// Handle the request. 00084 virtual void execute(SendRequest& req); 00085 00086 typedef ACE_SYNCH_MUTEX LockType; 00087 typedef ACE_Guard<LockType> GuardType; 00088 typedef ACE_Condition<LockType> ConditionType; 00089 00090 typedef BasicQueue<SendRequest> QueueType; 00091 00092 /// Lock to protect the "state" (all of the data members) of this object. 00093 LockType lock_; 00094 00095 /// The request queue. 00096 QueueType queue_; 00097 00098 /// Condition used to signal the worker threads that they may be able to 00099 /// find a request in the queue_ that needs to be executed. 00100 /// This condition will be signal()'ed each time a request is 00101 /// added to the queue_, and also when this task is shutdown. 00102 ConditionType work_available_; 00103 00104 /// Flag used to initiate a shutdown request to all worker threads. 00105 bool shutdown_initiated_; 00106 00107 /// Flag used to avoid multiple open() calls. 00108 bool opened_; 00109 00110 /// The id of the thread created by this task. 00111 ACE_thread_t thr_id_; 00112 00113 /// The datalink to send the samples or control messages. 00114 DataLink* link_; 00115 }; 00116 00117 } // namespace DCPS 00118 } // namespace OpenDDS 00119 00120 OPENDDS_END_VERSIONED_NAMESPACE_DECL 00121 00122 #include /**/ "ace/post.h" 00123 00124 #endif /* OPENDDS_DCPS_THREADPERCONNECTIONSENDER_H */