Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #include "ace/Message_Block.h" 9 : #include "EntryExit.h" 10 : 11 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 12 : 13 : namespace OpenDDS { 14 : namespace DCPS { 15 : 16 : ACE_INLINE 17 4 : TransportQueueElement::TransportQueueElement(unsigned long initial_count) 18 4 : : sub_loan_count_(initial_count), 19 4 : dropped_(false), 20 4 : released_(false) 21 : { 22 : DBG_ENTRY_LVL("TransportQueueElement", "TransportQueueElement", 6); 23 4 : } 24 : 25 : ACE_INLINE 26 : bool 27 4 : TransportQueueElement::data_dropped(bool dropped_by_transport) 28 : { 29 : DBG_ENTRY_LVL("TransportQueueElement", "data_dropped", 6); 30 4 : dropped_ = true; 31 4 : return decision_made(dropped_by_transport); 32 : } 33 : 34 : ACE_INLINE 35 : bool 36 0 : TransportQueueElement::data_delivered() 37 : { 38 : DBG_ENTRY_LVL("TransportQueueElement", "data_delivered", 6); 39 : // Decision made depend on dropped_ flag. If any link drops 40 : // the sample even other links deliver successfully, the 41 : // data dropped by transport will called back to writer. 42 0 : return decision_made(dropped_); 43 : } 44 : 45 : ACE_INLINE 46 : bool 47 4 : TransportQueueElement::decision_made(bool dropped_by_transport) 48 : { 49 : DBG_ENTRY_LVL("TransportQueueElement", "decision_made", 6); 50 : 51 4 : OPENDDS_ASSERT(sub_loan_count_); 52 : 53 4 : const unsigned long new_count = --sub_loan_count_; 54 4 : if (new_count == 0) { 55 : // All interested subscriptions have been satisfied. 56 : 57 : // The queue elements are released to its cached allocator 58 : // in release_element() call. 59 : // It's not necessary to set the released_ flag to true 60 : // as this element will be released anyway and not be 61 : // accessible. Note it can not be set after release_element 62 : // call. 63 : // released_ = true; 64 4 : release_element(dropped_by_transport); 65 4 : return true; 66 : } 67 : 68 0 : return false; 69 : } 70 : 71 : ACE_INLINE 72 : bool 73 0 : TransportQueueElement::was_dropped() const 74 : { 75 0 : return dropped_; 76 : } 77 : 78 : ACE_INLINE 79 : bool 80 : TransportQueueElement::released() const 81 : { 82 : return released_; 83 : } 84 : 85 : ACE_INLINE 86 : void 87 : TransportQueueElement::released(bool flag) 88 : { 89 : released_ = flag; 90 : } 91 : 92 : ACE_INLINE 93 : bool 94 0 : TransportQueueElement::MatchOnPubId::matches( 95 : const TransportQueueElement& candidate) const 96 : { 97 0 : return pub_id_ == candidate.publication_id() 98 0 : && pub_id_ != GUID_UNKNOWN; 99 : } 100 : 101 : ACE_INLINE 102 : bool 103 0 : TransportQueueElement::MatchOnDataPayload::matches( 104 : const TransportQueueElement& candidate) const 105 : { 106 0 : const ACE_Message_Block* payload = candidate.msg_payload(); 107 0 : if (!payload) { 108 0 : return false; 109 : } 110 0 : return data_ == payload->rd_ptr(); 111 : } 112 : 113 : ACE_INLINE 114 : bool 115 0 : TransportQueueElement::MatchOnElement::matches( 116 : const TransportQueueElement& candidate) const 117 : { 118 0 : return element_ == &candidate; 119 : } 120 : 121 : } 122 : } 123 : 124 : OPENDDS_END_VERSIONED_NAMESPACE_DECL