Public Member Functions
MultiSendBuffer (RtpsUdpDataLink *outer, size_t capacity)
void retain_all (RepoId pub_id)
void insert (SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
Public Attributes
RtpsUdpDataLink * outer_
Definition at line 191 of file RtpsUdpDataLink.h.
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::MultiSendBuffer (RtpsUdpDataLink *outer, size_t capacity) [inline]
Definition at line 193 of file RtpsUdpDataLink.h.
00194   : TransportSendBuffer(capacity)
00195   , outer_(outer)
00196 {}
void OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert (SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain) [virtual]
Implements OpenDDS::DCPS::TransportSendBuffer.
Definition at line 609 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::TransportQueueElement::is_fragment(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OpenDDS::DCPS::RtpsCustomizedElement::last_fragment(), LM_DEBUG, LM_ERROR, OPENDDS_STRING, outer_, OpenDDS::DCPS::BasicQueue< T >::peek(), OpenDDS::DCPS::TransportQueueElement::publication_id(), OpenDDS::DCPS::RtpsUdpDataLink::send_strategy(), OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::SingleSendBuffer::UNLIMITED, and OpenDDS::DCPS::RtpsUdpDataLink::writers_.
00612 {
00613   // Called from TransportSendStrategy::send_packet().
00614   // RtpsUdpDataLink is already locked.
00615   const TransportQueueElement* const tqe = q->peek();
00616   const SequenceNumber seq = tqe->sequence();
00617   if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00618     return;
00619   }
00620
00621   const RepoId pub_id = tqe->publication_id();
00622
00623   const RtpsWriterMap::iterator wi = outer_->writers_.find(pub_id);
00624   if (wi == outer_->writers_.end()) {
00625     return; // this datawriter is not reliable
00626   }
00627
00628   RcHandle<SingleSendBuffer>& send_buff = wi->second.send_buff_;
00629
00630   if (send_buff.is_nil()) {
00631     send_buff = make_rch<SingleSendBuffer>(SingleSendBuffer::UNLIMITED, 1 /*mspp*/);
00632
00633     send_buff->bind(outer_->send_strategy());
00634   }
00635
00636   if (Transport_debug_level > 5) {
00637     const GuidConverter pub(pub_id);
00638     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert() - "
00639       "pub_id %C seq %q frag %d\n", OPENDDS_STRING(pub).c_str(), seq.getValue(),
00640       (int)tqe->is_fragment()));
00641   }
00642
00643   if (tqe->is_fragment()) {
00644     const RtpsCustomizedElement* const rce =
00645       dynamic_cast<const RtpsCustomizedElement*>(tqe);
00646     if (rce) {
00647       send_buff->insert_fragment(seq, rce->last_fragment(), q, chain);
00648     } else if (Transport_debug_level) {
00649       const GuidConverter pub(pub_id);
00650       ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert()"
00651         " - ERROR: couldn't get fragment number for pub_id %C seq %q\n",
00652         OPENDDS_STRING(pub).c_str(), seq.getValue()));
00653     }
00654   } else {
00655     send_buff->insert(seq, q, chain);
00656   }
00657 }
void OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::retain_all (RepoId pub_id) [virtual]
Implements OpenDDS::DCPS::TransportSendBuffer.
Definition at line 599 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::lock_, outer_, and OpenDDS::DCPS::RtpsUdpDataLink::writers_.
00600 {
00601   ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
00602   const RtpsWriterMap::iterator wi = outer_->writers_.find(pub_id);
00603   if (wi != outer_->writers_.end() && !wi->second.send_buff_.is_nil()) {
00604     wi->second.send_buff_->retain_all(pub_id);
00605   }
00606 }
RtpsUdpDataLink* OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::outer_
Definition at line 203 of file RtpsUdpDataLink.h.
Referenced by insert(), and retain_all().