OpenDDS  Snapshot(2023/04/28-20:55)
TransportQueueElement.inl
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "ace/Message_Block.h"
9 #include "EntryExit.h"
10 
12 
13 namespace OpenDDS {
14 namespace DCPS {
15 
18  : sub_loan_count_(initial_count),
19  dropped_(false),
20  released_(false)
21 {
22  DBG_ENTRY_LVL("TransportQueueElement", "TransportQueueElement", 6);
23 }
24 
26 bool
27 TransportQueueElement::data_dropped(bool dropped_by_transport)
28 {
29  DBG_ENTRY_LVL("TransportQueueElement", "data_dropped", 6);
30  dropped_ = true;
31  return decision_made(dropped_by_transport);
32 }
33 
35 bool
37 {
38  DBG_ENTRY_LVL("TransportQueueElement", "data_delivered", 6);
39  // Decision made depend on dropped_ flag. If any link drops
40  // the sample even other links deliver successfully, the
41  // data dropped by transport will called back to writer.
42  return decision_made(dropped_);
43 }
44 
46 bool
47 TransportQueueElement::decision_made(bool dropped_by_transport)
48 {
49  DBG_ENTRY_LVL("TransportQueueElement", "decision_made", 6);
50 
52 
53  const unsigned long new_count = --sub_loan_count_;
54  if (new_count == 0) {
55  // All interested subscriptions have been satisfied.
56 
57  // The queue elements are released to its cached allocator
58  // in release_element() call.
59  // It's not necessary to set the released_ flag to true
60  // as this element will be released anyway and not be
61  // accessible. Note it can not be set after release_element
62  // call.
63  // released_ = true;
64  release_element(dropped_by_transport);
65  return true;
66  }
67 
68  return false;
69 }
70 
72 bool
74 {
75  return dropped_;
76 }
77 
79 bool
81 {
82  return released_;
83 }
84 
86 void
88 {
89  released_ = flag;
90 }
91 
93 bool
95  const TransportQueueElement& candidate) const
96 {
97  return pub_id_ == candidate.publication_id()
98  && pub_id_ != GUID_UNKNOWN;
99 }
100 
102 bool
104  const TransportQueueElement& candidate) const
105 {
106  const ACE_Message_Block* payload = candidate.msg_payload();
107  if (!payload) {
108  return false;
109  }
110  return data_ == payload->rd_ptr();
111 }
112 
114 bool
116  const TransportQueueElement& candidate) const
117 {
118  return element_ == &candidate;
119 }
120 
121 }
122 }
123 
virtual bool matches(const TransportQueueElement &candidate) const
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
bool data_dropped(bool dropped_by_transport=false)
TransportQueueElement(unsigned long initial_count)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
char * rd_ptr(void) const
virtual void release_element(bool dropped_by_transport)=0
Invoked when the counter reaches 0.
Atomic< unsigned long > sub_loan_count_
Counts the number of outstanding sub-loans.
bool dropped_
Flag flipped to true if any DataLink dropped the sample.
bool decision_made(bool dropped_by_transport)
Common logic for data_dropped() and data_delivered().
bool released_
If the callback to DW is made.
virtual bool matches(const TransportQueueElement &candidate) const
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual GUID_t publication_id() const =0
Accessor for the publication id that sent the sample.
#define ACE_INLINE
bool released() const
Is the listener get called ?
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual bool matches(const TransportQueueElement &candidate) const
Base wrapper class around a data/control sample to be sent.
virtual const ACE_Message_Block * msg_payload() const =0
The marshalled payload only (sample data)