00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_TRANSPORTQUEUEELEMENT_H 00009 #define OPENDDS_DCPS_TRANSPORTQUEUEELEMENT_H 00010 00011 #include "dds/DCPS/dcps_export.h" 00012 #include "dds/DCPS/Definitions.h" 00013 #include "dds/DCPS/GuidUtils.h" 00014 #include "dds/DCPS/PoolAllocationBase.h" 00015 #include "dds/DCPS/SequenceNumber.h" 00016 00017 #include <utility> 00018 00019 ACE_BEGIN_VERSIONED_NAMESPACE_DECL 00020 class ACE_Message_Block; 00021 ACE_END_VERSIONED_NAMESPACE_DECL 00022 00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 00024 00025 namespace OpenDDS { 00026 namespace DCPS { 00027 00028 class DataSampleElement; 00029 00030 class TransportQueueElement; 00031 typedef std::pair<TransportQueueElement*, TransportQueueElement*> ElementPair; 00032 00033 /** 00034 * @class TransportQueueElement 00035 * 00036 * @brief Base wrapper class around a data/control sample to be sent. 00037 * 00038 * This class serves as the base class for different types of samples 00039 * that can be sent. For example, there are data samples and control 00040 * samples. A subclass of TransportQueueElement exists for each of 00041 * these types of samples. 00042 * 00043 * This class maintains a counter that, when decremented to 0, will 00044 * trigger some logic (defined in the subclass) that will "return 00045 * the loan" of the sample. The sample is "loaned" to the transport 00046 * via a send() or send_control() call on the TransportClient. 00047 * This wrapper object will "return the loan" when all DataLinks have 00048 * "returned" their sub-loans. 00049 */ 00050 class OpenDDS_Dcps_Export TransportQueueElement : public PoolAllocationBase { 00051 public: 00052 00053 virtual ~TransportQueueElement(); 00054 00055 class OpenDDS_Dcps_Export MatchCriteria { 00056 protected: 00057 virtual ~MatchCriteria(); 00058 MatchCriteria() {} 00059 public: 00060 virtual bool matches(const TransportQueueElement& candidate) const = 0; 00061 virtual bool unique() const = 0; // (only expect to match 1 element) 00062 private: // and unimplemented... 00063 MatchCriteria(const MatchCriteria&); 00064 MatchCriteria& operator=(const MatchCriteria&); 00065 }; 00066 00067 class OpenDDS_Dcps_Export MatchOnPubId : public MatchCriteria { 00068 public: 00069 explicit MatchOnPubId(const RepoId& id) : pub_id_(id) {} 00070 virtual ~MatchOnPubId(); 00071 virtual bool matches(const TransportQueueElement& candidate) const; 00072 virtual bool unique() const { return false; } 00073 private: 00074 RepoId pub_id_; 00075 }; 00076 00077 class OpenDDS_Dcps_Export MatchOnDataPayload : public MatchCriteria { 00078 public: 00079 explicit MatchOnDataPayload(const char* data) : data_(data) {} 00080 virtual ~MatchOnDataPayload(); 00081 virtual bool matches(const TransportQueueElement& candidate) const; 00082 virtual bool unique() const { return true; } 00083 private: 00084 const char* data_; 00085 }; 00086 00087 /// Invoked when the sample is dropped from a DataLink due to a 00088 /// remove_sample() call. 00089 /// The dropped_by_transport flag true indicates the data dropping is initiated 00090 /// by transport when the transport send strategy is in a MODE_TERMINATED. 00091 /// The dropped_by_transport flag false indicates the dropping is initiated 00092 /// by the remove_sample and data_dropped() is a result of remove_sample(). 00093 /// The return value indicates if this element is released. 00094 bool data_dropped(bool dropped_by_transport = false); 00095 00096 /// Invoked when the sample has been sent by a DataLink. 00097 /// The return value indicates if this element is released. 00098 bool data_delivered(); 00099 00100 /// Does the sample require an exclusive transport packet? 00101 virtual bool requires_exclusive_packet() const; 00102 00103 /// Accessor for the publication id that sent the sample. 00104 virtual RepoId publication_id() const = 0; 00105 00106 /// Accessor for the subscription id, if sent the sample is sent to 1 sub 00107 virtual RepoId subscription_id() const { 00108 return GUID_UNKNOWN; 00109 } 00110 00111 virtual SequenceNumber sequence() const { 00112 return SequenceNumber::SEQUENCENUMBER_UNKNOWN(); 00113 } 00114 00115 /// The marshalled sample (sample header + sample data) 00116 virtual const ACE_Message_Block* msg() const = 0; 00117 00118 /// The marshalled payload only (sample data) 00119 virtual const ACE_Message_Block* msg_payload() const = 0; 00120 00121 /// Is the element a "control" sample from the specified pub_id? 00122 virtual bool is_control(RepoId pub_id) const; 00123 00124 /// Is the listener get called ? 00125 bool released() const; 00126 void released(bool flag); 00127 00128 /// Clone method with provided message block allocator and data block 00129 /// allocators. 00130 static ACE_Message_Block* clone_mb(const ACE_Message_Block* msg, 00131 MessageBlockAllocator* mb_allocator, 00132 DataBlockAllocator* db_allocator); 00133 00134 /// Is the sample created by the transport? 00135 virtual bool owned_by_transport() = 0; 00136 00137 /// Create two TransportQueueElements representing the same data payload 00138 /// as the current TransportQueueElement, with the first one (including its 00139 /// DataSampleHeader) fitting in "size" bytes. This method leaves the 00140 /// current TransportQueueElement alone (but can't be made const because 00141 /// the newly-created elements will need to invoke non-const methods on it). 00142 /// Each element in the pair will contain its own serialized modified 00143 /// DataSampleHeader. 00144 virtual ElementPair fragment(size_t size); 00145 00146 /// Is this QueueElement the result of fragmentation? 00147 virtual bool is_fragment() const { return false; } 00148 00149 virtual bool is_request_ack() const { return false; } 00150 00151 protected: 00152 00153 /// Ctor. The initial_count is the number of DataLinks to which 00154 /// this TransportQueueElement will be sent. 00155 explicit TransportQueueElement(unsigned long initial_count); 00156 00157 /// Invoked when the counter reaches 0. 00158 virtual void release_element(bool dropped_by_transport) = 0; 00159 00160 /// May be used by subclass' implementation of release_element() 00161 /// to determine if any DataLinks dropped the data instead of 00162 /// delivering it. 00163 bool was_dropped() const; 00164 00165 private: 00166 00167 /// Common logic for data_dropped() and data_delivered(). 00168 bool decision_made(bool dropped_by_transport); 00169 friend class TransportCustomizedElement; 00170 00171 /// Counts the number of outstanding sub-loans. 00172 ACE_Atomic_Op<ACE_Thread_Mutex, unsigned long> sub_loan_count_; 00173 00174 /// Flag flipped to true if any DataLink dropped the sample. 00175 bool dropped_; 00176 00177 /// If the callback to DW is made. 00178 bool released_; 00179 }; 00180 00181 } // namespace DCPS 00182 } // namespace OpenDDS 00183 00184 OPENDDS_END_VERSIONED_NAMESPACE_DECL 00185 00186 #if defined(__ACE_INLINE__) 00187 #include "TransportQueueElement.inl" 00188 #endif /* __ACE_INLINE__ */ 00189 00190 #endif /* OPENDDS_DCPS_TRANSPORTQUEUEELEMENT_H */