OpenDDS  Snapshot(2023/04/07-19:43)
OpenDDS::DCPS::TransportQueueElement Class Reference (abstract)

Base wrapper class around a data/control sample to be sent.

#include <TransportQueueElement.h>


Classes

class  MatchCriteria
 
class  MatchOnDataPayload
 
class  MatchOnElement
 
class  MatchOnPubId
 
struct  OrderBySequenceNumber
 

Public Member Functions

virtual ~TransportQueueElement ()

bool data_dropped (bool dropped_by_transport=false)

bool data_delivered ()

void increment_loan ()

virtual bool requires_exclusive_packet () const
 Does the sample require an exclusive transport packet?

virtual GUID_t publication_id () const =0
 Accessor for the publication id that sent the sample.

virtual GUID_t subscription_id () const
 Accessor for the subscription id, if the sample is sent to exactly one subscription.

virtual SequenceNumber sequence () const

virtual ACE_Message_Block * duplicate_msg () const =0
 A reference-incremented duplicate of the marshalled sample (sample header + sample data).

virtual const ACE_Message_Block * msg () const =0
 The marshalled sample (sample header + sample data).

virtual const ACE_Message_Block * msg_payload () const =0
 The marshalled payload only (sample data).

virtual bool is_control (GUID_t pub_id) const
 Is the element a "control" sample from the specified pub_id?

bool released () const
 Has the listener been called?

void released (bool flag)

virtual bool owned_by_transport ()=0
 Is the sample created by the transport?

virtual TqePair fragment (size_t size)

virtual bool is_fragment () const
 Is this QueueElement the result of fragmentation?

virtual bool is_last_fragment () const
 Is this QueueElement the last result of fragmentation?

virtual bool is_request_ack () const

virtual bool is_retained_replaced () const
 

Static Public Member Functions

static ACE_Message_Block * clone_mb (const ACE_Message_Block *msg, MessageBlockAllocator *mb_allocator, DataBlockAllocator *db_allocator)


Protected Member Functions

 TransportQueueElement (unsigned long initial_count)

virtual void release_element (bool dropped_by_transport)=0
 Invoked when the counter reaches 0.

bool was_dropped () const


Private Member Functions

bool decision_made (bool dropped_by_transport)
 Common logic for data_dropped() and data_delivered().


Private Attributes

Atomic< unsigned long > sub_loan_count_
 Counts the number of outstanding sub-loans.

bool dropped_
 Flag flipped to true if any DataLink dropped the sample.

bool released_
 Whether the callback to the DataWriter (DW) has been made.
 

Friends

class TransportCustomizedElement
 

Detailed Description

Base wrapper class around a data/control sample to be sent.

This class serves as the base class for different types of samples that can be sent. For example, there are data samples and control samples. A subclass of TransportQueueElement exists for each of these types of samples.

This class maintains a counter that, when decremented to 0, will trigger some logic (defined in the subclass) that will "return the loan" of the sample. The sample is "loaned" to the transport via a send() or send_control() call on the TransportClient. This wrapper object will "return the loan" when all DataLinks have "returned" their sub-loans.

Definition at line 53 of file TransportQueueElement.h.
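
The loan-counting scheme described above can be sketched in isolation. This is a minimal model, not the OpenDDS implementation: LoanedElement, CountingElement, return_sub_loan() and run_three_link_example() are hypothetical names, and std::atomic stands in for the Atomic wrapper.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for TransportQueueElement (not the OpenDDS class).
class LoanedElement {
public:
  virtual ~LoanedElement() {}

  // Called once per DataLink when that link is done with the sample.
  // Returns true when this call returned the last sub-loan.
  bool return_sub_loan() {
    if (--sub_loan_count_ == 0) {
      release_element();
      return true;
    }
    return false;
  }

protected:
  // initial_count: number of DataLinks the element is sent to.
  explicit LoanedElement(unsigned long initial_count)
    : sub_loan_count_(initial_count) {}

  // Subclass-defined logic that "returns the loan" of the sample.
  virtual void release_element() = 0;

private:
  std::atomic<unsigned long> sub_loan_count_;
};

// Hypothetical subclass that counts how often the loan is returned.
class CountingElement : public LoanedElement {
public:
  CountingElement(unsigned long links, int& releases)
    : LoanedElement(links), releases_(releases) {}

protected:
  void release_element() override { ++releases_; }

private:
  int& releases_;
};

// Sends one element to three links; returns the number of releases seen.
int run_three_link_example() {
  int releases = 0;
  CountingElement element(3, releases);
  const bool first = element.return_sub_loan();   // 2 sub-loans remain
  const bool second = element.return_sub_loan();  // 1 sub-loan remains
  const bool third = element.return_sub_loan();   // counter hits 0
  assert(!first && !second && third);
  return releases;
}
```

Only the call that brings the counter to zero triggers the subclass logic, so the loan is returned exactly once regardless of how many links were involved.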

Constructor & Destructor Documentation

◆ ~TransportQueueElement()

OpenDDS::DCPS::TransportQueueElement::~TransportQueueElement ( )
virtual

Definition at line 25 of file TransportQueueElement.cpp.

References DBG_ENTRY_LVL.

{
  DBG_ENTRY_LVL("TransportQueueElement", "~TransportQueueElement", 6);
}
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ TransportQueueElement()

ACE_INLINE OpenDDS::DCPS::TransportQueueElement::TransportQueueElement ( unsigned long  initial_count)
explicit protected

Constructor. The initial_count is the number of DataLinks to which this TransportQueueElement will be sent.

Definition at line 17 of file TransportQueueElement.inl.

References ACE_INLINE, and DBG_ENTRY_LVL.

  : sub_loan_count_(initial_count),
    dropped_(false),
    released_(false)
{
  DBG_ENTRY_LVL("TransportQueueElement", "TransportQueueElement", 6);
}

Member Function Documentation

◆ clone_mb()

ACE_Message_Block * OpenDDS::DCPS::TransportQueueElement::clone_mb ( const ACE_Message_Block * msg,
MessageBlockAllocator * mb_allocator,
DataBlockAllocator * db_allocator
)
static

Clone method that uses the provided message block allocator and data block allocator.

Definition at line 62 of file TransportQueueElement.cpp.

References ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_NEW_MALLOC_RETURN, ACE_Message_Block::base(), ACE_Message_Block::capacity(), ACE_Message_Block::cont(), ACE_Message_Block::copy(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, msg(), ACE_Message_Block::rd_ptr(), ACE_Message_Block::size(), ACE_Message_Block::wr_ptr(), and ACE_Time_Value::zero.

Referenced by OpenDDS::DCPS::SingleSendBuffer::retain_buffer().

{
  ACE_Message_Block* cur_block = const_cast<ACE_Message_Block*>(msg);
  ACE_Message_Block* head_copy = 0;
  ACE_Message_Block* cur_copy = 0;
  ACE_Message_Block* prev_copy = 0;
  // deep copy sample data
  while (cur_block != 0) {
    ACE_NEW_MALLOC_RETURN(cur_copy,
                          static_cast<ACE_Message_Block*>(
                            mb_allocator->malloc(sizeof(ACE_Message_Block))),
                          ACE_Message_Block(cur_block->capacity(),
                                            ACE_Message_Block::MB_DATA,
                                            0, //cont
                                            0, //data
                                            0, //alloc_strategy
                                            0, //locking_strategy
                                            ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
                                            ACE_Time_Value::zero,
                                            ACE_Time_Value::max_time,
                                            db_allocator,
                                            mb_allocator),
                          0);

    cur_copy->copy(cur_block->base(), cur_block->size());
    // Keep the read and write pointers at the same offsets as in the original.
    cur_copy->rd_ptr(cur_copy->base() +
                     (cur_block->rd_ptr() - cur_block->base()));
    cur_copy->wr_ptr(cur_copy->base() +
                     (cur_block->wr_ptr() - cur_block->base()));

    if (head_copy == 0) {
      head_copy = cur_copy;
    } else {
      prev_copy->cont(cur_copy);
    }

    prev_copy = cur_copy;

    cur_block = cur_block->cont();
  }

  return head_copy;
}
static const ACE_Time_Value max_time
char * rd_ptr(void) const
int copy(const char *buf, size_t n)
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
size_t size(void) const
ACE_Message_Block * cont(void) const
size_t capacity(void) const
char * wr_ptr(void) const
static const ACE_Time_Value zero
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
char * base(void) const
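
The deep-copy loop in clone_mb() walks the continuation chain and keeps each block's read and write positions relative to its own buffer. A simplified sketch of that idea follows. It does not use ACE: Block, clone_chain() and make_and_clone() are hypothetical names, and offsets replace the raw rd_ptr/wr_ptr pointers.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for ACE_Message_Block: a buffer with
// read/write offsets and a link to the next block in the chain.
struct Block {
  std::vector<char> data;       // full capacity of the block
  std::size_t rd = 0;           // read offset from the start of data
  std::size_t wr = 0;           // write offset from the start of data
  std::unique_ptr<Block> cont;  // next block in the chain, if any
};

// Deep-copies every block in the chain, keeping each block's read and
// write offsets relative to its own (new) buffer.
std::unique_ptr<Block> clone_chain(const Block* src) {
  std::unique_ptr<Block> head;
  Block* prev = nullptr;
  for (const Block* cur = src; cur != nullptr; cur = cur->cont.get()) {
    std::unique_ptr<Block> copy(new Block);
    copy->data = cur->data;  // copies the bytes into a new buffer
    copy->rd = cur->rd;
    copy->wr = cur->wr;
    Block* raw = copy.get();
    if (!head) {
      head = std::move(copy);
    } else {
      prev->cont = std::move(copy);
    }
    prev = raw;
  }
  return head;
}

// Builds a two-block chain and clones it.
std::unique_ptr<Block> make_and_clone() {
  Block first;
  first.data = {'h', 'd', 'r', '\0'};
  first.rd = 1;
  first.wr = 3;
  first.cont.reset(new Block);
  first.cont->data = {'a', 'b'};
  first.cont->wr = 2;
  return clone_chain(&first);
}
```

Because the clone owns its own buffers, it stays valid after the original chain is destroyed, which is the property a retained send buffer would need.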

◆ data_delivered()

ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::data_delivered ( )

Invoked when the sample has been sent by a DataLink. The return value indicates if this element is released.

Definition at line 36 of file TransportQueueElement.inl.

References ACE_INLINE, DBG_ENTRY_LVL, decision_made(), and dropped_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::customize_queue_element_helper(), OpenDDS::DCPS::TransportSendStrategy::deliver_ack_request(), OpenDDS::DCPS::DataLink::handle_send_request_ack(), and OpenDDS::DCPS::TransportSendStrategy::send_delayed_notifications().

{
  DBG_ENTRY_LVL("TransportQueueElement", "data_delivered", 6);
  // The decision depends on the dropped_ flag. If any link drops
  // the sample, even if the other links deliver it successfully,
  // the drop by the transport is reported back to the writer.
  return decision_made(dropped_);
}
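
The effect of passing dropped_ into decision_made() is that a drop on one link is remembered even when the final decision is a delivery. The sketch below models only that behavior. DropTracker, decide() and drop_then_deliver() are hypothetical names, and a plain counter is used because the example is single-threaded.

```cpp
#include <cassert>

// Hypothetical model of the two flags involved (not the OpenDDS class).
struct DropTracker {
  unsigned long sub_loans;        // outstanding sub-loans
  bool dropped = false;           // set once any link drops the sample
  bool released = false;          // set when the last sub-loan returns
  bool reported_dropped = false;  // flag handed to the release step

  explicit DropTracker(unsigned long links) : sub_loans(links) {}

  bool data_dropped(bool dropped_by_transport = false) {
    dropped = true;
    return decide(dropped_by_transport);
  }

  // Passes the sticky dropped flag, so an earlier drop is not forgotten.
  bool data_delivered() { return decide(dropped); }

private:
  bool decide(bool dropped_by_transport) {
    if (--sub_loans == 0) {
      released = true;
      reported_dropped = dropped_by_transport;
      return true;
    }
    return false;
  }
};

// Link A drops the sample, link B delivers it afterwards.
DropTracker drop_then_deliver() {
  DropTracker t(2);
  t.data_dropped();    // first decision: sample dropped on link A
  t.data_delivered();  // final decision: delivered on link B
  return t;
}
```

In this model the release step sees the flag as true even though the last link delivered the sample, matching the comment in the listing above.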

◆ data_dropped()

ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::data_dropped ( bool  dropped_by_transport = false)

Invoked when the sample is dropped from a DataLink due to a remove_sample() call. If dropped_by_transport is true, the drop was initiated by the transport while the transport send strategy is in MODE_TERMINATED. If dropped_by_transport is false, the drop was initiated by remove_sample(), and data_dropped() is called as a result of it. The return value indicates whether this element has been released.

Definition at line 27 of file TransportQueueElement.inl.

References ACE_INLINE, DBG_ENTRY_LVL, decision_made(), and dropped_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::customize_queue_element_helper(), OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element_non_reliable_i(), OpenDDS::DCPS::TcpDataLink::handle_send_request_ack(), OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_sample(), OpenDDS::DCPS::TransportSendStrategy::send(), OpenDDS::DCPS::DataLink::send(), OpenDDS::DCPS::TransportSendStrategy::send_delayed_notifications(), OpenDDS::DCPS::TcpDataLink::send_i(), OpenDDS::DCPS::DataLink::send_i(), OpenDDS::DCPS::PacketRemoveVisitor::visit_element_ref(), OpenDDS::DCPS::RemoveAllVisitor::visit_element_remove(), OpenDDS::DCPS::QueueRemoveVisitor::visit_element_remove(), and OpenDDS::DCPS::ThreadPerConRemoveVisitor::visit_element_remove().

{
  DBG_ENTRY_LVL("TransportQueueElement", "data_dropped", 6);
  dropped_ = true;
  return decision_made(dropped_by_transport);
}

◆ decision_made()

ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::decision_made ( bool  dropped_by_transport)
private

Common logic for data_dropped() and data_delivered().

Definition at line 47 of file TransportQueueElement.inl.

References ACE_INLINE, DBG_ENTRY_LVL, OPENDDS_ASSERT, release_element(), and sub_loan_count_.

Referenced by data_delivered(), data_dropped(), and OpenDDS::DCPS::TransportCustomizedElement::release_element().

{
  DBG_ENTRY_LVL("TransportQueueElement", "decision_made", 6);

  // (OPENDDS_ASSERT check; the line is not shown in this listing)

  const unsigned long new_count = --sub_loan_count_;
  if (new_count == 0) {
    // All interested subscriptions have been satisfied.

    // The queue element is released to its cached allocator
    // in the release_element() call.
    // It's not necessary to set the released_ flag to true
    // as this element will be released anyway and will no longer
    // be accessible. Note that it cannot be set after the
    // release_element() call.
    // released_ = true;
    release_element(dropped_by_transport);
    return true;
  }

  return false;
}
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66

◆ duplicate_msg()

virtual ACE_Message_Block* OpenDDS::DCPS::TransportQueueElement::duplicate_msg ( ) const
pure virtual

◆ fragment()

TqePair OpenDDS::DCPS::TransportQueueElement::fragment ( size_t  size)
virtual

Create two TransportQueueElements representing the same data payload as the current TransportQueueElement, with the first one (including its DataSampleHeader) fitting in "size" bytes. This method leaves the current TransportQueueElement alone (but can't be made const because the newly-created elements will need to invoke non-const methods on it). Each element in the pair will contain its own serialized modified DataSampleHeader.

If the fragmentation fails, a copy of null_tqe_pair is returned.

Reimplemented in OpenDDS::DCPS::RtpsCustomizedElement.

Definition at line 44 of file TransportQueueElement.cpp.

References OpenDDS::DCPS::move(), msg(), OpenDDS::DCPS::TransportCustomizedElement::set_fragment(), OpenDDS::DCPS::TransportCustomizedElement::set_msg(), OpenDDS::DCPS::DataSampleHeader::split(), and TransportCustomizedElement.

Referenced by OpenDDS::DCPS::TransportSendStrategy::send().

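{
  Message_Block_Ptr head;
  Message_Block_Ptr tail;
  DataSampleHeader::split(*msg(), size, head, tail);

  // frag: newly created TransportCustomizedElement (declaration not shown in this listing)
  frag->set_fragment(this);
  frag->set_msg(move(head));

  // rest: newly created TransportCustomizedElement (declaration not shown in this listing)
  rest->set_fragment(this);
  rest->set_msg(move(tail));

  return TqePair(frag, rest);
}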
static void split(const ACE_Message_Block &orig, size_t size, Message_Block_Ptr &head, Message_Block_Ptr &tail)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
std::pair< TransportQueueElement *, TransportQueueElement * > TqePair
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
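
The contract of fragment() (a first piece that fits in "size" bytes with its header, a second piece holding the rest, and an untouched original) can be illustrated with a toy model. Sample, SamplePair and this free function fragment() are hypothetical; the real split is performed by DataSampleHeader::split() on message blocks, and here an empty pair plays the role of null_tqe_pair.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical stand-in for a marshalled sample: a header followed by
// payload bytes. The real DataSampleHeader is more involved.
struct Sample {
  std::string header;
  std::string payload;
  std::size_t size() const { return header.size() + payload.size(); }
};

typedef std::pair<Sample, Sample> SamplePair;

// Splits s so that the first element (header included) fits in max_size
// bytes. Returns a pair of empty samples if no payload byte would fit,
// or if the sample already fits and needs no split (a choice made for
// this sketch only).
SamplePair fragment(const Sample& s, std::size_t max_size) {
  if (max_size <= s.header.size() || s.size() <= max_size) {
    return SamplePair();
  }
  const std::size_t head_len = max_size - s.header.size();
  Sample head{s.header, s.payload.substr(0, head_len)};
  Sample tail{s.header, s.payload.substr(head_len)};
  return SamplePair(head, tail);
}
```

For example, a sample with the 4-byte header "HDR:" and payload "abcdefgh" fragmented at 7 bytes yields a first piece carrying "abc" and a second piece carrying "defgh", each with its own copy of the header.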

◆ increment_loan()

void OpenDDS::DCPS::TransportQueueElement::increment_loan ( )
inline

Delay releasing the element by one decision (either a data_dropped or data_delivered).

Definition at line 115 of file TransportQueueElement.h.

Referenced by OpenDDS::DCPS::TransportSendStrategy::fragmentation_helper().

{ ++sub_loan_count_; }
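
A small model shows how one extra loan delays the release by exactly one decision. LoanCounter, decision() and decisions_until_release() are hypothetical names used only for this sketch, and std::atomic stands in for the Atomic wrapper.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical model of the loan counter (not the OpenDDS class).
class LoanCounter {
public:
  explicit LoanCounter(unsigned long initial) : count_(initial) {}

  // Adds one outstanding sub-loan, so one more decision is required.
  void increment_loan() { ++count_; }

  // Records one decision; returns true when the element would be released.
  bool decision() {
    assert(count_.load() > 0);  // sanity check for this sketch only
    return --count_ == 0;
  }

private:
  std::atomic<unsigned long> count_;
};

// Returns how many decisions were needed before release.
int decisions_until_release(bool extra_loan) {
  LoanCounter c(1);
  if (extra_loan) {
    c.increment_loan();
  }
  int decisions = 1;
  while (!c.decision()) {
    ++decisions;
  }
  return decisions;
}
```

With an initial count of 1, the element is released after one decision; after a single increment_loan() call it takes two.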

◆ is_control()

bool OpenDDS::DCPS::TransportQueueElement::is_control ( GUID_t  pub_id) const
virtual

Is the element a "control" sample from the specified pub_id?

Reimplemented in OpenDDS::DCPS::TransportSendControlElement.

Definition at line 38 of file TransportQueueElement.cpp.

References DBG_ENTRY_LVL.

{
  DBG_ENTRY_LVL("TransportQueueElement", "is_control", 6);
  return false;
}

◆ is_fragment()

virtual bool OpenDDS::DCPS::TransportQueueElement::is_fragment ( ) const
inline virtual

◆ is_last_fragment()

virtual bool OpenDDS::DCPS::TransportQueueElement::is_last_fragment ( ) const
inline virtual

Is this QueueElement the last result of fragmentation?

Reimplemented in OpenDDS::DCPS::TransportSendControlElement, OpenDDS::DCPS::TransportCustomizedElement, and OpenDDS::DCPS::TransportSendElement.

Definition at line 172 of file TransportQueueElement.h.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::customize_queue_element_helper().

{ return false; }

◆ is_request_ack()

virtual bool OpenDDS::DCPS::TransportQueueElement::is_request_ack ( ) const
inline virtual

◆ is_retained_replaced()

virtual bool OpenDDS::DCPS::TransportQueueElement::is_retained_replaced ( ) const
inline virtual

◆ msg()

virtual const ACE_Message_Block* OpenDDS::DCPS::TransportQueueElement::msg ( ) const
pure virtual

◆ msg_payload()

virtual const ACE_Message_Block* OpenDDS::DCPS::TransportQueueElement::msg_payload ( ) const
pure virtual

◆ owned_by_transport()

virtual bool OpenDDS::DCPS::TransportQueueElement::owned_by_transport ( )
pure virtual

◆ publication_id()

virtual GUID_t OpenDDS::DCPS::TransportQueueElement::publication_id ( ) const
pure virtual

◆ release_element()

virtual void OpenDDS::DCPS::TransportQueueElement::release_element ( bool  dropped_by_transport)
protected pure virtual

◆ released() [1/2]

ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::released ( ) const

Has the listener been called?

Definition at line 80 of file TransportQueueElement.inl.

References ACE_INLINE, and released_.

{
  return released_;
}

◆ released() [2/2]

ACE_INLINE void OpenDDS::DCPS::TransportQueueElement::released ( bool  flag)

Definition at line 87 of file TransportQueueElement.inl.

References ACE_INLINE, and released_.

{
  released_ = flag;
}

◆ requires_exclusive_packet()

bool OpenDDS::DCPS::TransportQueueElement::requires_exclusive_packet ( ) const
virtual

Does the sample require an exclusive transport packet?

Reimplemented in OpenDDS::DCPS::TransportCustomizedElement, OpenDDS::DCPS::TransportSendControlElement, and OpenDDS::DCPS::TransportControlElement.

Definition at line 31 of file TransportQueueElement.cpp.

References DBG_ENTRY_LVL.

Referenced by OpenDDS::DCPS::TransportSendStrategy::send().

{
  DBG_ENTRY_LVL("TransportQueueElement", "requires_exclusive_packet", 6);
  return false;
}
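
The default-false virtual with subclass overrides can be sketched as follows. Element, ExclusiveElement and pack() are hypothetical, and the packing policy is an illustrative assumption about how a caller might use the flag, not the logic of TransportSendStrategy::send().

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Hypothetical element hierarchy mirroring the default-false virtual.
struct Element {
  virtual ~Element() {}
  virtual bool requires_exclusive_packet() const { return false; }
};

// Hypothetical subclass that opts in to an exclusive packet.
struct ExclusiveElement : Element {
  bool requires_exclusive_packet() const override { return true; }
};

// Groups elements into packets: an exclusive element always gets a
// packet to itself; other elements share the current open packet.
// This is an illustrative policy, not the TransportSendStrategy logic.
std::vector<std::vector<const Element*> >
pack(const std::vector<const Element*>& queue) {
  std::vector<std::vector<const Element*> > packets;
  bool open = false;  // true while the last packet may take more elements
  for (std::size_t i = 0; i < queue.size(); ++i) {
    if (queue[i]->requires_exclusive_packet()) {
      packets.push_back(std::vector<const Element*>(1, queue[i]));
      open = false;
    } else {
      if (!open) {
        packets.push_back(std::vector<const Element*>());
        open = true;
      }
      packets.back().push_back(queue[i]);
    }
  }
  return packets;
}
```

Queuing two ordinary elements, one exclusive element and one more ordinary element produces three packets under this policy: a shared one, the exclusive one, and a new shared one.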

◆ sequence()

virtual SequenceNumber OpenDDS::DCPS::TransportQueueElement::sequence ( ) const
inline virtual

◆ subscription_id()

virtual GUID_t OpenDDS::DCPS::TransportQueueElement::subscription_id ( ) const
inline virtual

◆ was_dropped()

ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::was_dropped ( ) const
protected

May be used by a subclass's implementation of release_element() to determine whether any DataLink dropped the data instead of delivering it.

Definition at line 73 of file TransportQueueElement.inl.

References ACE_INLINE, and dropped_.

Referenced by OpenDDS::DCPS::TransportSendElement::release_element(), and OpenDDS::DCPS::TransportSendControlElement::release_element().

{
  return dropped_;
}
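
A subclass might use was_dropped() inside release_element() to pick which callback to make. The sketch below assumes a hypothetical SendListener with on_delivered() and on_dropped(); ElementBase, NotifyingElement, mark_dropped() and finish() are likewise illustrative and do not reflect the OpenDDS listener interfaces.

```cpp
#include <cassert>
#include <string>

// Hypothetical listener interface; the names are illustrative only.
struct SendListener {
  std::string last_event;
  void on_delivered() { last_event = "delivered"; }
  void on_dropped(bool by_transport) {
    last_event = by_transport ? "dropped by transport" : "dropped";
  }
};

// Hypothetical base exposing the protected was_dropped() accessor.
class ElementBase {
public:
  virtual ~ElementBase() {}
  void mark_dropped() { dropped_ = true; }
  void finish(bool dropped_by_transport) {
    release_element(dropped_by_transport);
  }

protected:
  ElementBase() : dropped_(false) {}
  virtual void release_element(bool dropped_by_transport) = 0;
  bool was_dropped() const { return dropped_; }

private:
  bool dropped_;
};

// Subclass that picks the callback based on was_dropped().
class NotifyingElement : public ElementBase {
public:
  explicit NotifyingElement(SendListener& l) : listener_(l) {}

protected:
  void release_element(bool dropped_by_transport) override {
    if (was_dropped()) {
      listener_.on_dropped(dropped_by_transport);
    } else {
      listener_.on_delivered();
    }
  }

private:
  SendListener& listener_;
};
```

Keeping the flag private and exposing it only through a protected accessor lets subclasses read the outcome without being able to reset it.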

Friends And Related Function Documentation

◆ TransportCustomizedElement

friend class TransportCustomizedElement
friend

Definition at line 203 of file TransportQueueElement.h.

Referenced by fragment().

Member Data Documentation

◆ dropped_

bool OpenDDS::DCPS::TransportQueueElement::dropped_
private

Flag flipped to true if any DataLink dropped the sample.

Definition at line 209 of file TransportQueueElement.h.

Referenced by data_delivered(), data_dropped(), and was_dropped().

◆ released_

bool OpenDDS::DCPS::TransportQueueElement::released_
private

Whether the callback to the DataWriter (DW) has been made.

Definition at line 212 of file TransportQueueElement.h.

Referenced by released().

◆ sub_loan_count_

Atomic<unsigned long> OpenDDS::DCPS::TransportQueueElement::sub_loan_count_
private

Counts the number of outstanding sub-loans.

Definition at line 206 of file TransportQueueElement.h.

Referenced by decision_made().


The documentation for this class was generated from the following files: