TransportSendBuffer.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "TransportSendBuffer.h"
00011 #include "CopyChainVisitor.h"
00012 #include "PacketRemoveVisitor.h"
00013 #include "RemoveAllVisitor.h"
00014 
00015 #include "dds/DCPS/DisjointSequence.h"
00016 
00017 #include "ace/Log_Msg.h"
00018 
00019 #include "dds/DCPS/GuidConverter.h"
00020 
00021 #ifndef __ACE_INLINE__
00022 # include "TransportSendBuffer.inl"
00023 #endif  /* __ACE_INLINE__ */
00024 
00025 namespace OpenDDS {
00026 namespace DCPS {
00027 
00028 TransportSendBuffer::~TransportSendBuffer()
00029 {
00030 }
00031 
00032 void
00033 TransportSendBuffer::resend_one(const BufferType& buffer)
00034 {
00035   int bp = 0;
00036   this->strategy_->do_send_packet(buffer.second, bp);
00037 }
00038 
00039 
00040 // class SingleSendBuffer
00041 
00042 SingleSendBuffer::SingleSendBuffer(size_t capacity,
00043                                    size_t max_samples_per_packet)
00044   : TransportSendBuffer(capacity),
00045     n_chunks_(capacity * max_samples_per_packet),
00046     retained_allocator_(this->n_chunks_),
00047     retained_mb_allocator_(this->n_chunks_ * 2),
00048     retained_db_allocator_(this->n_chunks_ * 2),
00049     replaced_allocator_(this->n_chunks_),
00050     replaced_mb_allocator_(this->n_chunks_ * 2),
00051     replaced_db_allocator_(this->n_chunks_ * 2)
00052 {
00053 }
00054 
00055 SingleSendBuffer::~SingleSendBuffer()
00056 {
00057   release_all();
00058 }
00059 
00060 void
00061 SingleSendBuffer::release_all()
00062 {
00063   for (BufferMap::iterator it(this->buffers_.begin());
00064        it != this->buffers_.end();) {
00065     release(it++);
00066   }
00067 }
00068 
00069 void
00070 SingleSendBuffer::release_acked(SequenceNumber seq) {
00071   BufferMap::iterator buffer_iter = buffers_.begin();
00072   BufferType& buffer(buffer_iter->second);
00073 
00074   if (Transport_debug_level > 5) {
00075     ACE_DEBUG((LM_DEBUG,
00076       ACE_TEXT("(%P|%t) SingleSendBuffer::release_acked() - ")
00077       ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
00078       buffer.first, buffer.second
00079     ));
00080   }
00081   while (buffer_iter != buffers_.end()) {
00082     if (buffer_iter->first == seq) {
00083       release(buffer_iter);
00084       return;
00085     }
00086     ++buffer_iter;
00087   }
00088 }
00089 
00090 void
00091 SingleSendBuffer::release(BufferMap::iterator buffer_iter)
00092 {
00093   BufferType& buffer(buffer_iter->second);
00094   if (Transport_debug_level > 5) {
00095     ACE_DEBUG((LM_DEBUG,
00096       ACE_TEXT("(%P|%t) SingleSendBuffer::release() - ")
00097       ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
00098       buffer.first, buffer.second
00099     ));
00100   }
00101 
00102   if (buffer.first && buffer.second) {
00103     // not a fragment
00104     RemoveAllVisitor visitor;
00105     buffer.first->accept_remove_visitor(visitor);
00106     delete buffer.first;
00107 
00108     buffer.second->release();
00109     buffer.second = 0;
00110 
00111   } else {
00112     // data actually stored in fragments_
00113     const FragmentMap::iterator fm_it = fragments_.find(buffer_iter->first);
00114     if (fm_it != fragments_.end()) {
00115       for (BufferMap::iterator bm_it = fm_it->second.begin();
00116            bm_it != fm_it->second.end(); ++bm_it) {
00117         RemoveAllVisitor visitor;
00118         bm_it->second.first->accept_remove_visitor(visitor);
00119         delete bm_it->second.first;
00120 
00121         bm_it->second.second->release();
00122         bm_it->second.second = 0;
00123       }
00124       fragments_.erase(fm_it);
00125     }
00126   }
00127 
00128   buffers_.erase(buffer_iter);
00129 }
00130 
00131 void
00132 SingleSendBuffer::retain_all(RepoId pub_id)
00133 {
00134   if (Transport_debug_level > 5) {
00135     GuidConverter converter(pub_id);
00136     ACE_DEBUG((LM_DEBUG,
00137       ACE_TEXT("(%P|%t) SingleSendBuffer::retain_all() - ")
00138       ACE_TEXT("copying out blocks for publication: %C\n"),
00139       OPENDDS_STRING(converter).c_str()
00140     ));
00141   }
00142   for (BufferMap::iterator it(this->buffers_.begin());
00143        it != this->buffers_.end();) {
00144     if (it->second.first && it->second.second) {
00145       if (retain_buffer(pub_id, it->second) == REMOVE_ERROR) {
00146         GuidConverter converter(pub_id);
00147         ACE_ERROR((LM_WARNING,
00148                    ACE_TEXT("(%P|%t) WARNING: ")
00149                    ACE_TEXT("SingleSendBuffer::retain_all: ")
00150                    ACE_TEXT("failed to retain data from publication: %C!\n"),
00151                    OPENDDS_STRING(converter).c_str()));
00152         release(it++);
00153       } else {
00154         ++it;
00155       }
00156 
00157     } else {
00158       const FragmentMap::iterator fm_it = fragments_.find(it->first);
00159       if (fm_it != fragments_.end()) {
00160         for (BufferMap::iterator bm_it = fm_it->second.begin();
00161              bm_it != fm_it->second.end();) {
00162           if (retain_buffer(pub_id, bm_it->second) == REMOVE_ERROR) {
00163             GuidConverter converter(pub_id);
00164             ACE_ERROR((LM_WARNING,
00165                        ACE_TEXT("(%P|%t) WARNING: ")
00166                        ACE_TEXT("SingleSendBuffer::retain_all: failed to ")
00167                        ACE_TEXT("retain fragment data from publication: %C!\n"),
00168                        OPENDDS_STRING(converter).c_str()));
00169             release(bm_it++);
00170           } else {
00171             ++bm_it;
00172           }
00173         }
00174       }
00175       ++it;
00176     }
00177   }
00178 }
00179 
00180 RemoveResult
00181 SingleSendBuffer::retain_buffer(const RepoId& pub_id, BufferType& buffer)
00182 {
00183   TransportQueueElement::MatchOnPubId match(pub_id);
00184   PacketRemoveVisitor visitor(match,
00185                               buffer.second,
00186                               buffer.second,
00187                               this->replaced_allocator_,
00188                               this->replaced_mb_allocator_,
00189                               this->replaced_db_allocator_);
00190 
00191   buffer.first->accept_replace_visitor(visitor);
00192   return visitor.status();
00193 }
00194 
00195 void
00196 SingleSendBuffer::insert(SequenceNumber sequence,
00197                          TransportSendStrategy::QueueType* queue,
00198                          ACE_Message_Block* chain)
00199 {
00200   check_capacity();
00201 
00202   BufferType& buffer = this->buffers_[sequence];
00203   insert_buffer(buffer, queue, chain);
00204 
00205   if (Transport_debug_level > 5) {
00206     ACE_DEBUG((LM_DEBUG,
00207       ACE_TEXT("(%P|%t) SingleSendBuffer::insert() - ")
00208       ACE_TEXT("saved PDU: %q as buffer(0x%@,0x%@)\n"),
00209       sequence.getValue(),
00210       buffer.first, buffer.second
00211     ));
00212   }
00213 }
00214 
00215 void
00216 SingleSendBuffer::insert_buffer(BufferType& buffer,
00217                                 TransportSendStrategy::QueueType* queue,
00218                                 ACE_Message_Block* chain)
00219 {
00220   // Copy sample's TransportQueueElements:
00221   TransportSendStrategy::QueueType*& elems = buffer.first;
00222   ACE_NEW(elems, TransportSendStrategy::QueueType(queue->size(), 1));
00223 
00224   CopyChainVisitor visitor(*elems,
00225                            &this->retained_allocator_,
00226                            &this->retained_mb_allocator_,
00227                            &this->retained_db_allocator_);
00228   queue->accept_visitor(visitor);
00229 
00230   // Copy sample's message/data block descriptors:
00231   ACE_Message_Block*& data = buffer.second;
00232   data = TransportQueueElement::clone_mb(chain,
00233                                          &this->retained_mb_allocator_,
00234                                          &this->retained_db_allocator_);
00235 }
00236 
00237 void
00238 SingleSendBuffer::insert_fragment(SequenceNumber sequence,
00239                                   SequenceNumber fragment,
00240                                   TransportSendStrategy::QueueType* queue,
00241                                   ACE_Message_Block* chain)
00242 {
00243   check_capacity();
00244 
00245   // Insert into buffers_ so that the overall capacity is maintained
00246   // The entry in buffers_ with two null pointers indicates that the
00247   // actual data is stored in fragments_[sequence].
00248   buffers_[sequence] = std::make_pair(static_cast<QueueType*>(0),
00249                                       static_cast<ACE_Message_Block*>(0));
00250 
00251   BufferType& buffer = fragments_[sequence][fragment];
00252   insert_buffer(buffer, queue, chain);
00253 
00254   if (Transport_debug_level > 5) {
00255     ACE_DEBUG((LM_DEBUG,
00256       ACE_TEXT("(%P|%t) SingleSendBuffer::insert_fragment() - ")
00257       ACE_TEXT("saved PDU: %q,%q as buffer(0x%@,0x%@)\n"),
00258       sequence.getValue(), fragment.getValue(),
00259       buffer.first, buffer.second
00260     ));
00261   }
00262 }
00263 
00264 void
00265 SingleSendBuffer::check_capacity()
00266 {
00267   if (this->capacity_ == SingleSendBuffer::UNLIMITED) {
00268     return;
00269   }
00270   // Age off oldest sample if we are at capacity:
00271   if (this->buffers_.size() == this->capacity_) {
00272     BufferMap::iterator it(this->buffers_.begin());
00273     if (it == this->buffers_.end()) return;
00274 
00275     if (Transport_debug_level > 5) {
00276       ACE_DEBUG((LM_DEBUG,
00277         ACE_TEXT("(%P|%t) SingleSendBuffer::check_capacity() - ")
00278         ACE_TEXT("aging off PDU: %q as buffer(0x%@,0x%@)\n"),
00279         it->first.getValue(),
00280         it->second.first, it->second.second
00281       ));
00282     }
00283 
00284     release(it);
00285   }
00286 }
00287 
00288 bool
00289 SingleSendBuffer::resend(const SequenceRange& range, DisjointSequence* gaps)
00290 {
00291   ACE_GUARD_RETURN(LockType, guard, strategy_lock(), false);
00292   return resend_i(range, gaps);
00293 }
00294 
00295 bool
00296 SingleSendBuffer::resend_i(const SequenceRange& range, DisjointSequence* gaps)
00297 {
00298   //Special case, nak to make sure it has all history
00299   const SequenceNumber lowForAllResent = range.first == SequenceNumber() ? low() : range.first;
00300 
00301   for (SequenceNumber sequence(range.first);
00302        sequence <= range.second; ++sequence) {
00303     // Re-send requested sample if still buffered; missing samples
00304     // will be scored against the given DisjointSequence:
00305     BufferMap::iterator it(this->buffers_.find(sequence));
00306     if (it == this->buffers_.end()) {
00307       if (gaps) {
00308         gaps->insert(sequence);
00309       }
00310     } else {
00311       if (Transport_debug_level > 5) {
00312         ACE_DEBUG((LM_DEBUG,
00313                    ACE_TEXT("(%P|%t) SingleSendBuffer::resend() - ")
00314                    ACE_TEXT("resending PDU: %q, (0x%@,0x%@)\n"),
00315                    sequence.getValue(),
00316                    it->second.first,
00317                    it->second.second));
00318       }
00319       if (it->second.first && it->second.second) {
00320         resend_one(it->second);
00321       } else {
00322         const FragmentMap::iterator fm_it = fragments_.find(it->first);
00323         if (fm_it != fragments_.end()) {
00324           for (BufferMap::iterator bm_it = fm_it->second.begin();
00325                 bm_it != fm_it->second.end(); ++bm_it) {
00326             resend_one(bm_it->second);
00327           }
00328         }
00329       }
00330     }
00331   }
00332   // Have we resent all requested data?
00333   return lowForAllResent >= low() && range.second <= high();
00334 }
00335 
00336 void
00337 SingleSendBuffer::resend_fragments_i(const SequenceNumber& seq,
00338                                      const DisjointSequence& requested_frags)
00339 {
00340   if (fragments_.empty() || requested_frags.empty()) {
00341     return;
00342   }
00343   const BufferMap& buffers = fragments_[seq];
00344   const OPENDDS_VECTOR(SequenceRange) psr =
00345     requested_frags.present_sequence_ranges();
00346   SequenceNumber sent = SequenceNumber::ZERO();
00347   for (size_t i = 0; i < psr.size(); ++i) {
00348     BufferMap::const_iterator it = buffers.lower_bound(psr[i].first);
00349     if (it == buffers.end()) {
00350       return;
00351     }
00352     BufferMap::const_iterator it2 = buffers.lower_bound(psr[i].second);
00353     while (true) {
00354       if (sent < it->first) {
00355         resend_one(it->second);
00356         sent = it->first;
00357       }
00358       if (it == it2) {
00359         break;
00360       }
00361       ++it;
00362     }
00363   }
00364 }
00365 
00366 } // namespace DCPS
00367 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7