OpenDDS  Snapshot(2023/04/28-20:55)
TransportQueueElement.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTQUEUEELEMENT_H
9 #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTQUEUEELEMENT_H
10 
11 #include "dds/DCPS/dcps_export.h"
12 
13 #include <dds/DCPS/Atomic.h>
15 #include <dds/DCPS/Definitions.h>
16 #include <dds/DCPS/GuidUtils.h>
19 
20 #include <utility>
21 
23 class ACE_Message_Block;
25 
27 
28 namespace OpenDDS {
29 namespace DCPS {
30 
32 typedef std::pair<TransportQueueElement*, TransportQueueElement*> TqePair;
33 extern OpenDDS_Dcps_Export const TqePair null_tqe_pair;
34 typedef OPENDDS_VECTOR(TransportQueueElement*) TqeVector;
35 
36 /**
37  * @class TransportQueueElement
38  *
39  * @brief Base wrapper class around a data/control sample to be sent.
40  *
41  * This class serves as the base class for different types of samples
42  * that can be sent. For example, there are data samples and control
43  * samples. A subclass of TransportQueueElement exists for each of
44  * these types of samples.
45  *
46  * This class maintains a counter that, when decremented to 0, will
47  * trigger some logic (defined in the subclass) that will "return
48  * the loan" of the sample. The sample is "loaned" to the transport
49  * via a send() or send_control() call on the TransportClient.
50  * This wrapper object will "return the loan" when all DataLinks have
51  * "returned" their sub-loans.
52  */
54 public:
55 
56  virtual ~TransportQueueElement();
57 
59  protected:
60  virtual ~MatchCriteria();
62  public:
63  virtual bool matches(const TransportQueueElement& candidate) const = 0;
64  virtual bool unique() const = 0; // (only expect to match 1 element)
65  private: // and unimplemented...
67  MatchCriteria& operator=(const MatchCriteria&);
68  };
69 
71  public:
72  explicit MatchOnPubId(const GUID_t& id) : pub_id_(id) {}
73  virtual ~MatchOnPubId();
74  virtual bool matches(const TransportQueueElement& candidate) const;
75  virtual bool unique() const { return false; }
76  private:
78  };
79 
81  public:
82  explicit MatchOnDataPayload(const char* data) : data_(data) {}
83  virtual ~MatchOnDataPayload();
84  virtual bool matches(const TransportQueueElement& candidate) const;
85  virtual bool unique() const { return true; }
86  private:
87  const char* data_;
88  };
89 
91  public:
92  explicit MatchOnElement(const TransportQueueElement* element) : element_(element) {}
93  virtual ~MatchOnElement();
94  virtual bool matches(const TransportQueueElement& candidate) const;
95  virtual bool unique() const { return true; }
96  private:
98  };
99 
100  /// Invoked when the sample is dropped from a DataLink due to a
101  /// remove_sample() call.
102  /// A dropped_by_transport value of true indicates that the drop was initiated
103  /// by the transport while the transport send strategy is in MODE_TERMINATED.
104  /// A dropped_by_transport value of false indicates that the drop was initiated
105  /// by remove_sample(), and data_dropped() is being called as a result of it.
106  /// The return value indicates if this element is released.
107  bool data_dropped(bool dropped_by_transport = false);
108 
109  /// Invoked when the sample has been sent by a DataLink.
110  /// The return value indicates if this element is released.
111  bool data_delivered();
112 
113  /// Delay releasing the element by one decision (either a data_dropped or
114  /// data_delivered).
115  void increment_loan() { ++sub_loan_count_; }
116 
117  /// Does the sample require an exclusive transport packet?
118  virtual bool requires_exclusive_packet() const;
119 
120  /// Accessor for the publication id that sent the sample.
121  virtual GUID_t publication_id() const = 0;
122 
123  /// Accessor for the subscription id, if the sample is sent to a single subscription
124  virtual GUID_t subscription_id() const {
125  return GUID_UNKNOWN;
126  }
127 
128  virtual SequenceNumber sequence() const {
130  }
131 
132  /// A reference-incremented duplicate of the marshalled sample (sample header + sample data)
133  virtual ACE_Message_Block* duplicate_msg() const = 0;
134 
135  /// The marshalled sample (sample header + sample data)
136  virtual const ACE_Message_Block* msg() const = 0;
137 
138  /// The marshalled payload only (sample data)
139  virtual const ACE_Message_Block* msg_payload() const = 0;
140 
141  /// Is the element a "control" sample from the specified pub_id?
142  virtual bool is_control(GUID_t pub_id) const;
143 
144  /// Has the listener been called?
145  bool released() const;
146  void released(bool flag);
147 
148  /// Clone method with provided message block allocator and data block
149  /// allocators.
150  static ACE_Message_Block* clone_mb(const ACE_Message_Block* msg,
151  MessageBlockAllocator* mb_allocator,
152  DataBlockAllocator* db_allocator);
153 
154  /// Is the sample created by the transport?
155  virtual bool owned_by_transport() = 0;
156 
157  /// Create two TransportQueueElements representing the same data payload
158  /// as the current TransportQueueElement, with the first one (including its
159  /// DataSampleHeader) fitting in "size" bytes. This method leaves the
160  /// current TransportQueueElement alone (but can't be made const because
161  /// the newly-created elements will need to invoke non-const methods on it).
162  /// Each element in the pair will contain its own serialized modified
163  /// DataSampleHeader.
164  ///
165  /// If the fragmentation fails, a copy of null_tqe_pair is returned.
166  virtual TqePair fragment(size_t size);
167 
168  /// Is this QueueElement the result of fragmentation?
169  virtual bool is_fragment() const { return false; }
170 
171  /// Is this QueueElement the last result of fragmentation?
172  virtual bool is_last_fragment() const { return false; }
173 
174  virtual bool is_request_ack() const { return false; }
175 
176  virtual bool is_retained_replaced() const { return false; }
177 
179  bool operator()(const TransportQueueElement* lhs, const TransportQueueElement* rhs) const
180  {
181  const SequenceNumber seq_l = lhs->sequence(), seq_r = rhs->sequence();
182  return seq_l < seq_r || (seq_l == seq_r && lhs < rhs);
183  }
184  };
185 
186 protected:
187 
188  /// Ctor. The initial_count is the number of DataLinks to which
189  /// this TransportQueueElement will be sent.
190  explicit TransportQueueElement(unsigned long initial_count);
191 
192  /// Invoked when the counter reaches 0.
193  virtual void release_element(bool dropped_by_transport) = 0;
194 
195  /// May be used by subclass' implementation of release_element()
196  /// to determine if any DataLinks dropped the data instead of
197  /// delivering it.
198  bool was_dropped() const;
199 
200 private:
201  /// Common logic for data_dropped() and data_delivered().
202  bool decision_made(bool dropped_by_transport);
204 
205  /// Counts the number of outstanding sub-loans.
207 
208  /// Flag flipped to true if any DataLink dropped the sample.
209  bool dropped_;
210 
211  /// Whether the callback to the DW has been made.
212  bool released_;
213 };
214 
215 } // namespace DCPS
216 } // namespace OpenDDS
217 
219 
220 #if defined(__ACE_INLINE__)
221 #include "TransportQueueElement.inl"
222 #endif /* __ACE_INLINE__ */
223 
224 #endif /* OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTQUEUEELEMENT_H */
#define ACE_BEGIN_VERSIONED_NAMESPACE_DECL
virtual SequenceNumber sequence() const
std::pair< TransportQueueElement *, TransportQueueElement * > TqePair
virtual bool is_last_fragment() const
Is this QueueElement the last result of fragmentation?
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
Atomic< unsigned long > sub_loan_count_
Counts the number of outstanding sub-loans.
virtual bool is_fragment() const
Is this QueueElement the result of fragmentation?
MatchOnElement(const TransportQueueElement *element)
bool operator()(const TransportQueueElement *lhs, const TransportQueueElement *rhs) const
bool dropped_
Flag flipped to true if any DataLink dropped the sample.
bool released_
Whether the callback to the DW has been made.
#define ACE_END_VERSIONED_NAMESPACE_DECL
const TqePair null_tqe_pair
Sequence number abstraction. Only allows positive 64 bit values.
virtual GUID_t subscription_id() const
Accessor for the subscription id, if the sample is sent to a single subscription.
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Base wrapper class around a data/control sample to be sent.