TransportSendBuffer.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 
00010 #include "TransportSendBuffer.h"
00011 #include "CopyChainVisitor.h"
00012 #include "PacketRemoveVisitor.h"
00013 #include "RemoveAllVisitor.h"
00014 
00015 #include "dds/DCPS/DataSampleHeader.h"
00016 #include "dds/DCPS/DisjointSequence.h"
00017 
00018 #include "ace/Log_Msg.h"
00019 
00020 #include "dds/DCPS/GuidConverter.h"
00021 
00022 #ifndef __ACE_INLINE__
00023 # include "TransportSendBuffer.inl"
00024 #endif  /* __ACE_INLINE__ */
00025 
00026 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00027 
00028 namespace OpenDDS {
00029 namespace DCPS {
00030 
00031 
00032 TransportSendBuffer::~TransportSendBuffer()
00033 {
00034 }
00035 
00036 void
00037 TransportSendBuffer::resend_one(const BufferType& buffer)
00038 {
00039   int bp = 0;
00040   this->strategy_->do_send_packet(buffer.second, bp);
00041 }
00042 
00043 
00044 // class SingleSendBuffer
00045 
00046 const size_t SingleSendBuffer::UNLIMITED = 0;
00047 
00048 SingleSendBuffer::SingleSendBuffer(size_t capacity,
00049                                    size_t max_samples_per_packet)
00050   : TransportSendBuffer(capacity),
00051     n_chunks_(capacity * max_samples_per_packet),
00052     retained_mb_allocator_(this->n_chunks_ * 2),
00053     retained_db_allocator_(this->n_chunks_ * 2),
00054     replaced_mb_allocator_(this->n_chunks_ * 2),
00055     replaced_db_allocator_(this->n_chunks_ * 2)
00056 {
00057 }
00058 
00059 SingleSendBuffer::~SingleSendBuffer()
00060 {
00061   release_all();
00062 }
00063 
00064 void
00065 SingleSendBuffer::release_all()
00066 {
00067   for (BufferMap::iterator it(this->buffers_.begin());
00068        it != this->buffers_.end();) {
00069     release(it++);
00070   }
00071 }
00072 
00073 void
00074 SingleSendBuffer::release_acked(SequenceNumber seq) {
00075   BufferMap::iterator buffer_iter = buffers_.begin();
00076   BufferType& buffer(buffer_iter->second);
00077 
00078   if (Transport_debug_level > 5) {
00079     ACE_DEBUG((LM_DEBUG,
00080       ACE_TEXT("(%P|%t) SingleSendBuffer::release_acked() - ")
00081       ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
00082       buffer.first, buffer.second
00083     ));
00084   }
00085   while (buffer_iter != buffers_.end()) {
00086     if (buffer_iter->first == seq) {
00087       release(buffer_iter);
00088       return;
00089     }
00090     ++buffer_iter;
00091   }
00092 }
00093 
00094 void
00095 SingleSendBuffer::release(BufferMap::iterator buffer_iter)
00096 {
00097   BufferType& buffer(buffer_iter->second);
00098   if (Transport_debug_level > 5) {
00099     ACE_DEBUG((LM_DEBUG,
00100       ACE_TEXT("(%P|%t) SingleSendBuffer::release() - ")
00101       ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
00102       buffer.first, buffer.second
00103     ));
00104   }
00105 
00106   if (buffer.first && buffer.second) {
00107     // not a fragment
00108     RemoveAllVisitor visitor;
00109     buffer.first->accept_remove_visitor(visitor);
00110     delete buffer.first;
00111 
00112     buffer.second->release();
00113     buffer.second = 0;
00114 
00115   } else {
00116     // data actually stored in fragments_
00117     const FragmentMap::iterator fm_it = fragments_.find(buffer_iter->first);
00118     if (fm_it != fragments_.end()) {
00119       for (BufferMap::iterator bm_it = fm_it->second.begin();
00120            bm_it != fm_it->second.end(); ++bm_it) {
00121         RemoveAllVisitor visitor;
00122         bm_it->second.first->accept_remove_visitor(visitor);
00123         delete bm_it->second.first;
00124 
00125         bm_it->second.second->release();
00126         bm_it->second.second = 0;
00127       }
00128       fragments_.erase(fm_it);
00129     }
00130   }
00131 
00132   buffers_.erase(buffer_iter);
00133 }
00134 
00135 void
00136 SingleSendBuffer::retain_all(RepoId pub_id)
00137 {
00138   if (Transport_debug_level > 5) {
00139     GuidConverter converter(pub_id);
00140     ACE_DEBUG((LM_DEBUG,
00141       ACE_TEXT("(%P|%t) SingleSendBuffer::retain_all() - ")
00142       ACE_TEXT("copying out blocks for publication: %C\n"),
00143       OPENDDS_STRING(converter).c_str()
00144     ));
00145   }
00146   for (BufferMap::iterator it(this->buffers_.begin());
00147        it != this->buffers_.end();) {
00148     if (it->second.first && it->second.second) {
00149       if (retain_buffer(pub_id, it->second) == REMOVE_ERROR) {
00150         GuidConverter converter(pub_id);
00151         ACE_ERROR((LM_WARNING,
00152                    ACE_TEXT("(%P|%t) WARNING: ")
00153                    ACE_TEXT("SingleSendBuffer::retain_all: ")
00154                    ACE_TEXT("failed to retain data from publication: %C!\n"),
00155                    OPENDDS_STRING(converter).c_str()));
00156         release(it++);
00157       } else {
00158         ++it;
00159       }
00160 
00161     } else {
00162       const FragmentMap::iterator fm_it = fragments_.find(it->first);
00163       if (fm_it != fragments_.end()) {
00164         for (BufferMap::iterator bm_it = fm_it->second.begin();
00165              bm_it != fm_it->second.end();) {
00166           if (retain_buffer(pub_id, bm_it->second) == REMOVE_ERROR) {
00167             GuidConverter converter(pub_id);
00168             ACE_ERROR((LM_WARNING,
00169                        ACE_TEXT("(%P|%t) WARNING: ")
00170                        ACE_TEXT("SingleSendBuffer::retain_all: failed to ")
00171                        ACE_TEXT("retain fragment data from publication: %C!\n"),
00172                        OPENDDS_STRING(converter).c_str()));
00173             release(bm_it++);
00174           } else {
00175             ++bm_it;
00176           }
00177         }
00178       }
00179       ++it;
00180     }
00181   }
00182 }
00183 
00184 RemoveResult
00185 SingleSendBuffer::retain_buffer(const RepoId& pub_id, BufferType& buffer)
00186 {
00187   TransportQueueElement::MatchOnPubId match(pub_id);
00188   PacketRemoveVisitor visitor(match,
00189                               buffer.second,
00190                               buffer.second,
00191                               this->replaced_mb_allocator_,
00192                               this->replaced_db_allocator_);
00193 
00194   buffer.first->accept_replace_visitor(visitor);
00195   return visitor.status();
00196 }
00197 
00198 void
00199 SingleSendBuffer::insert(SequenceNumber sequence,
00200                          TransportSendStrategy::QueueType* queue,
00201                          ACE_Message_Block* chain)
00202 {
00203   check_capacity();
00204 
00205   BufferType& buffer = this->buffers_[sequence];
00206   insert_buffer(buffer, queue, chain);
00207 
00208   if (Transport_debug_level > 5) {
00209     ACE_DEBUG((LM_DEBUG,
00210       ACE_TEXT("(%P|%t) SingleSendBuffer::insert() - ")
00211       ACE_TEXT("saved PDU: %q as buffer(0x%@,0x%@)\n"),
00212       sequence.getValue(),
00213       buffer.first, buffer.second
00214     ));
00215   }
00216 
00217   if (queue && queue->size() == 1) {
00218     const TransportQueueElement* elt = queue->peek();
00219     const RepoId subId = elt->subscription_id();
00220     const ACE_Message_Block* msg = elt->msg();
00221     if (msg && subId != GUID_UNKNOWN &&
00222         !DataSampleHeader::test_flag(HISTORIC_SAMPLE_FLAG, msg)) {
00223       destinations_[sequence] = subId;
00224     }
00225   }
00226 }
00227 
00228 void
00229 SingleSendBuffer::insert_buffer(BufferType& buffer,
00230                                 TransportSendStrategy::QueueType* queue,
00231                                 ACE_Message_Block* chain)
00232 {
00233   // Copy sample's TransportQueueElements:
00234   TransportSendStrategy::QueueType*& elems = buffer.first;
00235   ACE_NEW(elems, TransportSendStrategy::QueueType());
00236 
00237   CopyChainVisitor visitor(*elems,
00238                            &this->retained_mb_allocator_,
00239                            &this->retained_db_allocator_);
00240   queue->accept_visitor(visitor);
00241 
00242   // Copy sample's message/data block descriptors:
00243   ACE_Message_Block*& data = buffer.second;
00244   data = TransportQueueElement::clone_mb(chain,
00245                                          &this->retained_mb_allocator_,
00246                                          &this->retained_db_allocator_);
00247 }
00248 
00249 void
00250 SingleSendBuffer::insert_fragment(SequenceNumber sequence,
00251                                   SequenceNumber fragment,
00252                                   TransportSendStrategy::QueueType* queue,
00253                                   ACE_Message_Block* chain)
00254 {
00255   check_capacity();
00256 
00257   // Insert into buffers_ so that the overall capacity is maintained
00258   // The entry in buffers_ with two null pointers indicates that the
00259   // actual data is stored in fragments_[sequence].
00260   buffers_[sequence] = std::make_pair(static_cast<QueueType*>(0),
00261                                       static_cast<ACE_Message_Block*>(0));
00262 
00263   BufferType& buffer = fragments_[sequence][fragment];
00264   insert_buffer(buffer, queue, chain);
00265 
00266   if (Transport_debug_level > 5) {
00267     ACE_DEBUG((LM_DEBUG,
00268       ACE_TEXT("(%P|%t) SingleSendBuffer::insert_fragment() - ")
00269       ACE_TEXT("saved PDU: %q,%q as buffer(0x%@,0x%@)\n"),
00270       sequence.getValue(), fragment.getValue(),
00271       buffer.first, buffer.second
00272     ));
00273   }
00274 }
00275 
00276 void
00277 SingleSendBuffer::check_capacity()
00278 {
00279   if (this->capacity_ == SingleSendBuffer::UNLIMITED) {
00280     return;
00281   }
00282   // Age off oldest sample if we are at capacity:
00283   if (this->buffers_.size() == this->capacity_) {
00284     BufferMap::iterator it(this->buffers_.begin());
00285     if (it == this->buffers_.end()) return;
00286 
00287     if (Transport_debug_level > 5) {
00288       ACE_DEBUG((LM_DEBUG,
00289         ACE_TEXT("(%P|%t) SingleSendBuffer::check_capacity() - ")
00290         ACE_TEXT("aging off PDU: %q as buffer(0x%@,0x%@)\n"),
00291         it->first.getValue(),
00292         it->second.first, it->second.second
00293       ));
00294     }
00295 
00296     destinations_.erase(it->first);
00297     release(it);
00298   }
00299 }
00300 
00301 bool
00302 SingleSendBuffer::resend(const SequenceRange& range, DisjointSequence* gaps)
00303 {
00304   ACE_GUARD_RETURN(LockType, guard, strategy_lock(), false);
00305   return resend_i(range, gaps);
00306 }
00307 
00308 bool
00309 SingleSendBuffer::resend_i(const SequenceRange& range, DisjointSequence* gaps)
00310 {
00311   return resend_i(range, gaps, GUID_UNKNOWN);
00312 }
00313 
00314 bool
00315 SingleSendBuffer::resend_i(const SequenceRange& range, DisjointSequence* gaps,
00316                            const RepoId& destination)
00317 {
00318   //Special case, nak to make sure it has all history
00319   const SequenceNumber lowForAllResent = range.first == SequenceNumber() ? low() : range.first;
00320   const bool has_dest = destination != GUID_UNKNOWN;
00321 
00322   for (SequenceNumber sequence(range.first);
00323        sequence <= range.second; ++sequence) {
00324     // Re-send requested sample if still buffered; missing samples
00325     // will be scored against the given DisjointSequence:
00326     BufferMap::iterator it(buffers_.find(sequence));
00327     DestinationMap::iterator dest_data;
00328     if (has_dest) {
00329       dest_data = destinations_.find(sequence);
00330     }
00331     if (it == buffers_.end() || (has_dest && (dest_data == destinations_.end() ||
00332                                               dest_data->second != destination))) {
00333       if (gaps) {
00334         gaps->insert(sequence);
00335       }
00336     } else {
00337       if (Transport_debug_level > 5) {
00338         ACE_DEBUG((LM_DEBUG,
00339                    ACE_TEXT("(%P|%t) SingleSendBuffer::resend() - ")
00340                    ACE_TEXT("resending PDU: %q, (0x%@,0x%@)\n"),
00341                    sequence.getValue(),
00342                    it->second.first,
00343                    it->second.second));
00344       }
00345       if (it->second.first && it->second.second) {
00346         resend_one(it->second);
00347       } else {
00348         const FragmentMap::iterator fm_it = fragments_.find(it->first);
00349         if (fm_it != fragments_.end()) {
00350           for (BufferMap::iterator bm_it = fm_it->second.begin();
00351                 bm_it != fm_it->second.end(); ++bm_it) {
00352             resend_one(bm_it->second);
00353           }
00354         }
00355       }
00356     }
00357   }
00358   // Have we resent all requested data?
00359   return lowForAllResent >= low() && range.second <= high();
00360 }
00361 
00362 void
00363 SingleSendBuffer::resend_fragments_i(const SequenceNumber& seq,
00364                                      const DisjointSequence& requested_frags)
00365 {
00366   if (fragments_.empty() || requested_frags.empty()) {
00367     return;
00368   }
00369   const BufferMap& buffers = fragments_[seq];
00370   const OPENDDS_VECTOR(SequenceRange) psr =
00371     requested_frags.present_sequence_ranges();
00372   SequenceNumber sent = SequenceNumber::ZERO();
00373   for (size_t i = 0; i < psr.size(); ++i) {
00374     BufferMap::const_iterator it = buffers.lower_bound(psr[i].first);
00375     if (it == buffers.end()) {
00376       return;
00377     }
00378     BufferMap::const_iterator it2 = buffers.lower_bound(psr[i].second);
00379     while (true) {
00380       if (sent < it->first) {
00381         resend_one(it->second);
00382         sent = it->first;
00383       }
00384       if (it == it2) {
00385         break;
00386       }
00387       ++it;
00388     }
00389   }
00390 }
00391 
00392 } // namespace DCPS
00393 } // namespace OpenDDS
00394 
00395 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1