OpenDDS::DCPS::TransportQueueElement Class Reference

Base wrapper class around a data/control sample to be sent.

#include <TransportQueueElement.h>

Classes

class  MatchCriteria
class  MatchOnDataPayload
class  MatchOnPubId

Public Member Functions

virtual ~TransportQueueElement ()
bool data_dropped (bool dropped_by_transport=false)
bool data_delivered ()
virtual bool requires_exclusive_packet () const
 Does the sample require an exclusive transport packet?
virtual RepoId publication_id () const =0
 Accessor for the publication id that sent the sample.
virtual RepoId subscription_id () const
 Accessor for the subscription id, if the sample is sent to only one subscription.
virtual SequenceNumber sequence () const
virtual const ACE_Message_Block * msg () const =0
 The marshalled sample (sample header + sample data).
virtual const ACE_Message_Block * msg_payload () const =0
 The marshalled payload only (sample data).
virtual bool is_control (RepoId pub_id) const
 Is the element a "control" sample from the specified pub_id?
bool released () const
 Has the listener been called?
void released (bool flag)
virtual bool owned_by_transport ()=0
 Is the sample created by the transport?
virtual ElementPair fragment (size_t size)
virtual bool is_fragment () const
 Is this QueueElement the result of fragmentation?
virtual bool is_request_ack () const

Static Public Member Functions

static ACE_Message_Block * clone_mb (const ACE_Message_Block *msg, MessageBlockAllocator *mb_allocator, DataBlockAllocator *db_allocator)

Protected Member Functions

 TransportQueueElement (unsigned long initial_count)
virtual void release_element (bool dropped_by_transport)=0
 Invoked when the counter reaches 0.
bool was_dropped () const

Private Member Functions

bool decision_made (bool dropped_by_transport)
 Common logic for data_dropped() and data_delivered().

Private Attributes

ACE_Atomic_Op< ACE_Thread_Mutex, unsigned long > sub_loan_count_
 Counts the number of outstanding sub-loans.
bool dropped_
 Flag flipped to true if any DataLink dropped the sample.
bool released_
 Whether the callback to the DataWriter (DW) has been made.

Friends

class TransportCustomizedElement

Detailed Description

Base wrapper class around a data/control sample to be sent.

This class serves as the base class for different types of samples that can be sent. For example, there are data samples and control samples. A subclass of TransportQueueElement exists for each of these types of samples.

This class maintains a counter that, when decremented to 0, will trigger some logic (defined in the subclass) that will "return the loan" of the sample. The sample is "loaned" to the transport via a send() or send_control() call on the TransportClient. This wrapper object will "return the loan" when all DataLinks have "returned" their sub-loans.

Definition at line 50 of file TransportQueueElement.h.
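
The loan counting described above can be sketched in isolation. This is a minimal illustration, not the OpenDDS implementation: LoanedElement and RecordingElement are hypothetical names, std::atomic stands in for ACE_Atomic_Op, and release_element() here only records the outcome instead of returning the element to an allocator.

```cpp
#include <atomic>
#include <cassert>

// Hypothetical stand-in for TransportQueueElement; not the OpenDDS class.
class LoanedElement {
public:
  // initial_count is the number of links that each hold a sub-loan.
  explicit LoanedElement(unsigned long initial_count)
    : sub_loan_count_(initial_count), dropped_(false) {}
  virtual ~LoanedElement() {}

  // One link reports a successful send.
  bool data_delivered() { return decision_made(dropped_); }

  // One link reports a drop; the flag stays set for all later calls.
  bool data_dropped(bool dropped_by_transport = false) {
    dropped_ = true;
    return decision_made(dropped_by_transport);
  }

protected:
  // Called exactly once, when the last sub-loan is returned.
  virtual void release_element(bool dropped_by_transport) = 0;
  bool was_dropped() const { return dropped_; }

private:
  // Returns true only for the call that brings the counter to zero.
  bool decision_made(bool dropped_by_transport) {
    if (--sub_loan_count_ == 0) {
      release_element(dropped_by_transport);
      return true;
    }
    return false;
  }

  std::atomic<unsigned long> sub_loan_count_;
  bool dropped_;
};

// Hypothetical subclass that records how the loan was returned.
class RecordingElement : public LoanedElement {
public:
  explicit RecordingElement(unsigned long links)
    : LoanedElement(links), returned(false), any_drop(false) {}
  bool returned;
  bool any_drop;

protected:
  void release_element(bool) override {
    returned = true;
    any_drop = was_dropped();
  }
};
```

With two links, the first call (delivered or dropped) leaves the loan outstanding, and the second call returns it. If either link dropped the sample, was_dropped() reports that to the subclass.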


Constructor & Destructor Documentation

OpenDDS::DCPS::TransportQueueElement::~TransportQueueElement (  )  [virtual]

Definition at line 23 of file TransportQueueElement.cpp.

References DBG_ENTRY_LVL.

{
  DBG_ENTRY_LVL("TransportQueueElement", "~TransportQueueElement", 6);
}

ACE_INLINE OpenDDS::DCPS::TransportQueueElement::TransportQueueElement ( unsigned long  initial_count  )  [explicit, protected]

Ctor. The initial_count is the number of DataLinks to which this TransportQueueElement will be sent.

Definition at line 17 of file TransportQueueElement.inl.

References DBG_ENTRY_LVL.

  : sub_loan_count_(initial_count),
    dropped_(false),
    released_(false)
{
  DBG_ENTRY_LVL("TransportQueueElement", "TransportQueueElement", 6);
}


Member Function Documentation

ACE_Message_Block * OpenDDS::DCPS::TransportQueueElement::clone_mb ( const ACE_Message_Block * msg,
MessageBlockAllocator * mb_allocator,
DataBlockAllocator * db_allocator 
) [static]

Deep-copies a message block chain using the provided message block allocator and data block allocator.

Definition at line 61 of file TransportQueueElement.cpp.

References ACE_Message_Block::base(), ACE_Message_Block::capacity(), ACE_Message_Block::cont(), ACE_Message_Block::copy(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, ACE_Message_Block::rd_ptr(), ACE_Message_Block::size(), ACE_Message_Block::wr_ptr(), and ACE_Time_Value::zero.

Referenced by OpenDDS::DCPS::SingleSendBuffer::insert_buffer(), and OpenDDS::DCPS::TransportRetainedElement::TransportRetainedElement().

{
  ACE_Message_Block* cur_block = const_cast<ACE_Message_Block*>(msg);
  ACE_Message_Block* head_copy = 0;
  ACE_Message_Block* cur_copy  = 0;
  ACE_Message_Block* prev_copy = 0;
  // deep copy sample data
  while (cur_block != 0) {
    ACE_NEW_MALLOC_RETURN(cur_copy,
                          static_cast<ACE_Message_Block*>(
                          mb_allocator->malloc(sizeof(ACE_Message_Block))),
                          ACE_Message_Block(cur_block->capacity(),
                                            ACE_Message_Block::MB_DATA,
                                            0, //cont
                                            0, //data
                                            0, //alloc_strategy
                                            0, //locking_strategy
                                            ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
                                            ACE_Time_Value::zero,
                                            ACE_Time_Value::max_time,
                                            db_allocator,
                                            mb_allocator),
                          0);

    cur_copy->copy(cur_block->base(), cur_block->size());
    cur_copy->rd_ptr(cur_copy->base() +
                     (cur_block->rd_ptr() - cur_block->base()));
    cur_copy->wr_ptr(cur_copy->base() +
                     (cur_block->wr_ptr() - cur_block->base()));

    if (head_copy == 0) {
      head_copy = cur_copy;
    } else {
      prev_copy->cont(cur_copy);
    }

    prev_copy = cur_copy;

    cur_block = cur_block->cont();
  }

  return head_copy;
}
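
The same chain-walking deep copy can be shown without ACE. The sketch below is an illustration only: Block is a hypothetical, simplified stand-in for ACE_Message_Block, clone_chain is a hypothetical name, and std::unique_ptr replaces the custom allocators. It keeps the two properties of the listing above: every block in the chain is copied, and each copy keeps the read and write offsets of its source.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical, simplified stand-in for ACE_Message_Block.
struct Block {
  std::vector<char> data;       // the whole buffer, from base to capacity
  std::size_t rd = 0;           // read offset from the start of data
  std::size_t wr = 0;           // write offset from the start of data
  std::unique_ptr<Block> cont;  // next block in the chain, if any
};

// Deep-copies every block in the chain, keeping the read/write offsets.
// Returns an empty pointer when msg is null.
std::unique_ptr<Block> clone_chain(const Block* msg) {
  std::unique_ptr<Block> head;
  Block* prev = nullptr;
  for (const Block* cur = msg; cur != nullptr; cur = cur->cont.get()) {
    std::unique_ptr<Block> copy(new Block);
    copy->data = cur->data;  // copies the bytes, not the pointer
    copy->rd = cur->rd;
    copy->wr = cur->wr;
    Block* raw = copy.get();
    if (!head) {
      head = std::move(copy);        // first block becomes the head
    } else {
      prev->cont = std::move(copy);  // link onto the previous copy
    }
    prev = raw;
  }
  return head;
}
```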

ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::data_delivered (  ) 

Invoked when the sample has been sent by a DataLink. The return value indicates if this element is released.

Reimplemented in OpenDDS::DCPS::TransportControlElement.

Definition at line 36 of file TransportQueueElement.inl.

References DBG_ENTRY_LVL, decision_made(), and dropped_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::TransportSendStrategy::deliver_ack_request(), OpenDDS::DCPS::DataLink::handle_send_request_ack(), OpenDDS::DCPS::RtpsUdpDataLink::process_acked_by_all_i(), and OpenDDS::DCPS::TransportSendStrategy::send_delayed_notifications().

{
  DBG_ENTRY_LVL("TransportQueueElement", "data_delivered", 6);
  // The decision depends on the dropped_ flag. If any link drops
  // the sample, even if the other links deliver it successfully,
  // the writer is notified that the data was dropped by the transport.
  return this->decision_made(this->dropped_);
}

ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::data_dropped ( bool  dropped_by_transport = false  ) 

Invoked when the sample is dropped from a DataLink due to a remove_sample() call. If dropped_by_transport is true, the drop was initiated by the transport because the transport send strategy is in MODE_TERMINATED. If it is false, the drop was initiated by remove_sample(), and data_dropped() is called as a result. The return value indicates whether this element has been released.

Definition at line 27 of file TransportQueueElement.inl.

References DBG_ENTRY_LVL, decision_made(), and dropped_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::TransportSendStrategy::send(), OpenDDS::DCPS::DataLink::send(), OpenDDS::DCPS::TransportSendStrategy::send_delayed_notifications(), OpenDDS::DCPS::DataLink::send_i(), OpenDDS::DCPS::PacketRemoveVisitor::visit_element_ref(), OpenDDS::DCPS::ThreadPerConRemoveVisitor::visit_element_remove(), OpenDDS::DCPS::RemoveAllVisitor::visit_element_remove(), and OpenDDS::DCPS::QueueRemoveVisitor::visit_element_remove().

{
  DBG_ENTRY_LVL("TransportQueueElement", "data_dropped", 6);
  this->dropped_ = true;
  return this->decision_made(dropped_by_transport);
}

ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::decision_made ( bool  dropped_by_transport  )  [private]

Common logic for data_dropped() and data_delivered().

Definition at line 47 of file TransportQueueElement.inl.

References DBG_ENTRY_LVL, release_element(), and sub_loan_count_.

Referenced by data_delivered(), data_dropped(), and OpenDDS::DCPS::TransportCustomizedElement::release_element().

{
  DBG_ENTRY_LVL("TransportQueueElement", "decision_made", 6);

  const unsigned long new_count = --this->sub_loan_count_;
  if (new_count == 0) {
    // All interested subscriptions have been satisfied.

    // The queue element is released to its cached allocator
    // in the release_element() call.
    // It's not necessary to set the released_ flag to true,
    // as this element will be released anyway and will no longer
    // be accessible. Note that it cannot be set after the
    // release_element() call.
    // this->released_ = true;
    this->release_element(dropped_by_transport);
    return true;
  }

  // ciju: The sub_loan_count_ has been observed to drop below zero.
  // Since it isn't exactly a ref count and the object is created in
  // allocator memory (user space) we *probably* can disregard the
  // count for now. Ideally we would like to prevent the count from
  // falling below 0 and opening up this assert.
  // assert (new_count > 0);
  return false;
}
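
A note on the comment above: the member list declares sub_loan_count_ over unsigned long, and new_count is an unsigned long as well. In C++, decrementing an unsigned value of zero wraps to the largest representable value rather than producing a negative number, so a count that "drops below zero" would appear as a very large positive count. The sketch below shows only that language rule; decrement is a hypothetical helper, not part of OpenDDS.

```cpp
#include <cassert>
#include <limits>

// Decrements an unsigned counter and returns the new value, mirroring
// the pre-decrement in decision_made(). Hypothetical helper for
// illustration only.
unsigned long decrement(unsigned long count) {
  return --count;  // unsigned arithmetic wraps modulo 2^N
}
```

Under this rule a check of the form new_count > 0 would still pass after an extra decrement, so it may be worth comparing against the previous value instead if the assert is ever re-enabled.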

ElementPair OpenDDS::DCPS::TransportQueueElement::fragment ( size_t  size  )  [virtual]

Create two TransportQueueElements representing the same data payload as the current TransportQueueElement, with the first one (including its DataSampleHeader) fitting in "size" bytes. This method leaves the current TransportQueueElement alone (but can't be made const because the newly-created elements will need to invoke non-const methods on it). Each element in the pair will contain its own serialized modified DataSampleHeader.

Reimplemented in OpenDDS::DCPS::RtpsCustomizedElement.

Definition at line 43 of file TransportQueueElement.cpp.

References OpenDDS::DCPS::move(), msg(), publication_id(), OpenDDS::DCPS::TransportCustomizedElement::set_msg(), OpenDDS::DCPS::TransportCustomizedElement::set_publication_id(), OpenDDS::DCPS::DataSampleHeader::split(), and TransportCustomizedElement.

Referenced by OpenDDS::DCPS::TransportSendStrategy::send().

{
  Message_Block_Ptr head;
  Message_Block_Ptr tail;
  DataSampleHeader::split(*msg(), size, head, tail);

  TransportCustomizedElement* frag = new TransportCustomizedElement(0, true);
  frag->set_publication_id(publication_id());
  frag->set_msg(move(head));

  TransportCustomizedElement* rest =
    new TransportCustomizedElement(this, true);
  rest->set_msg(move(tail));

  return ElementPair(frag, rest);
}
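
The size rule for fragmentation (the first element, header included, must fit in "size" bytes, and each element carries its own header) can be illustrated with a simplified model. Everything here is hypothetical: Fragment and split_for_size are illustrative names, strings stand in for message blocks, and the header is copied unchanged, whereas DataSampleHeader::split() writes a modified header into each part.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>

// Hypothetical model of one fragment: its own header plus a payload slice.
struct Fragment {
  std::string header;
  std::string payload;
  std::size_t total() const { return header.size() + payload.size(); }
};

// Splits payload so that the first fragment, header included, fits in
// size bytes. Assumes size is larger than the header length.
std::pair<Fragment, Fragment> split_for_size(const std::string& header,
                                             const std::string& payload,
                                             std::size_t size) {
  const std::size_t room = size - header.size();
  const std::size_t head_len = room < payload.size() ? room : payload.size();
  Fragment head{header, payload.substr(0, head_len)};
  Fragment tail{header, payload.substr(head_len)};
  return std::make_pair(head, tail);
}
```

For a 4-byte header, a 10-byte payload, and size 8, the first fragment holds 4 payload bytes and the second holds the remaining 6. The second fragment may itself exceed size and need splitting again.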

bool OpenDDS::DCPS::TransportQueueElement::is_control ( RepoId  pub_id  )  const [virtual]

Is the element a "control" sample from the specified pub_id?

Reimplemented in OpenDDS::DCPS::TransportSendControlElement.

Definition at line 36 of file TransportQueueElement.cpp.

References DBG_ENTRY_LVL.

{
  DBG_ENTRY_LVL("TransportQueueElement", "is_control", 6);
  return false;
}

virtual bool OpenDDS::DCPS::TransportQueueElement::is_fragment (  )  const [inline, virtual]

Is this QueueElement the result of fragmentation?

Reimplemented in OpenDDS::DCPS::TransportCustomizedElement.

Definition at line 147 of file TransportQueueElement.h.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert().

{ return false; }

virtual bool OpenDDS::DCPS::TransportQueueElement::is_request_ack (  )  const [inline, virtual]

Reimplemented in OpenDDS::DCPS::TransportSendControlElement.

Definition at line 149 of file TransportQueueElement.h.

Referenced by OpenDDS::DCPS::TcpSendStrategy::add_delayed_notification(), and OpenDDS::DCPS::DataLink::send().

{ return false; }

virtual const ACE_Message_Block* OpenDDS::DCPS::TransportQueueElement::msg (  )  const [pure virtual]

The marshalled sample (sample header + sample data).

virtual const ACE_Message_Block* OpenDDS::DCPS::TransportQueueElement::msg_payload (  )  const [pure virtual]

The marshalled payload only (sample data).

virtual bool OpenDDS::DCPS::TransportQueueElement::owned_by_transport (  )  [pure virtual]

Is the sample created by the transport?

virtual RepoId OpenDDS::DCPS::TransportQueueElement::publication_id (  )  const [pure virtual]

Accessor for the publication id that sent the sample.

virtual void OpenDDS::DCPS::TransportQueueElement::release_element ( bool  dropped_by_transport  )  [protected, pure virtual]

Invoked when the counter reaches 0.

ACE_INLINE void OpenDDS::DCPS::TransportQueueElement::released ( bool  flag  ) 

Definition at line 91 of file TransportQueueElement.inl.

References released_.

{
  this->released_ = flag;
}

ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::released (  )  const

Has the listener been called?

Definition at line 84 of file TransportQueueElement.inl.

References released_.

{
  return this->released_;
}

bool OpenDDS::DCPS::TransportQueueElement::requires_exclusive_packet (  )  const [virtual]

Does the sample require an exclusive transport packet?

Reimplemented in OpenDDS::DCPS::TransportControlElement, OpenDDS::DCPS::TransportCustomizedElement, and OpenDDS::DCPS::TransportSendControlElement.

Definition at line 29 of file TransportQueueElement.cpp.

References DBG_ENTRY_LVL.

Referenced by OpenDDS::DCPS::TransportSendStrategy::send().

{
  DBG_ENTRY_LVL("TransportQueueElement", "requires_exclusive_packet", 6);
  return false;
}

virtual SequenceNumber OpenDDS::DCPS::TransportQueueElement::sequence (  )  const [inline, virtual]

virtual RepoId OpenDDS::DCPS::TransportQueueElement::subscription_id (  )  const [inline, virtual]

Accessor for the subscription id, if the sample is sent to only one subscription.

ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::was_dropped (  )  const [protected]

May be used by subclass' implementation of release_element() to determine if any DataLinks dropped the data instead of delivering it.

Definition at line 77 of file TransportQueueElement.inl.

References dropped_.

Referenced by OpenDDS::DCPS::TransportSendElement::release_element(), and OpenDDS::DCPS::TransportSendControlElement::release_element().

{
  return this->dropped_;
}


Friends And Related Function Documentation

friend class TransportCustomizedElement [friend]

Definition at line 169 of file TransportQueueElement.h.

Referenced by fragment().


Member Data Documentation

bool OpenDDS::DCPS::TransportQueueElement::dropped_ [private]

Flag flipped to true if any DataLink dropped the sample.

Definition at line 175 of file TransportQueueElement.h.

Referenced by data_delivered(), data_dropped(), and was_dropped().

bool OpenDDS::DCPS::TransportQueueElement::released_ [private]

Whether the callback to the DataWriter (DW) has been made.

Definition at line 178 of file TransportQueueElement.h.

Referenced by released().

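ACE_Atomic_Op< ACE_Thread_Mutex, unsigned long > OpenDDS::DCPS::TransportQueueElement::sub_loan_count_ [private]

Counts the number of outstanding sub-loans.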

Definition at line 172 of file TransportQueueElement.h.

Referenced by decision_made().


The documentation for this class was generated from the following files:
 TransportQueueElement.h
 TransportQueueElement.inl
 TransportQueueElement.cpp

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1