TransportReassembly.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "TransportReassembly.h"
00010 #include "TransportDebug.h"
00011 
00012 #include "dds/DCPS/GuidConverter.h"
00013 #include "dds/DCPS/DisjointSequence.h"
00014 
00015 namespace OpenDDS {
00016 namespace DCPS {
00017 
00018 TransportReassembly::FragKey::FragKey(const PublicationId& pubId,
00019                                       const SequenceNumber& dataSampleSeq)
00020   : publication_(pubId)
00021   , data_sample_seq_(dataSampleSeq)
00022 {
00023 }
00024 
00025 GUID_tKeyLessThan TransportReassembly::FragKey::compare_;
00026 
00027 TransportReassembly::FragRange::FragRange(const SequenceRange& seqRange,
00028                                           const ReceivedDataSample& data)
00029   : transport_seq_(seqRange)
00030   , rec_ds_(data)
00031 {
00032 }
00033 
00034 namespace {
00035   inline void join_err(const char* detail)
00036   {
00037     ACE_ERROR((LM_ERROR,
00038       ACE_TEXT("(%P|%t) ERROR: TransportReassembly::insert() - ")
00039       ACE_TEXT("DataSampleHeaders could not be joined: %C\n"), detail));
00040   }
00041 }
00042 
00043 bool
00044 TransportReassembly::insert(OPENDDS_LIST(FragRange)& flist,
00045                             const SequenceRange& seqRange,
00046                             ReceivedDataSample& data)
00047 {
00048   const SequenceNumber::Value prev = seqRange.first.getValue() - 1,
00049     next = seqRange.second.getValue() + 1;
00050 
00051   for (OPENDDS_LIST(FragRange)::iterator it = flist.begin(); it != flist.end(); ++it) {
00052     FragRange& fr = *it;
00053     if (next < fr.transport_seq_.first.getValue()) {
00054       // insert before 'it'
00055       flist.insert(it, FragRange(seqRange, data));
00056       VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00057         "inserted on left\n"));
00058       return true;
00059 
00060     } else if (next == fr.transport_seq_.first.getValue()) {
00061       // combine on left of fr
00062       DataSampleHeader joined;
00063       if (!DataSampleHeader::join(data.header_, fr.rec_ds_.header_, joined)) {
00064         join_err("left");
00065         return false;
00066       }
00067       fr.rec_ds_.header_ = joined;
00068       if (fr.rec_ds_.sample_ && !data.sample_) {
00069         ACE_Message_Block::release(fr.rec_ds_.sample_);
00070         fr.rec_ds_.sample_ = 0;
00071       } else if (!fr.rec_ds_.sample_) {
00072         ACE_Message_Block::release(data.sample_);
00073       } else {
00074         ACE_Message_Block* last;
00075         for (last = data.sample_; last->cont(); last = last->cont()) ;
00076         last->cont(fr.rec_ds_.sample_);
00077         fr.rec_ds_.sample_ = data.sample_;
00078       }
00079       data.sample_ = 0;
00080       fr.transport_seq_.first = seqRange.first;
00081       VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00082         "combined on left\n"));
00083       return true;
00084 
00085     } else if (prev == fr.transport_seq_.second.getValue()) {
00086       // combine on right of fr
00087       if (!fr.rec_ds_.sample_) {
00088         fr.rec_ds_.header_.more_fragments_ = true;
00089       }
00090       DataSampleHeader joined;
00091       if (!DataSampleHeader::join(fr.rec_ds_.header_, data.header_, joined)) {
00092         join_err("right");
00093         return false;
00094       }
00095       fr.rec_ds_.header_ = joined;
00096       if (fr.rec_ds_.sample_ && !data.sample_) {
00097         ACE_Message_Block::release(fr.rec_ds_.sample_);
00098         fr.rec_ds_.sample_ = 0;
00099       } else if (!fr.rec_ds_.sample_) {
00100         ACE_Message_Block::release(data.sample_);
00101       } else {
00102         ACE_Message_Block* last;
00103         for (last = fr.rec_ds_.sample_; last->cont(); last = last->cont()) ;
00104         last->cont(data.sample_);
00105       }
00106       data.sample_ = 0;
00107       fr.transport_seq_.second = seqRange.second;
00108       VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00109         "combined on right\n"));
00110 
00111       // check if the next FragRange in the list needs to be combined
00112       if (++it != flist.end()) {
00113         if (next == it->transport_seq_.first.getValue()) {
00114           if (!fr.rec_ds_.sample_) {
00115             fr.rec_ds_.header_.more_fragments_ = true;
00116           }
00117           if (!DataSampleHeader::join(fr.rec_ds_.header_, it->rec_ds_.header_,
00118                                       joined)) {
00119             join_err("combined next");
00120             return false;
00121           }
00122           fr.rec_ds_.header_ = joined;
00123           if (!it->rec_ds_.sample_) {
00124             ACE_Message_Block::release(fr.rec_ds_.sample_);
00125             fr.rec_ds_.sample_ = 0;
00126           } else {
00127             if (!fr.rec_ds_.sample_) {
00128               ACE_ERROR((LM_ERROR,
00129                          ACE_TEXT("(%P|%t) ERROR: ")
00130                          ACE_TEXT("OpenDDS::DCPS::TransportReassembly::insert, ")
00131                          ACE_TEXT("Cannot dereference null pointer fr.rec_ds_.sample_\n")));
00132               return false;
00133             }
00134             ACE_Message_Block* last;
00135             for (last = fr.rec_ds_.sample_; last->cont(); last = last->cont()) ;
00136             last->cont(it->rec_ds_.sample_);
00137             it->rec_ds_.sample_ = 0;
00138           }
00139           fr.transport_seq_.second = it->transport_seq_.second;
00140           flist.erase(it);
00141           VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00142             "coalesced on right\n"));
00143         }
00144       }
00145       return true;
00146     }
00147   }
00148 
00149   // add to end of list
00150   flist.push_back(FragRange(seqRange, data));
00151   VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00152     "inserted at end of list\n"));
00153   return true;
00154 }
00155 
00156 bool
00157 TransportReassembly::has_frags(const SequenceNumber& seq,
00158                                const RepoId& pub_id) const
00159 {
00160   return fragments_.count(FragKey(pub_id, seq));
00161 }
00162 
00163 CORBA::ULong
00164 TransportReassembly::get_gaps(const SequenceNumber& seq, const RepoId& pub_id,
00165                               CORBA::Long bitmap[], CORBA::ULong length,
00166                               CORBA::ULong& numBits) const
00167 {
00168   // length is number of (allocated) words in bitmap, max of 8
00169   // numBits is number of valid bits in the bitmap, <= length * 32, to account for partial words
00170   const FragMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq));
00171   if (iter == fragments_.end() || length == 0) {
00172     // Nothing missing
00173     return 0;
00174   }
00175 
00176   // RTPS's FragmentNumbers are 32-bit values, so we'll only be using the
00177   // low 32 bits of the 64-bit generalized sequence numbers in
00178   // FragRange::transport_seq_.
00179 
00180   const OPENDDS_LIST(FragRange)& flist = iter->second;
00181   const SequenceNumber& first = flist.front().transport_seq_.first;
00182   const CORBA::ULong base = (first == 1)
00183     ? flist.front().transport_seq_.second.getLow() + 1
00184     : 1;
00185 
00186   if (first != 1) {
00187     // Represent the "gap" before the first list element.
00188     // base == 1 and the first 2 args to fill_bitmap_range() are deltas of base
00189     DisjointSequence::fill_bitmap_range(0, first.getLow() - 2,
00190                                         bitmap, length, numBits);
00191   } else if (flist.size() == 1) {
00192     // No gaps, but we know there is (at least 1) more_framents
00193     DisjointSequence::fill_bitmap_range(0, 0,
00194                                         bitmap, length, numBits);
00195     // NOTE: this could send a nack for fragments that are in flight
00196     // need to defer setting bitmap till heartbeat extending logic
00197     // in RtpsUdpDataLink::generate_nack_frags
00198     return base;
00199   }
00200 
00201   typedef OPENDDS_LIST(FragRange)::const_iterator list_iterator;
00202   for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
00203     const list_iterator it_next = ++list_iterator(it);
00204     if (it_next == flist.end()) {
00205       break;
00206     }
00207     const CORBA::ULong low = it->transport_seq_.second.getLow() + 1 - base,
00208                        high = it_next->transport_seq_.first.getLow() - 1 - base;
00209     DisjointSequence::fill_bitmap_range(low, high, bitmap, length, numBits);
00210   }
00211 
00212   return base;
00213 }
00214 
00215 bool
00216 TransportReassembly::reassemble(const SequenceRange& seqRange,
00217                                 ReceivedDataSample& data)
00218 {
00219   return reassemble_i(seqRange, seqRange.first == 1, data);
00220 }
00221 
00222 bool
00223 TransportReassembly::reassemble(const SequenceNumber& transportSeq,
00224                                 bool firstFrag,
00225                                 ReceivedDataSample& data)
00226 {
00227   return reassemble_i(SequenceRange(transportSeq, transportSeq),
00228                       firstFrag, data);
00229 }
00230 
00231 bool
00232 TransportReassembly::reassemble_i(const SequenceRange& seqRange,
00233                                   bool firstFrag,
00234                                   ReceivedDataSample& data)
00235 {
00236   if (Transport_debug_level > 5) {
00237     GuidConverter conv(data.header_.publication_id_);
00238     ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::reassemble() "
00239       "tseq %q-%q first %d dseq %q pub %C\n", seqRange.first.getValue(),
00240       seqRange.second.getValue(), firstFrag ? 1 : 0,
00241       data.header_.sequence_.getValue(), OPENDDS_STRING(conv).c_str()));
00242   }
00243 
00244   const FragKey key(data.header_.publication_id_, data.header_.sequence_);
00245 
00246   if (firstFrag) {
00247     have_first_.insert(key);
00248   }
00249 
00250   FragMap::iterator iter = fragments_.find(key);
00251   if (iter == fragments_.end()) {
00252     fragments_[key].push_back(FragRange(seqRange, data));
00253     // since this is the first fragment we've seen, it can't possibly be done
00254     VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::reassemble() "
00255       "stored first frag, returning false (incomplete)\n"));
00256     return false;
00257   }
00258 
00259   if (!insert(iter->second, seqRange, data)) {
00260     // error condition, already logged by insert()
00261     return false;
00262   }
00263 
00264   // We can deliver data if all three of these conditions are met:
00265   // 1. we've seen the "first fragment" flag  [first frag is here]
00266   // 2. all fragments have been coalesced     [no gaps in the seq numbers]
00267   // 3. the "more fragments" flag is not set  [last frag is here]
00268   if (have_first_.count(key)
00269       && iter->second.size() == 1
00270       && !iter->second.front().rec_ds_.header_.more_fragments_) {
00271     swap(data, iter->second.front().rec_ds_);
00272     fragments_.erase(iter);
00273     have_first_.erase(key);
00274     VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::reassemble() "
00275       "removed frag, returning %C\n", data.sample_ ? "true" : "false"));
00276     return data.sample_; // could be false if we had data_unavailable()
00277   }
00278 
00279   VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::reassemble() "
00280     "returning false (incomplete)\n"));
00281   return false;
00282 }
00283 
00284 void
00285 TransportReassembly::data_unavailable(const SequenceRange& dropped)
00286 {
00287   VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::data_unavailable() "
00288     "dropped %q-%q\n", dropped.first.getValue(), dropped.second.getValue()));
00289   typedef OPENDDS_LIST(FragRange)::iterator list_iterator;
00290 
00291   for (FragMap::iterator iter = fragments_.begin(); iter != fragments_.end();
00292        ++iter) {
00293     const FragKey& key = iter->first;
00294     OPENDDS_LIST(FragRange)& flist = iter->second;
00295 
00296     ReceivedDataSample dummy(0);
00297     dummy.header_.sequence_ = key.data_sample_seq_;
00298 
00299     // check if we should expand the front element (only if !have_first)
00300     const SequenceNumber::Value prev =
00301       flist.front().transport_seq_.first.getValue() - 1;
00302     if (dropped.second.getValue() == prev && !have_first_.count(key)) {
00303       have_first_.insert(key);
00304       dummy.header_.more_fragments_ = true;
00305       insert(flist, dropped, dummy);
00306       continue;
00307     }
00308 
00309     // find a gap between list elements where "dropped" fits
00310     for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
00311       list_iterator it_next = it;
00312       ++it_next;
00313       if (it_next == flist.end()) {
00314         break;
00315       }
00316       FragRange& fr1 = *it;
00317       FragRange& fr2 = *it_next;
00318       if (dropped.first > fr1.transport_seq_.second
00319           && dropped.second < fr2.transport_seq_.first) {
00320         dummy.header_.more_fragments_ = true;
00321         insert(flist, dropped, dummy);
00322         break;
00323       }
00324     }
00325 
00326     // check if we should expand the last element
00327     const SequenceNumber next =
00328       ++SequenceNumber(flist.back().transport_seq_.second);
00329     if (dropped.first == next) {
00330       flist.back().rec_ds_.header_.more_fragments_ = true;
00331       insert(flist, dropped, dummy);
00332     }
00333   }
00334 }
00335 
00336 void
00337 TransportReassembly::data_unavailable(const SequenceNumber& dataSampleSeq,
00338                                       const RepoId& pub_id)
00339 {
00340   const FragKey key(pub_id, dataSampleSeq);
00341   fragments_.erase(key);
00342   have_first_.erase(key);
00343 }
00344 
00345 }
00346 }

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7