Public Member Functions | |
MultiSendBuffer (RtpsUdpDataLink *outer, size_t capacity) | |
void | retain_all (RepoId pub_id) |
void | insert (SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain) |
Public Attributes | |
RtpsUdpDataLink * | outer_ |
Definition at line 165 of file RtpsUdpDataLink.h.
OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::MultiSendBuffer (RtpsUdpDataLink *outer, size_t capacity) [inline]
Definition at line 167 of file RtpsUdpDataLink.h.
00168     : TransportSendBuffer(capacity)
00169     , outer_(outer)
00170   {}
void OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert (SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain) [virtual]
Implements OpenDDS::DCPS::TransportSendBuffer.
Definition at line 550 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::TransportQueueElement::is_fragment(), OpenDDS::DCPS::RcHandle< T >::is_nil(), OPENDDS_STRING, outer_, OpenDDS::DCPS::BasicQueue< T >::peek(), OpenDDS::DCPS::TransportQueueElement::publication_id(), OpenDDS::DCPS::RtpsUdpDataLink::send_strategy_, OpenDDS::DCPS::TransportQueueElement::sequence(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::SingleSendBuffer::UNLIMITED, and OpenDDS::DCPS::RtpsUdpDataLink::writers_.
00553 {
00554   // Called from TransportSendStrategy::send_packet().
00555   // RtpsUdpDataLink is already locked.
00556   const TransportQueueElement* const tqe = q->peek();
00557   const SequenceNumber seq = tqe->sequence();
00558   if (seq == SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00559     return;
00560   }
00561
00562   const RepoId pub_id = tqe->publication_id();
00563
00564   const RtpsWriterMap::iterator wi = outer_->writers_.find(pub_id);
00565   if (wi == outer_->writers_.end()) {
00566     return; // this datawriter is not reliable
00567   }
00568
00569   RcHandle<SingleSendBuffer>& send_buff = wi->second.send_buff_;
00570
00571   if (send_buff.is_nil()) {
00572     send_buff = new SingleSendBuffer(SingleSendBuffer::UNLIMITED, 1 /*mspp*/);
00573
00574     send_buff->bind(outer_->send_strategy_.in());
00575   }
00576
00577   if (Transport_debug_level > 5) {
00578     const GuidConverter pub(pub_id);
00579     ACE_DEBUG((LM_DEBUG, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert() - "
00580       "pub_id %C seq %q frag %d\n", OPENDDS_STRING(pub).c_str(), seq.getValue(),
00581       (int)tqe->is_fragment()));
00582   }
00583
00584   if (tqe->is_fragment()) {
00585     const RtpsCustomizedElement* const rce =
00586       dynamic_cast<const RtpsCustomizedElement*>(tqe);
00587     if (rce) {
00588       send_buff->insert_fragment(seq, rce->last_fragment(), q, chain);
00589     } else if (Transport_debug_level) {
00590       const GuidConverter pub(pub_id);
00591       ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpDataLink::MultiSendBuffer::insert()"
00592         " - ERROR: couldn't get fragment number for pub_id %C seq %q\n",
00593         OPENDDS_STRING(pub).c_str(), seq.getValue()));
00594     }
00595   } else {
00596     send_buff->insert(seq, q, chain);
00597   }
00598 }
void OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::retain_all (RepoId pub_id) [virtual]
Implements OpenDDS::DCPS::TransportSendBuffer.
Definition at line 540 of file RtpsUdpDataLink.cpp.
References OpenDDS::DCPS::RtpsUdpDataLink::lock_, outer_, and OpenDDS::DCPS::RtpsUdpDataLink::writers_.
00541 {
00542   ACE_GUARD(ACE_Thread_Mutex, g, outer_->lock_);
00543   const RtpsWriterMap::iterator wi = outer_->writers_.find(pub_id);
00544   if (wi != outer_->writers_.end() && !wi->second.send_buff_.is_nil()) {
00545     wi->second.send_buff_->retain_all(pub_id);
00546   }
00547 }