OpenDDS::DCPS::SingleSendBuffer Class Reference

#include <TransportSendBuffer.h>


Public Member Functions

void release_all ()
typedef OPENDDS_MAP (SequenceNumber, BufferType) BufferMap
void release_acked (SequenceNumber seq)
void release (BufferMap::iterator buffer_iter)
size_t n_chunks () const
 SingleSendBuffer (size_t capacity, size_t max_samples_per_packet)
 ~SingleSendBuffer ()
bool resend (const SequenceRange &range, DisjointSequence *gaps=0)
bool resend_i (const SequenceRange &range, DisjointSequence *gaps=0)
bool resend_i (const SequenceRange &range, DisjointSequence *gaps, const RepoId &destination)
void resend_fragments_i (const SequenceNumber &sequence, const DisjointSequence &fragments)
SequenceNumber low () const
SequenceNumber high () const
bool empty () const
bool contains (const SequenceNumber &seq) const
void retain_all (RepoId pub_id)
void insert (SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
void insert_fragment (SequenceNumber sequence, SequenceNumber fragment, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)

Static Public Attributes

static const size_t UNLIMITED = 0

Private Member Functions

void check_capacity ()
RemoveResult retain_buffer (const RepoId &pub_id, BufferType &buffer)
void insert_buffer (BufferType &buffer, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
typedef OPENDDS_MAP (SequenceNumber, BufferMap) FragmentMap
typedef OPENDDS_MAP (SequenceNumber, RepoId) DestinationMap

Private Attributes

size_t n_chunks_
MessageBlockAllocator retained_mb_allocator_
DataBlockAllocator retained_db_allocator_
MessageBlockAllocator replaced_mb_allocator_
DataBlockAllocator replaced_db_allocator_
BufferMap buffers_
FragmentMap fragments_
DestinationMap destinations_

Detailed Description

Implementation of TransportSendBuffer that manages data for a single domain of SequenceNumbers: for a given SingleSendBuffer object, every sequence number passed to insert() must be generated by the same source.

Definition at line 72 of file TransportSendBuffer.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::SingleSendBuffer::SingleSendBuffer ( size_t  capacity,
size_t  max_samples_per_packet 
)

Definition at line 48 of file TransportSendBuffer.cpp.

00050   : TransportSendBuffer(capacity),
00051     n_chunks_(capacity * max_samples_per_packet),
00052     retained_mb_allocator_(this->n_chunks_ * 2),
00053     retained_db_allocator_(this->n_chunks_ * 2),
00054     replaced_mb_allocator_(this->n_chunks_ * 2),
00055     replaced_db_allocator_(this->n_chunks_ * 2)
00056 {
00057 }
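
The initializer list above derives every allocator size from the two constructor arguments. As a worked example of that arithmetic only, here is a minimal sketch; `PoolSizes`, `pool_sizes` and `pool_sizes_example` are hypothetical names introduced for illustration and are not part of OpenDDS.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper, not part of OpenDDS: it reproduces only the
// arithmetic from the SingleSendBuffer initializer list.
struct PoolSizes {
  std::size_t n_chunks;       // capacity * max_samples_per_packet
  std::size_t per_allocator;  // n_chunks * 2, passed to each of the four allocators
};

inline PoolSizes pool_sizes(std::size_t capacity,
                            std::size_t max_samples_per_packet) {
  PoolSizes sizes;
  sizes.n_chunks = capacity * max_samples_per_packet;
  sizes.per_allocator = sizes.n_chunks * 2;
  return sizes;
}

// Usage example: a capacity of 32 with up to 10 samples per packet.
inline void pool_sizes_example() {
  const PoolSizes s = pool_sizes(32, 10);
  assert(s.n_chunks == 320);
  assert(s.per_allocator == 640);
}
```

Note that a capacity equal to UNLIMITED (0) makes both products zero in this arithmetic.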

OpenDDS::DCPS::SingleSendBuffer::~SingleSendBuffer (  ) 

Definition at line 59 of file TransportSendBuffer.cpp.

References release_all().

00060 {
00061   release_all();
00062 }



Member Function Documentation

void OpenDDS::DCPS::SingleSendBuffer::check_capacity (  )  [private]

Definition at line 277 of file TransportSendBuffer.cpp.

References ACE_TEXT(), buffers_, OpenDDS::DCPS::TransportSendBuffer::capacity_, destinations_, LM_DEBUG, release(), OpenDDS::DCPS::Transport_debug_level, and UNLIMITED.

Referenced by insert(), and insert_fragment().

00278 {
00279   if (this->capacity_ == SingleSendBuffer::UNLIMITED) {
00280     return;
00281   }
00282   // Age off oldest sample if we are at capacity:
00283   if (this->buffers_.size() == this->capacity_) {
00284     BufferMap::iterator it(this->buffers_.begin());
00285     if (it == this->buffers_.end()) return;
00286 
00287     if (Transport_debug_level > 5) {
00288       ACE_DEBUG((LM_DEBUG,
00289         ACE_TEXT("(%P|%t) SingleSendBuffer::check_capacity() - ")
00290         ACE_TEXT("aging off PDU: %q as buffer(0x%@,0x%@)\n"),
00291         it->first.getValue(),
00292         it->second.first, it->second.second
00293       ));
00294     }
00295 
00296     destinations_.erase(it->first);
00297     release(it);
00298   }
00299 }
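
The age-off policy in the listing above can be illustrated with a simplified model. This is a sketch under stated assumptions: `AgingBuffer` is a hypothetical class, payloads are plain strings rather than queue and message-block pairs, and `std::map` stands in for `BufferMap`.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical stand-in for SingleSendBuffer, for illustration only.
class AgingBuffer {
public:
  static constexpr std::size_t UNLIMITED = 0;

  explicit AgingBuffer(std::size_t capacity) : capacity_(capacity) {}

  void insert(long long sequence, const std::string& payload) {
    check_capacity();  // evict before storing, as insert() does above
    buffers_[sequence] = payload;
  }

  bool contains(long long sequence) const {
    return buffers_.count(sequence) != 0;
  }
  std::size_t size() const { return buffers_.size(); }

private:
  void check_capacity() {
    if (capacity_ == UNLIMITED) return;  // never age off
    if (buffers_.size() == capacity_) {
      // The map is ordered by sequence number, so begin() is the oldest entry.
      buffers_.erase(buffers_.begin());
    }
  }

  std::size_t capacity_;
  std::map<long long, std::string> buffers_;
};

// Usage example: the fourth insert into a buffer of capacity 3 evicts sequence 1.
inline void aging_buffer_example() {
  AgingBuffer buf(3);
  for (long long seq = 1; seq <= 4; ++seq) buf.insert(seq, "pdu");
  assert(buf.size() == 3);
  assert(!buf.contains(1));
  assert(buf.contains(4));
}
```

Because eviction relies on map ordering, "oldest" here means "lowest sequence number", which matches insertion order only when sequence numbers increase monotonically.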


ACE_INLINE bool OpenDDS::DCPS::SingleSendBuffer::contains ( const SequenceNumber &  seq  )  const

Definition at line 55 of file TransportSendBuffer.inl.

References buffers_.

00056 {
00057   return this->buffers_.count(seq);
00058 }

ACE_INLINE bool OpenDDS::DCPS::SingleSendBuffer::empty ( void   )  const

Definition at line 49 of file TransportSendBuffer.inl.

References buffers_.

Referenced by OpenDDS::DCPS::ReliableSession::nak_received().

00050 {
00051   return this->buffers_.empty();
00052 }


ACE_INLINE SequenceNumber OpenDDS::DCPS::SingleSendBuffer::high (  )  const

Definition at line 42 of file TransportSendBuffer.inl.

References buffers_.

Referenced by resend_i().

00043 {
00044   if (this->buffers_.empty()) throw std::exception();
00045   return this->buffers_.rbegin()->first;
00046 }


void OpenDDS::DCPS::SingleSendBuffer::insert ( SequenceNumber  sequence,
TransportSendStrategy::QueueType *  queue,
ACE_Message_Block *  chain 
) [virtual]

Implements OpenDDS::DCPS::TransportSendBuffer.

Definition at line 199 of file TransportSendBuffer.cpp.

References ACE_TEXT(), buffers_, check_capacity(), destinations_, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, insert_buffer(), LM_DEBUG, OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::BasicQueue< T >::peek(), OpenDDS::DCPS::BasicQueue< T >::size(), OpenDDS::DCPS::TransportQueueElement::subscription_id(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and OpenDDS::DCPS::Transport_debug_level.

00202 {
00203   check_capacity();
00204 
00205   BufferType& buffer = this->buffers_[sequence];
00206   insert_buffer(buffer, queue, chain);
00207 
00208   if (Transport_debug_level > 5) {
00209     ACE_DEBUG((LM_DEBUG,
00210       ACE_TEXT("(%P|%t) SingleSendBuffer::insert() - ")
00211       ACE_TEXT("saved PDU: %q as buffer(0x%@,0x%@)\n"),
00212       sequence.getValue(),
00213       buffer.first, buffer.second
00214     ));
00215   }
00216 
00217   if (queue && queue->size() == 1) {
00218     const TransportQueueElement* elt = queue->peek();
00219     const RepoId subId = elt->subscription_id();
00220     const ACE_Message_Block* msg = elt->msg();
00221     if (msg && subId != GUID_UNKNOWN &&
00222         !DataSampleHeader::test_flag(HISTORIC_SAMPLE_FLAG, msg)) {
00223       destinations_[sequence] = subId;
00224     }
00225   }
00226 }


void OpenDDS::DCPS::SingleSendBuffer::insert_buffer ( BufferType &  buffer,
TransportSendStrategy::QueueType *  queue,
ACE_Message_Block *  chain 
) [private]

Definition at line 229 of file TransportSendBuffer.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), OpenDDS::DCPS::TransportQueueElement::clone_mb(), retained_db_allocator_, and retained_mb_allocator_.

Referenced by insert(), and insert_fragment().

00232 {
00233   // Copy sample's TransportQueueElements:
00234   TransportSendStrategy::QueueType*& elems = buffer.first;
00235   ACE_NEW(elems, TransportSendStrategy::QueueType());
00236 
00237   CopyChainVisitor visitor(*elems,
00238                            &this->retained_mb_allocator_,
00239                            &this->retained_db_allocator_);
00240   queue->accept_visitor(visitor);
00241 
00242   // Copy sample's message/data block descriptors:
00243   ACE_Message_Block*& data = buffer.second;
00244   data = TransportQueueElement::clone_mb(chain,
00245                                          &this->retained_mb_allocator_,
00246                                          &this->retained_db_allocator_);
00247 }


void OpenDDS::DCPS::SingleSendBuffer::insert_fragment ( SequenceNumber  sequence,
SequenceNumber  fragment,
TransportSendStrategy::QueueType *  queue,
ACE_Message_Block *  chain 
)

Definition at line 250 of file TransportSendBuffer.cpp.

References ACE_TEXT(), buffers_, check_capacity(), fragments_, OpenDDS::DCPS::SequenceNumber::getValue(), insert_buffer(), LM_DEBUG, and OpenDDS::DCPS::Transport_debug_level.

00254 {
00255   check_capacity();
00256 
00257   // Insert into buffers_ so that the overall capacity is maintained
00258   // The entry in buffers_ with two null pointers indicates that the
00259   // actual data is stored in fragments_[sequence].
00260   buffers_[sequence] = std::make_pair(static_cast<QueueType*>(0),
00261                                       static_cast<ACE_Message_Block*>(0));
00262 
00263   BufferType& buffer = fragments_[sequence][fragment];
00264   insert_buffer(buffer, queue, chain);
00265 
00266   if (Transport_debug_level > 5) {
00267     ACE_DEBUG((LM_DEBUG,
00268       ACE_TEXT("(%P|%t) SingleSendBuffer::insert_fragment() - ")
00269       ACE_TEXT("saved PDU: %q,%q as buffer(0x%@,0x%@)\n"),
00270       sequence.getValue(), fragment.getValue(),
00271       buffer.first, buffer.second
00272     ));
00273   }
00274 }
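
The two-level layout described in the comments above (a placeholder in buffers_ plus the real data in fragments_[sequence]) can be sketched with standard containers. This is an illustrative model, not the OpenDDS implementation: `FragmentStore` and `Slot` are hypothetical, strings replace the queue and message-block pointers, and a `has_data` flag replaces the pair of null pointers.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical model of the buffers_/fragments_ relationship.
class FragmentStore {
  struct Slot {
    Slot() : has_data(false) {}  // placeholder: data lives in fragments_
    explicit Slot(const std::string& p) : has_data(true), payload(p) {}
    bool has_data;
    std::string payload;
  };

public:
  // Whole sample: the payload is stored directly in buffers_.
  void insert(long long sequence, const std::string& payload) {
    buffers_[sequence] = Slot(payload);
  }

  // Fragment: buffers_ gets one placeholder per sequence number, so a
  // fragmented sample still counts once toward the number of samples held.
  void insert_fragment(long long sequence, long long fragment,
                       const std::string& payload) {
    buffers_[sequence] = Slot();
    fragments_[sequence][fragment] = payload;
  }

  std::size_t samples() const { return buffers_.size(); }

  std::size_t fragment_count(long long sequence) const {
    auto it = fragments_.find(sequence);
    return it == fragments_.end() ? std::size_t(0) : it->second.size();
  }

  bool is_fragmented(long long sequence) const {
    auto it = buffers_.find(sequence);
    return it != buffers_.end() && !it->second.has_data;
  }

private:
  std::map<long long, Slot> buffers_;
  std::map<long long, std::map<long long, std::string> > fragments_;
};

// Usage example: three fragments of sample 5 occupy a single buffers_ entry.
inline void fragment_store_example() {
  FragmentStore store;
  store.insert_fragment(5, 1, "a");
  store.insert_fragment(5, 2, "b");
  store.insert_fragment(5, 3, "c");
  store.insert(6, "whole");
  assert(store.samples() == 2);
  assert(store.fragment_count(5) == 3);
  assert(store.is_fragmented(5));
  assert(!store.is_fragmented(6));
}
```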


ACE_INLINE SequenceNumber OpenDDS::DCPS::SingleSendBuffer::low (  )  const

Definition at line 35 of file TransportSendBuffer.inl.

References buffers_.

Referenced by OpenDDS::DCPS::ReliableSession::nak_received(), and resend_i().

00036 {
00037   if (this->buffers_.empty()) throw std::exception();
00038   return this->buffers_.begin()->first;
00039 }
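
Both low() and high() throw std::exception when nothing is buffered, so a caller may want to test empty() first. The following is a minimal sketch of that guard; `SeqWindow` and `buffered_range` are hypothetical names, and `SeqWindow` copies only the three accessors shown in this reference.

```cpp
#include <cassert>
#include <exception>
#include <map>
#include <utility>

// Hypothetical stand-in exposing the same empty()/low()/high() behaviour.
class SeqWindow {
public:
  void add(long long seq) { buffers_[seq] = 0; }
  bool empty() const { return buffers_.empty(); }
  long long low() const {
    if (buffers_.empty()) throw std::exception();
    return buffers_.begin()->first;
  }
  long long high() const {
    if (buffers_.empty()) throw std::exception();
    return buffers_.rbegin()->first;
  }

private:
  std::map<long long, int> buffers_;
};

// Returns false and leaves `out` untouched when nothing is buffered;
// otherwise stores the inclusive [low, high] range in `out`.
inline bool buffered_range(const SeqWindow& w,
                           std::pair<long long, long long>& out) {
  if (w.empty()) return false;
  out = std::make_pair(w.low(), w.high());
  return true;
}

// Usage example: the unguarded call throws, the guarded helper does not.
inline void seq_window_example() {
  SeqWindow w;
  bool threw = false;
  try {
    w.low();
  } catch (const std::exception&) {
    threw = true;
  }
  assert(threw);

  std::pair<long long, long long> range(0, 0);
  assert(!buffered_range(w, range));
  w.add(7);
  w.add(3);
  assert(buffered_range(w, range));
  assert(range.first == 3 && range.second == 7);
}
```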


ACE_INLINE size_t OpenDDS::DCPS::SingleSendBuffer::n_chunks (  )  const

Definition at line 29 of file TransportSendBuffer.inl.

References n_chunks_.

00030 {
00031   return this->n_chunks_;
00032 }

typedef OPENDDS_MAP (SequenceNumber, RepoId) OpenDDS::DCPS::SingleSendBuffer::DestinationMap [private]

typedef OPENDDS_MAP (SequenceNumber, BufferMap) OpenDDS::DCPS::SingleSendBuffer::FragmentMap [private]

typedef OPENDDS_MAP (SequenceNumber, BufferType) OpenDDS::DCPS::SingleSendBuffer::BufferMap

void OpenDDS::DCPS::SingleSendBuffer::release ( BufferMap::iterator  buffer_iter  ) 

Definition at line 95 of file TransportSendBuffer.cpp.

References ACE_TEXT(), buffers_, fragments_, LM_DEBUG, and OpenDDS::DCPS::Transport_debug_level.

Referenced by check_capacity(), release_acked(), release_all(), and retain_all().

00096 {
00097   BufferType& buffer(buffer_iter->second);
00098   if (Transport_debug_level > 5) {
00099     ACE_DEBUG((LM_DEBUG,
00100       ACE_TEXT("(%P|%t) SingleSendBuffer::release() - ")
00101       ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
00102       buffer.first, buffer.second
00103     ));
00104   }
00105 
00106   if (buffer.first && buffer.second) {
00107     // not a fragment
00108     RemoveAllVisitor visitor;
00109     buffer.first->accept_remove_visitor(visitor);
00110     delete buffer.first;
00111 
00112     buffer.second->release();
00113     buffer.second = 0;
00114 
00115   } else {
00116     // data actually stored in fragments_
00117     const FragmentMap::iterator fm_it = fragments_.find(buffer_iter->first);
00118     if (fm_it != fragments_.end()) {
00119       for (BufferMap::iterator bm_it = fm_it->second.begin();
00120            bm_it != fm_it->second.end(); ++bm_it) {
00121         RemoveAllVisitor visitor;
00122         bm_it->second.first->accept_remove_visitor(visitor);
00123         delete bm_it->second.first;
00124 
00125         bm_it->second.second->release();
00126         bm_it->second.second = 0;
00127       }
00128       fragments_.erase(fm_it);
00129     }
00130   }
00131 
00132   buffers_.erase(buffer_iter);
00133 }


void OpenDDS::DCPS::SingleSendBuffer::release_acked ( SequenceNumber  seq  ) 

Definition at line 74 of file TransportSendBuffer.cpp.

References ACE_TEXT(), buffers_, LM_DEBUG, release(), and OpenDDS::DCPS::Transport_debug_level.

00074                                                   {
00075   BufferMap::iterator buffer_iter = buffers_.begin();
00076   BufferType& buffer(buffer_iter->second);
00077 
00078   if (Transport_debug_level > 5) {
00079     ACE_DEBUG((LM_DEBUG,
00080       ACE_TEXT("(%P|%t) SingleSendBuffer::release_acked() - ")
00081       ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
00082       buffer.first, buffer.second
00083     ));
00084   }
00085   while (buffer_iter != buffers_.end()) {
00086     if (buffer_iter->first == seq) {
00087       release(buffer_iter);
00088       return;
00089     }
00090     ++buffer_iter;
00091   }
00092 }
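
The listing above binds a reference to the first entry before the loop tests for end(), and then scans the map linearly for seq. As a hedged alternative, assuming the map behaves like `std::map`, a keyed lookup handles the empty case and the search in one step. The `Buffers` typedef and the free function below are illustrative and are not the OpenDDS code.

```cpp
#include <cassert>
#include <map>
#include <string>

// Illustrative stand-in for BufferMap: sequence number -> payload.
typedef std::map<long long, std::string> Buffers;

// Erases the entry for `seq` if it is buffered.
// Returns true when an entry was released, false otherwise.
inline bool release_acked(Buffers& buffers, long long seq) {
  const Buffers::iterator it = buffers.find(seq);
  if (it == buffers.end()) {
    return false;  // also covers an empty map: nothing is dereferenced
  }
  buffers.erase(it);
  return true;
}

// Usage example.
inline void release_acked_example() {
  Buffers buffers;
  assert(!release_acked(buffers, 1));  // safe on an empty map
  buffers[1] = "a";
  buffers[2] = "b";
  assert(release_acked(buffers, 2));
  assert(buffers.size() == 1);
  assert(!release_acked(buffers, 2));  // already released
}
```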


void OpenDDS::DCPS::SingleSendBuffer::release_all (  ) 

Definition at line 65 of file TransportSendBuffer.cpp.

References buffers_, and release().

Referenced by ~SingleSendBuffer().

00066 {
00067   for (BufferMap::iterator it(this->buffers_.begin());
00068        it != this->buffers_.end();) {
00069     release(it++);
00070   }
00071 }
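
The loop above relies on the `release(it++)` idiom: the post-increment advances `it` before the callee erases the element the old iterator value pointed to. A minimal sketch of the same idiom with `std::map`, where erasing invalidates only iterators to the erased element (the `Buffers` typedef and both functions are illustrative):

```cpp
#include <cassert>
#include <map>

typedef std::map<int, int> Buffers;

// Stand-in for release(): erases the entry the iterator refers to.
inline void release(Buffers& buffers, Buffers::iterator it) {
  buffers.erase(it);
}

// Releases every entry and returns how many were released.
inline int release_all(Buffers& buffers) {
  int released = 0;
  for (Buffers::iterator it = buffers.begin(); it != buffers.end();) {
    // `it` already refers to the next entry by the time erase runs,
    // so the loop never touches an invalidated iterator.
    release(buffers, it++);
    ++released;
  }
  return released;
}

// Usage example.
inline void release_all_example() {
  Buffers buffers;
  buffers[1] = 10;
  buffers[2] = 20;
  buffers[3] = 30;
  assert(release_all(buffers) == 3);
  assert(buffers.empty());
}
```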


bool OpenDDS::DCPS::SingleSendBuffer::resend ( const SequenceRange &  range,
DisjointSequence *  gaps = 0 
)

Definition at line 302 of file TransportSendBuffer.cpp.

References resend_i(), and OpenDDS::DCPS::TransportSendBuffer::strategy_lock().

Referenced by OpenDDS::DCPS::ReliableSession::nak_received().

00303 {
00304   ACE_GUARD_RETURN(LockType, guard, strategy_lock(), false);
00305   return resend_i(range, gaps);
00306 }


void OpenDDS::DCPS::SingleSendBuffer::resend_fragments_i ( const SequenceNumber &  sequence,
const DisjointSequence &  fragments 
)

Definition at line 363 of file TransportSendBuffer.cpp.

References OpenDDS::DCPS::DisjointSequence::empty(), fragments_, OpenDDS::DCPS::OPENDDS_VECTOR(), OpenDDS::DCPS::TransportSendBuffer::resend_one(), and OpenDDS::DCPS::SequenceNumber::ZERO().

00365 {
00366   if (fragments_.empty() || requested_frags.empty()) {
00367     return;
00368   }
00369   const BufferMap& buffers = fragments_[seq];
00370   const OPENDDS_VECTOR(SequenceRange) psr =
00371     requested_frags.present_sequence_ranges();
00372   SequenceNumber sent = SequenceNumber::ZERO();
00373   for (size_t i = 0; i < psr.size(); ++i) {
00374     BufferMap::const_iterator it = buffers.lower_bound(psr[i].first);
00375     if (it == buffers.end()) {
00376       return;
00377     }
00378     BufferMap::const_iterator it2 = buffers.lower_bound(psr[i].second);
00379     while (true) {
00380       if (sent < it->first) {
00381         resend_one(it->second);
00382         sent = it->first;
00383       }
00384       if (it == it2) {
00385         break;
00386       }
00387       ++it;
00388     }
00389   }
00390 }
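
The range walk above can be modelled with standard containers to show which fragments are selected. This is a sketch, not the OpenDDS code: fragment numbers are plain `int`s, `fragments_to_resend` is a hypothetical function that returns the chosen fragment numbers instead of calling resend_one(), and the loop adds an explicit end-of-map check that the listing does not contain.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Illustrative stand-ins: fragment number -> payload, and inclusive ranges.
using FragmentBuffers = std::map<int, std::string>;
using Range = std::pair<int, int>;

// Returns the fragment numbers that would be resent for the requested ranges.
inline std::vector<int> fragments_to_resend(const FragmentBuffers& buffers,
                                            const std::vector<Range>& requested) {
  std::vector<int> out;
  if (buffers.empty() || requested.empty()) return out;

  int sent = 0;  // highest fragment number already resent
  for (const Range& r : requested) {
    auto it = buffers.lower_bound(r.first);
    if (it == buffers.end()) return out;  // nothing at or above r.first
    const auto last = buffers.lower_bound(r.second);
    while (it != buffers.end()) {  // added guard: stop at the end of the map
      if (sent < it->first) {      // skip fragments covered by an earlier range
        out.push_back(it->first);
        sent = it->first;
      }
      if (it == last) break;
      ++it;
    }
  }
  return out;
}

// Usage example: overlapping ranges do not resend fragment 2 twice.
inline void fragments_to_resend_example() {
  FragmentBuffers buffers;
  buffers[1] = "a";
  buffers[2] = "b";
  buffers[3] = "c";
  buffers[5] = "e";
  const std::vector<Range> requested = {Range(1, 2), Range(2, 3)};
  const std::vector<int> expected = {1, 2, 3};
  assert(fragments_to_resend(buffers, requested) == expected);
}
```

As in the listing, `lower_bound` on the upper end of a range selects the first buffered fragment at or above that number, so a fragment just past the requested range can be included when the exact upper fragment is not buffered.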


bool OpenDDS::DCPS::SingleSendBuffer::resend_i ( const SequenceRange &  range,
DisjointSequence *  gaps,
const RepoId &  destination 
)

Definition at line 315 of file TransportSendBuffer.cpp.

References ACE_TEXT(), buffers_, destinations_, fragments_, OpenDDS::DCPS::GUID_UNKNOWN, high(), OpenDDS::DCPS::DisjointSequence::insert(), LM_DEBUG, low(), OpenDDS::DCPS::TransportSendBuffer::resend_one(), and OpenDDS::DCPS::Transport_debug_level.

00317 {
00318   //Special case, nak to make sure it has all history
00319   const SequenceNumber lowForAllResent = range.first == SequenceNumber() ? low() : range.first;
00320   const bool has_dest = destination != GUID_UNKNOWN;
00321 
00322   for (SequenceNumber sequence(range.first);
00323        sequence <= range.second; ++sequence) {
00324     // Re-send requested sample if still buffered; missing samples
00325     // will be scored against the given DisjointSequence:
00326     BufferMap::iterator it(buffers_.find(sequence));
00327     DestinationMap::iterator dest_data;
00328     if (has_dest) {
00329       dest_data = destinations_.find(sequence);
00330     }
00331     if (it == buffers_.end() || (has_dest && (dest_data == destinations_.end() ||
00332                                               dest_data->second != destination))) {
00333       if (gaps) {
00334         gaps->insert(sequence);
00335       }
00336     } else {
00337       if (Transport_debug_level > 5) {
00338         ACE_DEBUG((LM_DEBUG,
00339                    ACE_TEXT("(%P|%t) SingleSendBuffer::resend() - ")
00340                    ACE_TEXT("resending PDU: %q, (0x%@,0x%@)\n"),
00341                    sequence.getValue(),
00342                    it->second.first,
00343                    it->second.second));
00344       }
00345       if (it->second.first && it->second.second) {
00346         resend_one(it->second);
00347       } else {
00348         const FragmentMap::iterator fm_it = fragments_.find(it->first);
00349         if (fm_it != fragments_.end()) {
00350           for (BufferMap::iterator bm_it = fm_it->second.begin();
00351                 bm_it != fm_it->second.end(); ++bm_it) {
00352             resend_one(bm_it->second);
00353           }
00354         }
00355       }
00356     }
00357   }
00358   // Have we resent all requested data?
00359   return lowForAllResent >= low() && range.second <= high();
00360 }
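
The per-sequence decision in the listing above (resend if buffered and, for a directed request, recorded for that destination; otherwise score a gap) can be sketched as follows. Everything here is a simplified assumption: `ResendReport` and `resend_range` are hypothetical, destinations are `int`s with `kUnknownDest` standing in for GUID_UNKNOWN, fragments are ignored, the special case for a default-constructed range.first is omitted, and an empty buffer reports `covered == false` instead of throwing.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

const int kUnknownDest = 0;  // stand-in for GUID_UNKNOWN

struct ResendReport {
  std::vector<long long> resent;  // sequences that would be resent
  std::vector<long long> gaps;    // sequences scored as missing
  bool covered;                   // requested range lies within [low, high]
};

inline ResendReport resend_range(
    const std::map<long long, std::string>& buffers,
    const std::map<long long, int>& destinations,
    long long first, long long last, int destination = kUnknownDest) {
  ResendReport report;
  report.covered = false;
  const bool has_dest = destination != kUnknownDest;

  for (long long seq = first; seq <= last; ++seq) {
    const bool buffered = buffers.count(seq) != 0;
    bool dest_ok = true;
    if (has_dest) {
      // A directed request matches only samples recorded for that destination.
      const auto d = destinations.find(seq);
      dest_ok = d != destinations.end() && d->second == destination;
    }
    if (buffered && dest_ok) {
      report.resent.push_back(seq);
    } else {
      report.gaps.push_back(seq);
    }
  }

  if (!buffers.empty()) {
    report.covered = first >= buffers.begin()->first &&
                     last <= buffers.rbegin()->first;
  }
  return report;
}

// Usage example: an undirected and a directed request over the same buffer.
inline void resend_range_example() {
  std::map<long long, std::string> buffers;
  buffers[2] = "b";
  buffers[3] = "c";
  buffers[5] = "e";
  std::map<long long, int> destinations;
  destinations[3] = 7;

  const ResendReport all = resend_range(buffers, destinations, 1, 5);
  assert((all.resent == std::vector<long long>{2, 3, 5}));
  assert((all.gaps == std::vector<long long>{1, 4}));
  assert(!all.covered);  // 1 is below the lowest buffered sequence

  const ResendReport directed = resend_range(buffers, destinations, 2, 3, 7);
  assert((directed.resent == std::vector<long long>{3}));
  assert((directed.gaps == std::vector<long long>{2}));
  assert(directed.covered);
}
```

The return value and the gap list answer different questions: `covered` only compares the requested bounds with the buffered bounds, so a range can be covered and still produce gaps for sequences missing in the middle.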


bool OpenDDS::DCPS::SingleSendBuffer::resend_i ( const SequenceRange &  range,
DisjointSequence *  gaps = 0 
)

Definition at line 309 of file TransportSendBuffer.cpp.

References OpenDDS::DCPS::GUID_UNKNOWN.

Referenced by resend(), OpenDDS::DCPS::RtpsUdpDataLink::send_directed_nack_replies(), and OpenDDS::DCPS::RtpsUdpDataLink::send_nack_replies().

00310 {
00311   return resend_i(range, gaps, GUID_UNKNOWN);
00312 }


void OpenDDS::DCPS::SingleSendBuffer::retain_all ( RepoId  pub_id  )  [virtual]

Implements OpenDDS::DCPS::TransportSendBuffer.

Definition at line 136 of file TransportSendBuffer.cpp.

References ACE_TEXT(), buffers_, fragments_, LM_DEBUG, LM_WARNING, OPENDDS_STRING, release(), OpenDDS::DCPS::REMOVE_ERROR, retain_buffer(), and OpenDDS::DCPS::Transport_debug_level.

00137 {
00138   if (Transport_debug_level > 5) {
00139     GuidConverter converter(pub_id);
00140     ACE_DEBUG((LM_DEBUG,
00141       ACE_TEXT("(%P|%t) SingleSendBuffer::retain_all() - ")
00142       ACE_TEXT("copying out blocks for publication: %C\n"),
00143       OPENDDS_STRING(converter).c_str()
00144     ));
00145   }
00146   for (BufferMap::iterator it(this->buffers_.begin());
00147        it != this->buffers_.end();) {
00148     if (it->second.first && it->second.second) {
00149       if (retain_buffer(pub_id, it->second) == REMOVE_ERROR) {
00150         GuidConverter converter(pub_id);
00151         ACE_ERROR((LM_WARNING,
00152                    ACE_TEXT("(%P|%t) WARNING: ")
00153                    ACE_TEXT("SingleSendBuffer::retain_all: ")
00154                    ACE_TEXT("failed to retain data from publication: %C!\n"),
00155                    OPENDDS_STRING(converter).c_str()));
00156         release(it++);
00157       } else {
00158         ++it;
00159       }
00160 
00161     } else {
00162       const FragmentMap::iterator fm_it = fragments_.find(it->first);
00163       if (fm_it != fragments_.end()) {
00164         for (BufferMap::iterator bm_it = fm_it->second.begin();
00165              bm_it != fm_it->second.end();) {
00166           if (retain_buffer(pub_id, bm_it->second) == REMOVE_ERROR) {
00167             GuidConverter converter(pub_id);
00168             ACE_ERROR((LM_WARNING,
00169                        ACE_TEXT("(%P|%t) WARNING: ")
00170                        ACE_TEXT("SingleSendBuffer::retain_all: failed to ")
00171                        ACE_TEXT("retain fragment data from publication: %C!\n"),
00172                        OPENDDS_STRING(converter).c_str()));
00173             release(bm_it++);
00174           } else {
00175             ++bm_it;
00176           }
00177         }
00178       }
00179       ++it;
00180     }
00181   }
00182 }


RemoveResult OpenDDS::DCPS::SingleSendBuffer::retain_buffer ( const RepoId &  pub_id,
BufferType &  buffer 
) [private]

Definition at line 185 of file TransportSendBuffer.cpp.

References OpenDDS::DCPS::PacketRemoveVisitor::status().

Referenced by retain_all().

00186 {
00187   TransportQueueElement::MatchOnPubId match(pub_id);
00188   PacketRemoveVisitor visitor(match,
00189                               buffer.second,
00190                               buffer.second,
00191                               this->replaced_mb_allocator_,
00192                               this->replaced_db_allocator_);
00193 
00194   buffer.first->accept_replace_visitor(visitor);
00195   return visitor.status();
00196 }



Member Data Documentation

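DestinationMap OpenDDS::DCPS::SingleSendBuffer::destinations_ [private]

Definition at line 132 of file TransportSendBuffer.h.

Referenced by check_capacity(), insert(), and resend_i().

size_t OpenDDS::DCPS::SingleSendBuffer::n_chunks_ [private]

Definition at line 119 of file TransportSendBuffer.h.

Referenced by n_chunks().

DataBlockAllocator OpenDDS::DCPS::SingleSendBuffer::replaced_db_allocator_ [private]

Definition at line 124 of file TransportSendBuffer.h.

MessageBlockAllocator OpenDDS::DCPS::SingleSendBuffer::replaced_mb_allocator_ [private]

Definition at line 123 of file TransportSendBuffer.h.

DataBlockAllocator OpenDDS::DCPS::SingleSendBuffer::retained_db_allocator_ [private]

Definition at line 122 of file TransportSendBuffer.h.

Referenced by insert_buffer().

MessageBlockAllocator OpenDDS::DCPS::SingleSendBuffer::retained_mb_allocator_ [private]

Definition at line 121 of file TransportSendBuffer.h.

Referenced by insert_buffer().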


The documentation for this class was generated from the following files:

TransportSendBuffer.h
TransportSendBuffer.cpp
TransportSendBuffer.inl

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1