OpenDDS::DCPS::SingleSendBuffer Class Reference

#include <TransportSendBuffer.h>

Inheritance diagram for OpenDDS::DCPS::SingleSendBuffer:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::SingleSendBuffer:

Collaboration graph
[legend]
List of all members.

Public Member Functions

void release_all ()
typedef OPENDDS_MAP (SequenceNumber, BufferType) BufferMap
void release_acked (SequenceNumber seq)
void release (BufferMap::iterator buffer_iter)
size_t n_chunks () const
 SingleSendBuffer (size_t capacity, size_t max_samples_per_packet)
 ~SingleSendBuffer ()
bool resend (const SequenceRange &range, DisjointSequence *gaps=0)
bool resend_i (const SequenceRange &range, DisjointSequence *gaps=0)
void resend_fragments_i (const SequenceNumber &sequence, const DisjointSequence &fragments)
SequenceNumber low () const
SequenceNumber high () const
bool empty () const
bool contains (const SequenceNumber &seq) const
void retain_all (RepoId pub_id)
void insert (SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
void insert_fragment (SequenceNumber sequence, SequenceNumber fragment, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)

Static Public Attributes

static const size_t UNLIMITED = 0

Private Member Functions

void check_capacity ()
RemoveResult retain_buffer (const RepoId &pub_id, BufferType &buffer)
void insert_buffer (BufferType &buffer, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
typedef OPENDDS_MAP (SequenceNumber, BufferMap) FragmentMap

Private Attributes

size_t n_chunks_
TransportRetainedElementAllocator retained_allocator_
MessageBlockAllocator retained_mb_allocator_
DataBlockAllocator retained_db_allocator_
TransportReplacedElementAllocator replaced_allocator_
MessageBlockAllocator replaced_mb_allocator_
DataBlockAllocator replaced_db_allocator_
BufferMap buffers_
FragmentMap fragments_

Detailed Description

Implementation of TransportSendBuffer that manages data for a single domain of SequenceNumbers -- for a given SingleSendBuffer object, the sequence numbers passed to insert() must be generated from the same place.

Definition at line 69 of file TransportSendBuffer.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::SingleSendBuffer::SingleSendBuffer ( size_t  capacity,
size_t  max_samples_per_packet 
)

Definition at line 42 of file TransportSendBuffer.cpp.

00044   : TransportSendBuffer(capacity),
00045     n_chunks_(capacity * max_samples_per_packet),
00046     retained_allocator_(this->n_chunks_),
00047     retained_mb_allocator_(this->n_chunks_ * 2),
00048     retained_db_allocator_(this->n_chunks_ * 2),
00049     replaced_allocator_(this->n_chunks_),
00050     replaced_mb_allocator_(this->n_chunks_ * 2),
00051     replaced_db_allocator_(this->n_chunks_ * 2)
00052 {
00053 }

OpenDDS::DCPS::SingleSendBuffer::~SingleSendBuffer (  ) 

Definition at line 55 of file TransportSendBuffer.cpp.

References release_all().

00056 {
00057   release_all();
00058 }


Member Function Documentation

void OpenDDS::DCPS::SingleSendBuffer::check_capacity (  )  [private]

Definition at line 265 of file TransportSendBuffer.cpp.

References OpenDDS::DCPS::TransportSendBuffer::capacity_, release(), OpenDDS::DCPS::Transport_debug_level, and UNLIMITED.

Referenced by insert(), and insert_fragment().

00266 {
00267   if (this->capacity_ == SingleSendBuffer::UNLIMITED) {
00268     return;
00269   }
00270   // Age off oldest sample if we are at capacity:
00271   if (this->buffers_.size() == this->capacity_) {
00272     BufferMap::iterator it(this->buffers_.begin());
00273     if (it == this->buffers_.end()) return;
00274 
00275     if (Transport_debug_level > 5) {
00276       ACE_DEBUG((LM_DEBUG,
00277         ACE_TEXT("(%P|%t) SingleSendBuffer::check_capacity() - ")
00278         ACE_TEXT("aging off PDU: %q as buffer(0x%@,0x%@)\n"),
00279         it->first.getValue(),
00280         it->second.first, it->second.second
00281       ));
00282     }
00283 
00284     release(it);
00285   }
00286 }

ACE_INLINE bool OpenDDS::DCPS::SingleSendBuffer::contains ( const SequenceNumber &  seq  )  const

Definition at line 53 of file TransportSendBuffer.inl.

References buffers_.

00054 {
00055   return this->buffers_.count(seq);
00056 }

ACE_INLINE bool OpenDDS::DCPS::SingleSendBuffer::empty (  )  const

Definition at line 47 of file TransportSendBuffer.inl.

References buffers_.

Referenced by OpenDDS::DCPS::ReliableSession::nak_received().

00048 {
00049   return this->buffers_.empty();
00050 }

ACE_INLINE SequenceNumber OpenDDS::DCPS::SingleSendBuffer::high (  )  const

Definition at line 40 of file TransportSendBuffer.inl.

References buffers_.

Referenced by resend_i().

00041 {
00042   if (this->buffers_.empty()) throw std::exception();
00043   return this->buffers_.rbegin()->first;
00044 }

void OpenDDS::DCPS::SingleSendBuffer::insert ( SequenceNumber  sequence,
TransportSendStrategy::QueueType *  queue,
ACE_Message_Block *  chain 
) [virtual]

Implements OpenDDS::DCPS::TransportSendBuffer.

Definition at line 196 of file TransportSendBuffer.cpp.

References buffers_, check_capacity(), OpenDDS::DCPS::SequenceNumber::getValue(), insert_buffer(), and OpenDDS::DCPS::Transport_debug_level.

00199 {
00200   check_capacity();
00201 
00202   BufferType& buffer = this->buffers_[sequence];
00203   insert_buffer(buffer, queue, chain);
00204 
00205   if (Transport_debug_level > 5) {
00206     ACE_DEBUG((LM_DEBUG,
00207       ACE_TEXT("(%P|%t) SingleSendBuffer::insert() - ")
00208       ACE_TEXT("saved PDU: %q as buffer(0x%@,0x%@)\n"),
00209       sequence.getValue(),
00210       buffer.first, buffer.second
00211     ));
00212   }
00213 }

void OpenDDS::DCPS::SingleSendBuffer::insert_buffer ( BufferType &  buffer,
TransportSendStrategy::QueueType *  queue,
ACE_Message_Block *  chain 
) [private]

Definition at line 216 of file TransportSendBuffer.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), OpenDDS::DCPS::TransportQueueElement::clone_mb(), and OpenDDS::DCPS::BasicQueue< T >::size().

Referenced by insert(), and insert_fragment().

00219 {
00220   // Copy sample's TransportQueueElements:
00221   TransportSendStrategy::QueueType*& elems = buffer.first;
00222   ACE_NEW(elems, TransportSendStrategy::QueueType(queue->size(), 1));
00223 
00224   CopyChainVisitor visitor(*elems,
00225                            &this->retained_allocator_,
00226                            &this->retained_mb_allocator_,
00227                            &this->retained_db_allocator_);
00228   queue->accept_visitor(visitor);
00229 
00230   // Copy sample's message/data block descriptors:
00231   ACE_Message_Block*& data = buffer.second;
00232   data = TransportQueueElement::clone_mb(chain,
00233                                          &this->retained_mb_allocator_,
00234                                          &this->retained_db_allocator_);
00235 }

void OpenDDS::DCPS::SingleSendBuffer::insert_fragment ( SequenceNumber  sequence,
SequenceNumber  fragment,
TransportSendStrategy::QueueType *  queue,
ACE_Message_Block *  chain 
)

Definition at line 238 of file TransportSendBuffer.cpp.

References buffers_, check_capacity(), fragments_, OpenDDS::DCPS::SequenceNumber::getValue(), insert_buffer(), and OpenDDS::DCPS::Transport_debug_level.

00242 {
00243   check_capacity();
00244 
00245   // Insert into buffers_ so that the overall capacity is maintained
00246   // The entry in buffers_ with two null pointers indicates that the
00247   // actual data is stored in fragments_[sequence].
00248   buffers_[sequence] = std::make_pair(static_cast<QueueType*>(0),
00249                                       static_cast<ACE_Message_Block*>(0));
00250 
00251   BufferType& buffer = fragments_[sequence][fragment];
00252   insert_buffer(buffer, queue, chain);
00253 
00254   if (Transport_debug_level > 5) {
00255     ACE_DEBUG((LM_DEBUG,
00256       ACE_TEXT("(%P|%t) SingleSendBuffer::insert_fragment() - ")
00257       ACE_TEXT("saved PDU: %q,%q as buffer(0x%@,0x%@)\n"),
00258       sequence.getValue(), fragment.getValue(),
00259       buffer.first, buffer.second
00260     ));
00261   }
00262 }

ACE_INLINE SequenceNumber OpenDDS::DCPS::SingleSendBuffer::low (  )  const

Definition at line 33 of file TransportSendBuffer.inl.

References buffers_.

Referenced by OpenDDS::DCPS::ReliableSession::nak_received(), and resend_i().

00034 {
00035   if (this->buffers_.empty()) throw std::exception();
00036   return this->buffers_.begin()->first;
00037 }

ACE_INLINE size_t OpenDDS::DCPS::SingleSendBuffer::n_chunks (  )  const

Definition at line 27 of file TransportSendBuffer.inl.

References n_chunks_.

00028 {
00029   return this->n_chunks_;
00030 }

typedef OPENDDS_MAP (SequenceNumber, BufferMap) OpenDDS::DCPS::SingleSendBuffer::FragmentMap [private]

typedef OPENDDS_MAP (SequenceNumber, BufferType) OpenDDS::DCPS::SingleSendBuffer::BufferMap

void OpenDDS::DCPS::SingleSendBuffer::release ( BufferMap::iterator  buffer_iter  ) 

Definition at line 91 of file TransportSendBuffer.cpp.

References buffers_, fragments_, and OpenDDS::DCPS::Transport_debug_level.

Referenced by check_capacity(), release_acked(), release_all(), and retain_all().

00092 {
00093   BufferType& buffer(buffer_iter->second);
00094   if (Transport_debug_level > 5) {
00095     ACE_DEBUG((LM_DEBUG,
00096       ACE_TEXT("(%P|%t) SingleSendBuffer::release() - ")
00097       ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
00098       buffer.first, buffer.second
00099     ));
00100   }
00101 
00102   if (buffer.first && buffer.second) {
00103     // not a fragment
00104     RemoveAllVisitor visitor;
00105     buffer.first->accept_remove_visitor(visitor);
00106     delete buffer.first;
00107 
00108     buffer.second->release();
00109     buffer.second = 0;
00110 
00111   } else {
00112     // data actually stored in fragments_
00113     const FragmentMap::iterator fm_it = fragments_.find(buffer_iter->first);
00114     if (fm_it != fragments_.end()) {
00115       for (BufferMap::iterator bm_it = fm_it->second.begin();
00116            bm_it != fm_it->second.end(); ++bm_it) {
00117         RemoveAllVisitor visitor;
00118         bm_it->second.first->accept_remove_visitor(visitor);
00119         delete bm_it->second.first;
00120 
00121         bm_it->second.second->release();
00122         bm_it->second.second = 0;
00123       }
00124       fragments_.erase(fm_it);
00125     }
00126   }
00127 
00128   buffers_.erase(buffer_iter);
00129 }

void OpenDDS::DCPS::SingleSendBuffer::release_acked ( SequenceNumber  seq  ) 

Definition at line 70 of file TransportSendBuffer.cpp.

References buffers_, release(), and OpenDDS::DCPS::Transport_debug_level.

00070                                                   {
00071   BufferMap::iterator buffer_iter = buffers_.begin();
00072   BufferType& buffer(buffer_iter->second);
00073 
00074   if (Transport_debug_level > 5) {
00075     ACE_DEBUG((LM_DEBUG,
00076       ACE_TEXT("(%P|%t) SingleSendBuffer::release_acked() - ")
00077       ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
00078       buffer.first, buffer.second
00079     ));
00080   }
00081   while (buffer_iter != buffers_.end()) {
00082     if (buffer_iter->first == seq) {
00083       release(buffer_iter);
00084       return;
00085     }
00086     ++buffer_iter;
00087   }
00088 }

void OpenDDS::DCPS::SingleSendBuffer::release_all (  ) 

Definition at line 61 of file TransportSendBuffer.cpp.

References buffers_, and release().

Referenced by ~SingleSendBuffer().

00062 {
00063   for (BufferMap::iterator it(this->buffers_.begin());
00064        it != this->buffers_.end();) {
00065     release(it++);
00066   }
00067 }

bool OpenDDS::DCPS::SingleSendBuffer::resend ( const SequenceRange &  range,
DisjointSequence *  gaps = 0 
)

Definition at line 289 of file TransportSendBuffer.cpp.

References resend_i(), and OpenDDS::DCPS::TransportSendBuffer::strategy_lock().

Referenced by OpenDDS::DCPS::ReliableSession::nak_received().

00290 {
00291   ACE_GUARD_RETURN(LockType, guard, strategy_lock(), false);
00292   return resend_i(range, gaps);
00293 }

void OpenDDS::DCPS::SingleSendBuffer::resend_fragments_i ( const SequenceNumber &  sequence,
const DisjointSequence &  fragments 
)

Definition at line 337 of file TransportSendBuffer.cpp.

References OpenDDS::DCPS::DisjointSequence::empty(), fragments_, OpenDDS::DCPS::OPENDDS_VECTOR(), OpenDDS::DCPS::TransportSendBuffer::resend_one(), and OpenDDS::DCPS::SequenceNumber::ZERO().

00339 {
00340   if (fragments_.empty() || requested_frags.empty()) {
00341     return;
00342   }
00343   const BufferMap& buffers = fragments_[seq];
00344   const OPENDDS_VECTOR(SequenceRange) psr =
00345     requested_frags.present_sequence_ranges();
00346   SequenceNumber sent = SequenceNumber::ZERO();
00347   for (size_t i = 0; i < psr.size(); ++i) {
00348     BufferMap::const_iterator it = buffers.lower_bound(psr[i].first);
00349     if (it == buffers.end()) {
00350       return;
00351     }
00352     BufferMap::const_iterator it2 = buffers.lower_bound(psr[i].second);
00353     while (true) {
00354       if (sent < it->first) {
00355         resend_one(it->second);
00356         sent = it->first;
00357       }
00358       if (it == it2) {
00359         break;
00360       }
00361       ++it;
00362     }
00363   }
00364 }

bool OpenDDS::DCPS::SingleSendBuffer::resend_i ( const SequenceRange &  range,
DisjointSequence *  gaps = 0 
)

Definition at line 296 of file TransportSendBuffer.cpp.

References fragments_, high(), low(), OpenDDS::DCPS::TransportSendBuffer::resend_one(), and OpenDDS::DCPS::Transport_debug_level.

Referenced by resend(), and OpenDDS::DCPS::RtpsUdpDataLink::send_nack_replies().

00297 {
00298   //Special case, nak to make sure it has all history
00299   const SequenceNumber lowForAllResent = range.first == SequenceNumber() ? low() : range.first;
00300 
00301   for (SequenceNumber sequence(range.first);
00302        sequence <= range.second; ++sequence) {
00303     // Re-send requested sample if still buffered; missing samples
00304     // will be scored against the given DisjointSequence:
00305     BufferMap::iterator it(this->buffers_.find(sequence));
00306     if (it == this->buffers_.end()) {
00307       if (gaps) {
00308         gaps->insert(sequence);
00309       }
00310     } else {
00311       if (Transport_debug_level > 5) {
00312         ACE_DEBUG((LM_DEBUG,
00313                    ACE_TEXT("(%P|%t) SingleSendBuffer::resend() - ")
00314                    ACE_TEXT("resending PDU: %q, (0x%@,0x%@)\n"),
00315                    sequence.getValue(),
00316                    it->second.first,
00317                    it->second.second));
00318       }
00319       if (it->second.first && it->second.second) {
00320         resend_one(it->second);
00321       } else {
00322         const FragmentMap::iterator fm_it = fragments_.find(it->first);
00323         if (fm_it != fragments_.end()) {
00324           for (BufferMap::iterator bm_it = fm_it->second.begin();
00325                 bm_it != fm_it->second.end(); ++bm_it) {
00326             resend_one(bm_it->second);
00327           }
00328         }
00329       }
00330     }
00331   }
00332   // Have we resent all requested data?
00333   return lowForAllResent >= low() && range.second <= high();
00334 }

void OpenDDS::DCPS::SingleSendBuffer::retain_all ( RepoId  pub_id  )  [virtual]

Implements OpenDDS::DCPS::TransportSendBuffer.

Definition at line 132 of file TransportSendBuffer.cpp.

References buffers_, fragments_, OPENDDS_STRING, release(), OpenDDS::DCPS::REMOVE_ERROR, retain_buffer(), and OpenDDS::DCPS::Transport_debug_level.

00133 {
00134   if (Transport_debug_level > 5) {
00135     GuidConverter converter(pub_id);
00136     ACE_DEBUG((LM_DEBUG,
00137       ACE_TEXT("(%P|%t) SingleSendBuffer::retain_all() - ")
00138       ACE_TEXT("copying out blocks for publication: %C\n"),
00139       OPENDDS_STRING(converter).c_str()
00140     ));
00141   }
00142   for (BufferMap::iterator it(this->buffers_.begin());
00143        it != this->buffers_.end();) {
00144     if (it->second.first && it->second.second) {
00145       if (retain_buffer(pub_id, it->second) == REMOVE_ERROR) {
00146         GuidConverter converter(pub_id);
00147         ACE_ERROR((LM_WARNING,
00148                    ACE_TEXT("(%P|%t) WARNING: ")
00149                    ACE_TEXT("SingleSendBuffer::retain_all: ")
00150                    ACE_TEXT("failed to retain data from publication: %C!\n"),
00151                    OPENDDS_STRING(converter).c_str()));
00152         release(it++);
00153       } else {
00154         ++it;
00155       }
00156 
00157     } else {
00158       const FragmentMap::iterator fm_it = fragments_.find(it->first);
00159       if (fm_it != fragments_.end()) {
00160         for (BufferMap::iterator bm_it = fm_it->second.begin();
00161              bm_it != fm_it->second.end();) {
00162           if (retain_buffer(pub_id, bm_it->second) == REMOVE_ERROR) {
00163             GuidConverter converter(pub_id);
00164             ACE_ERROR((LM_WARNING,
00165                        ACE_TEXT("(%P|%t) WARNING: ")
00166                        ACE_TEXT("SingleSendBuffer::retain_all: failed to ")
00167                        ACE_TEXT("retain fragment data from publication: %C!\n"),
00168                        OPENDDS_STRING(converter).c_str()));
00169             release(bm_it++);
00170           } else {
00171             ++bm_it;
00172           }
00173         }
00174       }
00175       ++it;
00176     }
00177   }
00178 }

RemoveResult OpenDDS::DCPS::SingleSendBuffer::retain_buffer ( const RepoId &  pub_id,
BufferType &  buffer 
) [private]

Definition at line 181 of file TransportSendBuffer.cpp.

References OpenDDS::DCPS::PacketRemoveVisitor::status().

Referenced by retain_all().

00182 {
00183   TransportQueueElement::MatchOnPubId match(pub_id);
00184   PacketRemoveVisitor visitor(match,
00185                               buffer.second,
00186                               buffer.second,
00187                               this->replaced_allocator_,
00188                               this->replaced_mb_allocator_,
00189                               this->replaced_db_allocator_);
00190 
00191   buffer.first->accept_replace_visitor(visitor);
00192   return visitor.status();
00193 }


Member Data Documentation

BufferMap OpenDDS::DCPS::SingleSendBuffer::buffers_ [private]

Definition at line 123 of file TransportSendBuffer.h.

Referenced by contains(), empty(), high(), insert(), insert_fragment(), low(), release(), release_acked(), release_all(), and retain_all().

FragmentMap OpenDDS::DCPS::SingleSendBuffer::fragments_ [private]

Definition at line 126 of file TransportSendBuffer.h.

Referenced by insert_fragment(), release(), resend_fragments_i(), resend_i(), and retain_all().

size_t OpenDDS::DCPS::SingleSendBuffer::n_chunks_ [private]

Definition at line 114 of file TransportSendBuffer.h.

Referenced by n_chunks().

TransportReplacedElementAllocator OpenDDS::DCPS::SingleSendBuffer::replaced_allocator_ [private]

Definition at line 119 of file TransportSendBuffer.h.

DataBlockAllocator OpenDDS::DCPS::SingleSendBuffer::replaced_db_allocator_ [private]

Definition at line 121 of file TransportSendBuffer.h.

MessageBlockAllocator OpenDDS::DCPS::SingleSendBuffer::replaced_mb_allocator_ [private]

Definition at line 120 of file TransportSendBuffer.h.

TransportRetainedElementAllocator OpenDDS::DCPS::SingleSendBuffer::retained_allocator_ [private]

Definition at line 116 of file TransportSendBuffer.h.

DataBlockAllocator OpenDDS::DCPS::SingleSendBuffer::retained_db_allocator_ [private]

Definition at line 118 of file TransportSendBuffer.h.

MessageBlockAllocator OpenDDS::DCPS::SingleSendBuffer::retained_mb_allocator_ [private]

Definition at line 117 of file TransportSendBuffer.h.

const size_t OpenDDS::DCPS::SingleSendBuffer::UNLIMITED = 0 [static]

Definition at line 73 of file TransportSendBuffer.h.

Referenced by check_capacity(), and OpenDDS::DCPS::RtpsUdpDataLink::MultiSendBuffer::insert().


The documentation for this class was generated from the following files:

TransportSendBuffer.h
TransportSendBuffer.cpp
TransportSendBuffer.inl
Generated on Fri Feb 12 20:06:33 2016 for OpenDDS by  doxygen 1.4.7