Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTQUEUEELEMENT_H 9 : #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTQUEUEELEMENT_H 10 : 11 : #include "dds/DCPS/dcps_export.h" 12 : 13 : #include <dds/DCPS/Atomic.h> 14 : #include <dds/DCPS/Cached_Allocator_With_Overflow_T.h> 15 : #include <dds/DCPS/Definitions.h> 16 : #include <dds/DCPS/GuidUtils.h> 17 : #include <dds/DCPS/PoolAllocationBase.h> 18 : #include <dds/DCPS/SequenceNumber.h> 19 : 20 : #include <utility> 21 : 22 : ACE_BEGIN_VERSIONED_NAMESPACE_DECL 23 : class ACE_Message_Block; 24 : ACE_END_VERSIONED_NAMESPACE_DECL 25 : 26 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 27 : 28 : namespace OpenDDS { 29 : namespace DCPS { 30 : 31 : class TransportQueueElement; 32 : typedef std::pair<TransportQueueElement*, TransportQueueElement*> TqePair; 33 : extern OpenDDS_Dcps_Export const TqePair null_tqe_pair; 34 : typedef OPENDDS_VECTOR(TransportQueueElement*) TqeVector; 35 : 36 : /** 37 : * @class TransportQueueElement 38 : * 39 : * @brief Base wrapper class around a data/control sample to be sent. 40 : * 41 : * This class serves as the base class for different types of samples 42 : * that can be sent. For example, there are data samples and control 43 : * samples. A subclass of TransportQueueElement exists for each of 44 : * these types of samples. 45 : * 46 : * This class maintains a counter that, when decremented to 0, will 47 : * trigger some logic (defined in the subclass) that will "return 48 : * the loan" of the sample. The sample is "loaned" to the transport 49 : * via a send() or send_control() call on the TransportClient. 50 : * This wrapper object will "return the loan" when all DataLinks have 51 : * "returned" their sub-loans. 52 : */ 53 : class OpenDDS_Dcps_Export TransportQueueElement : public PoolAllocationBase { 54 : public: 55 : 56 : virtual ~TransportQueueElement(); 57 : 58 : class OpenDDS_Dcps_Export MatchCriteria { 59 : protected: 60 : virtual ~MatchCriteria(); 61 0 : MatchCriteria() {} 62 : public: 63 : virtual bool matches(const TransportQueueElement& candidate) const = 0; 64 : virtual bool unique() const = 0; // (only expect to match 1 element) 65 : private: // and unimplemented... 66 : MatchCriteria(const MatchCriteria&); 67 : MatchCriteria& operator=(const MatchCriteria&); 68 : }; 69 : 70 : class OpenDDS_Dcps_Export MatchOnPubId : public MatchCriteria { 71 : public: 72 0 : explicit MatchOnPubId(const GUID_t& id) : pub_id_(id) {} 73 : virtual ~MatchOnPubId(); 74 : virtual bool matches(const TransportQueueElement& candidate) const; 75 0 : virtual bool unique() const { return false; } 76 : private: 77 : GUID_t pub_id_; 78 : }; 79 : 80 : class OpenDDS_Dcps_Export MatchOnDataPayload : public MatchCriteria { 81 : public: 82 0 : explicit MatchOnDataPayload(const char* data) : data_(data) {} 83 : virtual ~MatchOnDataPayload(); 84 : virtual bool matches(const TransportQueueElement& candidate) const; 85 0 : virtual bool unique() const { return true; } 86 : private: 87 : const char* data_; 88 : }; 89 : 90 : class OpenDDS_Dcps_Export MatchOnElement : public MatchCriteria { 91 : public: 92 0 : explicit MatchOnElement(const TransportQueueElement* element) : element_(element) {} 93 : virtual ~MatchOnElement(); 94 : virtual bool matches(const TransportQueueElement& candidate) const; 95 0 : virtual bool unique() const { return true; } 96 : private: 97 : const TransportQueueElement* element_; 98 : }; 99 : 100 : /// Invoked when the sample is dropped from a DataLink due to a 101 : /// remove_sample() call. 102 : /// The dropped_by_transport flag true indicates the data dropping is initiated 103 : /// by transport when the transport send strategy is in a MODE_TERMINATED. 104 : /// The dropped_by_transport flag false indicates the dropping is initiated 105 : /// by the remove_sample and data_dropped() is a result of remove_sample(). 106 : /// The return value indicates if this element is released. 107 : bool data_dropped(bool dropped_by_transport = false); 108 : 109 : /// Invoked when the sample has been sent by a DataLink. 110 : /// The return value indicates if this element is released. 111 : bool data_delivered(); 112 : 113 : /// Delay releasing the element by one decision (either a data_dropped or 114 : /// data_delivered). 115 0 : void increment_loan() { ++sub_loan_count_; } 116 : 117 : /// Does the sample require an exclusive transport packet? 118 : virtual bool requires_exclusive_packet() const; 119 : 120 : /// Accessor for the publication id that sent the sample. 121 : virtual GUID_t publication_id() const = 0; 122 : 123 : /// Accessor for the subscription id, if sent the sample is sent to 1 sub 124 0 : virtual GUID_t subscription_id() const { 125 0 : return GUID_UNKNOWN; 126 : } 127 : 128 0 : virtual SequenceNumber sequence() const { 129 0 : return SequenceNumber::SEQUENCENUMBER_UNKNOWN(); 130 : } 131 : 132 : /// A reference-incremented duplicate of the marshalled sample (sample header + sample data) 133 : virtual ACE_Message_Block* duplicate_msg() const = 0; 134 : 135 : /// The marshalled sample (sample header + sample data) 136 : virtual const ACE_Message_Block* msg() const = 0; 137 : 138 : /// The marshalled payload only (sample data) 139 : virtual const ACE_Message_Block* msg_payload() const = 0; 140 : 141 : /// Is the element a "control" sample from the specified pub_id? 142 : virtual bool is_control(GUID_t pub_id) const; 143 : 144 : /// Is the listener get called ? 145 : bool released() const; 146 : void released(bool flag); 147 : 148 : /// Clone method with provided message block allocator and data block 149 : /// allocators. 150 : static ACE_Message_Block* clone_mb(const ACE_Message_Block* msg, 151 : MessageBlockAllocator* mb_allocator, 152 : DataBlockAllocator* db_allocator); 153 : 154 : /// Is the sample created by the transport? 155 : virtual bool owned_by_transport() = 0; 156 : 157 : /// Create two TransportQueueElements representing the same data payload 158 : /// as the current TransportQueueElement, with the first one (including its 159 : /// DataSampleHeader) fitting in "size" bytes. This method leaves the 160 : /// current TransportQueueElement alone (but can't be made const because 161 : /// the newly-created elements will need to invoke non-const methods on it). 162 : /// Each element in the pair will contain its own serialized modified 163 : /// DataSampleHeader. 164 : /// 165 : /// If the fragmentation fails, a copy of null_tqe_pair is returned. 166 : virtual TqePair fragment(size_t size); 167 : 168 : /// Is this QueueElement the result of fragmentation? 169 0 : virtual bool is_fragment() const { return false; } 170 : 171 : /// Is this QueueElement the last result of fragmentation? 172 0 : virtual bool is_last_fragment() const { return false; } 173 : 174 0 : virtual bool is_request_ack() const { return false; } 175 : 176 0 : virtual bool is_retained_replaced() const { return false; } 177 : 178 : struct OrderBySequenceNumber { 179 : bool operator()(const TransportQueueElement* lhs, const TransportQueueElement* rhs) const 180 : { 181 : const SequenceNumber seq_l = lhs->sequence(), seq_r = rhs->sequence(); 182 : return seq_l < seq_r || (seq_l == seq_r && lhs < rhs); 183 : } 184 : }; 185 : 186 : protected: 187 : 188 : /// Ctor. The initial_count is the number of DataLinks to which 189 : /// this TransportQueueElement will be sent. 190 : explicit TransportQueueElement(unsigned long initial_count); 191 : 192 : /// Invoked when the counter reaches 0. 193 : virtual void release_element(bool dropped_by_transport) = 0; 194 : 195 : /// May be used by subclass' implementation of release_element() 196 : /// to determine if any DataLinks dropped the data instead of 197 : /// delivering it. 198 : bool was_dropped() const; 199 : 200 : private: 201 : /// Common logic for data_dropped() and data_delivered(). 202 : bool decision_made(bool dropped_by_transport); 203 : friend class TransportCustomizedElement; 204 : 205 : /// Counts the number of outstanding sub-loans. 206 : Atomic<unsigned long> sub_loan_count_; 207 : 208 : /// Flag flipped to true if any DataLink dropped the sample. 209 : bool dropped_; 210 : 211 : /// If the callback to DW is made. 212 : bool released_; 213 : }; 214 : 215 : } // namespace DCPS 216 : } // namespace OpenDDS 217 : 218 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 219 : 220 : #if defined(__ACE_INLINE__) 221 : #include "TransportQueueElement.inl" 222 : #endif /* __ACE_INLINE__ */ 223 : 224 : #endif /* OPENDDS_DCPS_TRANSPORTQUEUEELEMENT_H */