Base wrapper class around a data/control sample to be sent. More...
#include <TransportQueueElement.h>
Classes | |
class | MatchCriteria |
class | MatchOnDataPayload |
class | MatchOnPubId |
Public Member Functions | |
virtual | ~TransportQueueElement () |
bool | data_dropped (bool dropped_by_transport=false) |
bool | data_delivered () |
virtual bool | requires_exclusive_packet () const |
Does the sample require an exclusive transport packet? | |
virtual RepoId | publication_id () const =0 |
Accessor for the publication id that sent the sample. | |
virtual RepoId | subscription_id () const |
Accessor for the subscription id, if sent the sample is sent to 1 sub. | |
virtual SequenceNumber | sequence () const |
virtual const ACE_Message_Block * | msg () const =0 |
The marshalled sample (sample header + sample data). | |
virtual const ACE_Message_Block * | msg_payload () const =0 |
The marshalled payload only (sample data). | |
virtual bool | is_control (RepoId pub_id) const |
Is the element a "control" sample from the specified pub_id? | |
bool | released () const |
Is the listener get called ? | |
void | released (bool flag) |
virtual bool | owned_by_transport ()=0 |
Is the sample created by the transport? | |
virtual ElementPair | fragment (size_t size) |
virtual bool | is_fragment () const |
Is this QueueElement the result of fragmentation? | |
virtual bool | is_request_ack () const |
Static Public Member Functions | |
static ACE_Message_Block * | clone_mb (const ACE_Message_Block *msg, MessageBlockAllocator *mb_allocator, DataBlockAllocator *db_allocator) |
Protected Member Functions | |
TransportQueueElement (unsigned long initial_count) | |
virtual void | release_element (bool dropped_by_transport)=0 |
Invoked when the counter reaches 0. | |
bool | was_dropped () const |
Private Member Functions | |
bool | decision_made (bool dropped_by_transport) |
Common logic for data_dropped() and data_delivered(). | |
Private Attributes | |
ACE_Atomic_Op < ACE_Thread_Mutex, unsigned long > | sub_loan_count_ |
Counts the number of outstanding sub-loans. | |
bool | dropped_ |
Flag flipped to true if any DataLink dropped the sample. | |
bool | released_ |
If the callback to DW is made. | |
Friends | |
class | TransportCustomizedElement |
Base wrapper class around a data/control sample to be sent.
This class serves as the base class for different types of samples that can be sent. For example, there are data samples and control samples. A subclass of TransportQueueElement exists for each of these types of samples.
This class maintains a counter that, when decremented to 0, will trigger some logic (defined in the subclass) that will "return the loan" of the sample. The sample is "loaned" to the transport via a send() or send_control() call on the TransportClient. This wrapper object will "return the loan" when all DataLinks have "returned" their sub-loans.
Definition at line 50 of file TransportQueueElement.h.
OpenDDS::DCPS::TransportQueueElement::~TransportQueueElement | ( | ) | [virtual] |
Definition at line 23 of file TransportQueueElement.cpp.
References DBG_ENTRY_LVL.
00024 { 00025 DBG_ENTRY_LVL("TransportQueueElement", "~TransportQueueElement", 6); 00026 }
ACE_INLINE OpenDDS::DCPS::TransportQueueElement::TransportQueueElement | ( | unsigned long | initial_count | ) | [explicit, protected] |
Ctor. The initial_count is the number of DataLinks to which this TransportQueueElement will be sent.
Definition at line 17 of file TransportQueueElement.inl.
References DBG_ENTRY_LVL.
00018 : sub_loan_count_(initial_count), 00019 dropped_(false), 00020 released_(false) 00021 { 00022 DBG_ENTRY_LVL("TransportQueueElement", "TransportQueueElement", 6); 00023 }
ACE_Message_Block * OpenDDS::DCPS::TransportQueueElement::clone_mb | ( | const ACE_Message_Block * | msg, | |
MessageBlockAllocator * | mb_allocator, | |||
DataBlockAllocator * | db_allocator | |||
) | [static] |
Clone method with provided message block allocator and data block allocators.
Definition at line 61 of file TransportQueueElement.cpp.
References ACE_Message_Block::base(), ACE_Message_Block::capacity(), ACE_Message_Block::cont(), ACE_Message_Block::copy(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, ACE_Message_Block::rd_ptr(), ACE_Message_Block::size(), ACE_Message_Block::wr_ptr(), and ACE_Time_Value::zero.
Referenced by OpenDDS::DCPS::SingleSendBuffer::insert_buffer(), and OpenDDS::DCPS::TransportRetainedElement::TransportRetainedElement().
00064 { 00065 ACE_Message_Block* cur_block = const_cast<ACE_Message_Block*>(msg); 00066 ACE_Message_Block* head_copy = 0; 00067 ACE_Message_Block* cur_copy = 0; 00068 ACE_Message_Block* prev_copy = 0; 00069 // deep copy sample data 00070 while (cur_block != 0) { 00071 ACE_NEW_MALLOC_RETURN(cur_copy, 00072 static_cast<ACE_Message_Block*>( 00073 mb_allocator->malloc(sizeof(ACE_Message_Block))), 00074 ACE_Message_Block(cur_block->capacity(), 00075 ACE_Message_Block::MB_DATA, 00076 0, //cont 00077 0, //data 00078 0, //alloc_strategy 00079 0, //locking_strategy 00080 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00081 ACE_Time_Value::zero, 00082 ACE_Time_Value::max_time, 00083 db_allocator, 00084 mb_allocator), 00085 0); 00086 00087 cur_copy->copy(cur_block->base(), cur_block->size()); 00088 cur_copy->rd_ptr(cur_copy->base() + 00089 (cur_block->rd_ptr() - cur_block->base())); 00090 cur_copy->wr_ptr(cur_copy->base() + 00091 (cur_block->wr_ptr() - cur_block->base())); 00092 00093 if (head_copy == 0) { 00094 head_copy = cur_copy; 00095 } else { 00096 prev_copy->cont(cur_copy); 00097 } 00098 00099 prev_copy = cur_copy; 00100 00101 cur_block = cur_block->cont(); 00102 } 00103 00104 return head_copy; 00105 }
ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::data_delivered | ( | ) |
Invoked when the sample has been sent by a DataLink. The return value indicates if this element is released.
Reimplemented in OpenDDS::DCPS::TransportControlElement.
Definition at line 36 of file TransportQueueElement.inl.
References DBG_ENTRY_LVL, decision_made(), and dropped_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::TransportSendStrategy::deliver_ack_request(), OpenDDS::DCPS::DataLink::handle_send_request_ack(), OpenDDS::DCPS::RtpsUdpDataLink::process_acked_by_all_i(), and OpenDDS::DCPS::TransportSendStrategy::send_delayed_notifications().
00037 { 00038 DBG_ENTRY_LVL("TransportQueueElement", "data_delivered", 6); 00039 // Decision made depend on dropped_ flag. If any link drops 00040 // the sample even other links deliver successfully, the 00041 // data dropped by transport will called back to writer. 00042 return this->decision_made(this->dropped_); 00043 }
ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::data_dropped | ( | bool | dropped_by_transport = false |
) |
Invoked when the sample is dropped from a DataLink due to a remove_sample() call. The dropped_by_transport flag true indicates the data dropping is initiated by transport when the transport send strategy is in a MODE_TERMINATED. The dropped_by_transport flag false indicates the dropping is initiated by the remove_sample and data_dropped() is a result of remove_sample(). The return value indicates if this element is released.
Definition at line 27 of file TransportQueueElement.inl.
References DBG_ENTRY_LVL, decision_made(), and dropped_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::TransportSendStrategy::send(), OpenDDS::DCPS::DataLink::send(), OpenDDS::DCPS::TransportSendStrategy::send_delayed_notifications(), OpenDDS::DCPS::DataLink::send_i(), OpenDDS::DCPS::PacketRemoveVisitor::visit_element_ref(), OpenDDS::DCPS::ThreadPerConRemoveVisitor::visit_element_remove(), OpenDDS::DCPS::RemoveAllVisitor::visit_element_remove(), and OpenDDS::DCPS::QueueRemoveVisitor::visit_element_remove().
00028 { 00029 DBG_ENTRY_LVL("TransportQueueElement", "data_dropped", 6); 00030 this->dropped_ = true; 00031 return this->decision_made(dropped_by_transport); 00032 }
ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::decision_made | ( | bool | dropped_by_transport | ) | [private] |
Common logic for data_dropped() and data_delivered().
Definition at line 47 of file TransportQueueElement.inl.
References DBG_ENTRY_LVL, release_element(), and sub_loan_count_.
Referenced by data_delivered(), data_dropped(), and OpenDDS::DCPS::TransportCustomizedElement::release_element().
00048 { 00049 DBG_ENTRY_LVL("TransportQueueElement", "decision_made", 6); 00050 00051 const unsigned long new_count = --this->sub_loan_count_; 00052 if (new_count == 0) { 00053 // All interested subscriptions have been satisfied. 00054 00055 // The queue elements are released to its cached allocator 00056 // in release_element() call. 00057 // It's not necessary to set the released_ flag to true 00058 // as this element will be released anyway and not be 00059 // accessible. Note it can not be set after release_element 00060 // call. 00061 // this->released_ = true; 00062 this->release_element(dropped_by_transport); 00063 return true; 00064 } 00065 00066 // ciju: The sub_loan_count_ has been observed to drop below zero. 00067 // Since it isn't exactly a ref count and the object is created in 00068 // allocater memory (user space) we *probably* can disregard the 00069 // count for now. Ideally we would like to prevent the count from 00070 // falling below 0 and opening up this assert. 00071 // assert (new_count > 0); 00072 return false; 00073 }
ElementPair OpenDDS::DCPS::TransportQueueElement::fragment | ( | size_t | size | ) | [virtual] |
Create two TransportQueueElements representing the same data payload as the current TransportQueueElement, with the first one (including its DataSampleHeader) fitting in "size" bytes. This method leaves the current TransportQueueElement alone (but can't be made const because the newly-created elements will need to invoke non-const methods on it). Each element in the pair will contain its own serialized modified DataSampleHeader.
Reimplemented in OpenDDS::DCPS::RtpsCustomizedElement.
Definition at line 43 of file TransportQueueElement.cpp.
References OpenDDS::DCPS::move(), msg(), publication_id(), OpenDDS::DCPS::TransportCustomizedElement::set_msg(), OpenDDS::DCPS::TransportCustomizedElement::set_publication_id(), OpenDDS::DCPS::DataSampleHeader::split(), and TransportCustomizedElement.
Referenced by OpenDDS::DCPS::TransportSendStrategy::send().
00044 { 00045 Message_Block_Ptr head; 00046 Message_Block_Ptr tail; 00047 DataSampleHeader::split(*msg(), size, head, tail); 00048 00049 TransportCustomizedElement* frag = new TransportCustomizedElement(0, true); 00050 frag->set_publication_id(publication_id()); 00051 frag->set_msg(move(head)); 00052 00053 TransportCustomizedElement* rest = 00054 new TransportCustomizedElement(this, true); 00055 rest->set_msg(move(tail)); 00056 00057 return ElementPair(frag, rest); 00058 }
bool OpenDDS::DCPS::TransportQueueElement::is_control | ( | RepoId | pub_id | ) | const [virtual] |
Is the element a "control" sample from the specified pub_id?
Reimplemented in OpenDDS::DCPS::TransportSendControlElement.
Definition at line 36 of file TransportQueueElement.cpp.
References DBG_ENTRY_LVL.
00037 { 00038 DBG_ENTRY_LVL("TransportQueueElement", "is_control", 6); 00039 return false; 00040 }
virtual bool OpenDDS::DCPS::TransportQueueElement::is_fragment | ( | ) | const [inline, virtual] |
Is this QueueElement the result of fragmentation?
Reimplemented in OpenDDS::DCPS::TransportCustomizedElement.
Definition at line 147 of file TransportQueueElement.h.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert().
virtual bool OpenDDS::DCPS::TransportQueueElement::is_request_ack | ( | ) | const [inline, virtual] |
Reimplemented in OpenDDS::DCPS::TransportSendControlElement.
Definition at line 149 of file TransportQueueElement.h.
Referenced by OpenDDS::DCPS::TcpSendStrategy::add_delayed_notification(), and OpenDDS::DCPS::DataLink::send().
virtual const ACE_Message_Block* OpenDDS::DCPS::TransportQueueElement::msg | ( | ) | const [pure virtual] |
The marshalled sample (sample header + sample data).
Implemented in OpenDDS::DCPS::TransportControlElement, OpenDDS::DCPS::TransportCustomizedElement, OpenDDS::DCPS::TransportReplacedElement, OpenDDS::DCPS::TransportRetainedElement, OpenDDS::DCPS::TransportSendControlElement, and OpenDDS::DCPS::TransportSendElement.
Referenced by OpenDDS::DCPS::TransportSendStrategy::adjust_packet_after_send(), OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::RtpsUdpDataLink::durability_resend(), fragment(), OpenDDS::DCPS::SingleSendBuffer::insert(), OpenDDS::DCPS::TransportSendStrategy::send(), OpenDDS::DCPS::TransportReplacedElement::TransportReplacedElement(), OpenDDS::DCPS::CopyChainVisitor::visit_element(), OpenDDS::DCPS::BuildChainVisitor::visit_element(), OpenDDS::DCPS::PacketRemoveVisitor::visit_element_ref(), OpenDDS::DCPS::RemoveAllVisitor::visit_element_remove(), and OpenDDS::DCPS::QueueRemoveVisitor::visit_element_remove().
virtual const ACE_Message_Block* OpenDDS::DCPS::TransportQueueElement::msg_payload | ( | ) | const [pure virtual] |
The marshalled payload only (sample data).
Implemented in OpenDDS::DCPS::TransportControlElement, OpenDDS::DCPS::TransportCustomizedElement, OpenDDS::DCPS::TransportReplacedElement, OpenDDS::DCPS::TransportRetainedElement, OpenDDS::DCPS::TransportSendControlElement, OpenDDS::DCPS::TransportSendElement, and OpenDDS::DCPS::RtpsCustomizedElement.
Referenced by OpenDDS::DCPS::TransportQueueElement::MatchOnDataPayload::matches(), and OpenDDS::DCPS::TransportCustomizedElement::msg_payload().
virtual bool OpenDDS::DCPS::TransportQueueElement::owned_by_transport | ( | ) | [pure virtual] |
Is the sample created by the transport?
Implemented in OpenDDS::DCPS::TransportControlElement, OpenDDS::DCPS::TransportCustomizedElement, OpenDDS::DCPS::TransportReplacedElement, OpenDDS::DCPS::TransportRetainedElement, OpenDDS::DCPS::TransportSendControlElement, and OpenDDS::DCPS::TransportSendElement.
Referenced by OpenDDS::DCPS::TransportSendStrategy::send_delayed_notifications().
virtual RepoId OpenDDS::DCPS::TransportQueueElement::publication_id | ( | ) | const [pure virtual] |
Accessor for the publication id that sent the sample.
Implemented in OpenDDS::DCPS::TransportControlElement, OpenDDS::DCPS::TransportCustomizedElement, OpenDDS::DCPS::TransportReplacedElement, OpenDDS::DCPS::TransportRetainedElement, OpenDDS::DCPS::TransportSendControlElement, and OpenDDS::DCPS::TransportSendElement.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::add_delayed_notification(), OpenDDS::DCPS::RtpsUdpDataLink::add_gap_submsg(), OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), fragment(), OpenDDS::DCPS::TcpDataLink::handle_send_request_ack(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert(), OpenDDS::DCPS::TransportQueueElement::MatchOnPubId::matches(), OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper(), OpenDDS::DCPS::TransportReplacedElement::TransportReplacedElement(), and OpenDDS::DCPS::CopyChainVisitor::visit_element().
virtual void OpenDDS::DCPS::TransportQueueElement::release_element | ( | bool | dropped_by_transport | ) | [protected, pure virtual] |
Invoked when the counter reaches 0.
Implemented in OpenDDS::DCPS::TransportControlElement, OpenDDS::DCPS::TransportCustomizedElement, OpenDDS::DCPS::TransportReplacedElement, OpenDDS::DCPS::TransportRetainedElement, OpenDDS::DCPS::TransportSendControlElement, and OpenDDS::DCPS::TransportSendElement.
Referenced by decision_made().
ACE_INLINE void OpenDDS::DCPS::TransportQueueElement::released | ( | bool | flag | ) |
Definition at line 91 of file TransportQueueElement.inl.
References released_.
00092 { 00093 this->released_ = flag; 00094 }
ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::released | ( | ) | const |
Is the listener get called ?
Definition at line 84 of file TransportQueueElement.inl.
References released_.
00085 { 00086 return this->released_; 00087 }
bool OpenDDS::DCPS::TransportQueueElement::requires_exclusive_packet | ( | ) | const [virtual] |
Does the sample require an exclusive transport packet?
Reimplemented in OpenDDS::DCPS::TransportControlElement, OpenDDS::DCPS::TransportCustomizedElement, and OpenDDS::DCPS::TransportSendControlElement.
Definition at line 29 of file TransportQueueElement.cpp.
References DBG_ENTRY_LVL.
Referenced by OpenDDS::DCPS::TransportSendStrategy::send().
00030 { 00031 DBG_ENTRY_LVL("TransportQueueElement", "requires_exclusive_packet", 6); 00032 return false; 00033 }
virtual SequenceNumber OpenDDS::DCPS::TransportQueueElement::sequence | ( | ) | const [inline, virtual] |
Reimplemented in OpenDDS::DCPS::TransportCustomizedElement, OpenDDS::DCPS::TransportSendControlElement, OpenDDS::DCPS::TransportSendElement, and OpenDDS::DCPS::RtpsCustomizedElement.
Definition at line 111 of file TransportQueueElement.h.
References OpenDDS::RTPS::SEQUENCENUMBER_UNKNOWN.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::add_elem_awaiting_ack(), OpenDDS::DCPS::RtpsUdpDataLink::add_gap_submsg(), OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::TcpDataLink::handle_send_request_ack(), OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert(), and OpenDDS::DCPS::TransportCustomizedElement::sequence().
00111 { 00112 return SequenceNumber::SEQUENCENUMBER_UNKNOWN(); 00113 }
virtual RepoId OpenDDS::DCPS::TransportQueueElement::subscription_id | ( | ) | const [inline, virtual] |
Accessor for the subscription id, if sent the sample is sent to 1 sub.
Reimplemented in OpenDDS::DCPS::TransportCustomizedElement, and OpenDDS::DCPS::TransportSendElement.
Definition at line 107 of file TransportQueueElement.h.
References OpenDDS::DCPS::GUID_UNKNOWN.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::add_gap_submsg(), OpenDDS::DCPS::RtpsUdpDataLink::customize_queue_element(), OpenDDS::DCPS::RtpsUdpDataLink::durability_resend(), OpenDDS::DCPS::SingleSendBuffer::insert(), and OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper().
00107 { 00108 return GUID_UNKNOWN; 00109 }
ACE_INLINE bool OpenDDS::DCPS::TransportQueueElement::was_dropped | ( | ) | const [protected] |
May be used by subclass' implementation of release_element() to determine if any DataLinks dropped the data instead of delivering it.
Definition at line 77 of file TransportQueueElement.inl.
References dropped_.
Referenced by OpenDDS::DCPS::TransportSendElement::release_element(), and OpenDDS::DCPS::TransportSendControlElement::release_element().
00078 { 00079 return this->dropped_; 00080 }
friend class TransportCustomizedElement [friend] |
Definition at line 169 of file TransportQueueElement.h.
Referenced by fragment().
bool OpenDDS::DCPS::TransportQueueElement::dropped_ [private] |
Flag flipped to true if any DataLink dropped the sample.
Definition at line 175 of file TransportQueueElement.h.
Referenced by data_delivered(), data_dropped(), and was_dropped().
bool OpenDDS::DCPS::TransportQueueElement::released_ [private] |
If the callback to DW is made.
Definition at line 178 of file TransportQueueElement.h.
Referenced by released().
ACE_Atomic_Op<ACE_Thread_Mutex, unsigned long> OpenDDS::DCPS::TransportQueueElement::sub_loan_count_ [private] |
Counts the number of outstanding sub-loans.
Definition at line 172 of file TransportQueueElement.h.
Referenced by decision_made().