OpenDDS::DCPS::TransportReassembly Class Reference

#include <TransportReassembly.h>

List of all members.

Public Member Functions

bool reassemble (const SequenceNumber &transportSeq, bool firstFrag, ReceivedDataSample &data)
bool reassemble (const SequenceRange &seqRange, ReceivedDataSample &data)
void data_unavailable (const SequenceRange &transportSeqDropped)
void data_unavailable (const SequenceNumber &dataSampleSeq, const RepoId &pub_id)
bool has_frags (const SequenceNumber &seq, const RepoId &pub_id) const
CORBA::ULong get_gaps (const SequenceNumber &msg_seq, const RepoId &pub_id, CORBA::Long bitmap[], CORBA::ULong length, CORBA::ULong &numBits) const

Private Member Functions

bool reassemble_i (const SequenceRange &seqRange, bool firstFrag, ReceivedDataSample &data)
typedef OPENDDS_MAP (FragKey, OPENDDS_LIST(FragRange)) FragMap
 OPENDDS_SET (FragKey) have_first_

Static Private Member Functions

static bool insert (OPENDDS_LIST(FragRange)&flist, const SequenceRange &seqRange, ReceivedDataSample &data)

Private Attributes

FragMap fragments_

Classes

struct  FragKey
struct  FragRange


Detailed Description

Definition at line 19 of file TransportReassembly.h.


Member Function Documentation

void OpenDDS::DCPS::TransportReassembly::data_unavailable ( const SequenceNumber dataSampleSeq,
const RepoId pub_id 
)

Definition at line 337 of file TransportReassembly.cpp.

References fragments_.

00339 {
00340   const FragKey key(pub_id, dataSampleSeq);
00341   fragments_.erase(key);
00342   have_first_.erase(key);
00343 }

void OpenDDS::DCPS::TransportReassembly::data_unavailable ( const SequenceRange transportSeqDropped  ) 

Called by TransportReceiveStrategy to indicate that we can stop tracking partially-reassembled messages when we know the remaining fragments are not expected to arrive.

Definition at line 285 of file TransportReassembly.cpp.

References dummy, fragments_, insert(), OpenDDS::DCPS::OPENDDS_LIST(), OpenDDS::DCPS::TransportReassembly::FragRange::transport_seq_, and VDBG.

Referenced by OpenDDS::DCPS::BestEffortSession::check_header(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_fragments().

00286 {
00287   VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::data_unavailable() "
00288     "dropped %q-%q\n", dropped.first.getValue(), dropped.second.getValue()));
00289   typedef OPENDDS_LIST(FragRange)::iterator list_iterator;
00290 
00291   for (FragMap::iterator iter = fragments_.begin(); iter != fragments_.end();
00292        ++iter) {
00293     const FragKey& key = iter->first;
00294     OPENDDS_LIST(FragRange)& flist = iter->second;
00295 
00296     ReceivedDataSample dummy(0);
00297     dummy.header_.sequence_ = key.data_sample_seq_;
00298 
00299     // check if we should expand the front element (only if !have_first)
00300     const SequenceNumber::Value prev =
00301       flist.front().transport_seq_.first.getValue() - 1;
00302     if (dropped.second.getValue() == prev && !have_first_.count(key)) {
00303       have_first_.insert(key);
00304       dummy.header_.more_fragments_ = true;
00305       insert(flist, dropped, dummy);
00306       continue;
00307     }
00308 
00309     // find a gap between list elements where "dropped" fits
00310     for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
00311       list_iterator it_next = it;
00312       ++it_next;
00313       if (it_next == flist.end()) {
00314         break;
00315       }
00316       FragRange& fr1 = *it;
00317       FragRange& fr2 = *it_next;
00318       if (dropped.first > fr1.transport_seq_.second
00319           && dropped.second < fr2.transport_seq_.first) {
00320         dummy.header_.more_fragments_ = true;
00321         insert(flist, dropped, dummy);
00322         break;
00323       }
00324     }
00325 
00326     // check if we should expand the last element
00327     const SequenceNumber next =
00328       ++SequenceNumber(flist.back().transport_seq_.second);
00329     if (dropped.first == next) {
00330       flist.back().rec_ds_.header_.more_fragments_ = true;
00331       insert(flist, dropped, dummy);
00332     }
00333   }
00334 }

CORBA::ULong OpenDDS::DCPS::TransportReassembly::get_gaps ( const SequenceNumber msg_seq,
const RepoId pub_id,
CORBA::Long  bitmap[],
CORBA::ULong  length,
CORBA::ULong &  numBits 
) const

Populates bitmap for missing fragment sequence numbers and set numBits for the given message sequence and publisher ID.

Returns:
the base fragment sequence number for bit zero in the bitmap

Definition at line 164 of file TransportReassembly.cpp.

References OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), fragments_, OpenDDS::DCPS::SequenceNumber::getLow(), and OpenDDS::DCPS::OPENDDS_LIST().

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments().

00167 {
00168   // length is number of (allocated) words in bitmap, max of 8
00169   // numBits is number of valid bits in the bitmap, <= length * 32, to account for partial words
00170   const FragMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq));
00171   if (iter == fragments_.end() || length == 0) {
00172     // Nothing missing
00173     return 0;
00174   }
00175 
00176   // RTPS's FragmentNumbers are 32-bit values, so we'll only be using the
00177   // low 32 bits of the 64-bit generalized sequence numbers in
00178   // FragRange::transport_seq_.
00179 
00180   const OPENDDS_LIST(FragRange)& flist = iter->second;
00181   const SequenceNumber& first = flist.front().transport_seq_.first;
00182   const CORBA::ULong base = (first == 1)
00183     ? flist.front().transport_seq_.second.getLow() + 1
00184     : 1;
00185 
00186   if (first != 1) {
00187     // Represent the "gap" before the first list element.
00188     // base == 1 and the first 2 args to fill_bitmap_range() are deltas of base
00189     DisjointSequence::fill_bitmap_range(0, first.getLow() - 2,
00190                                         bitmap, length, numBits);
00191   } else if (flist.size() == 1) {
00192     // No gaps, but we know there is (at least 1) more_framents
00193     DisjointSequence::fill_bitmap_range(0, 0,
00194                                         bitmap, length, numBits);
00195     // NOTE: this could send a nack for fragments that are in flight
00196     // need to defer setting bitmap till heartbeat extending logic
00197     // in RtpsUdpDataLink::generate_nack_frags
00198     return base;
00199   }
00200 
00201   typedef OPENDDS_LIST(FragRange)::const_iterator list_iterator;
00202   for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
00203     const list_iterator it_next = ++list_iterator(it);
00204     if (it_next == flist.end()) {
00205       break;
00206     }
00207     const CORBA::ULong low = it->transport_seq_.second.getLow() + 1 - base,
00208                        high = it_next->transport_seq_.first.getLow() - 1 - base;
00209     DisjointSequence::fill_bitmap_range(low, high, bitmap, length, numBits);
00210   }
00211 
00212   return base;
00213 }

bool OpenDDS::DCPS::TransportReassembly::has_frags ( const SequenceNumber seq,
const RepoId pub_id 
) const

Returns true if this object is storing fragments for the given DataSampleHeader sequence number from the given publication.

Definition at line 157 of file TransportReassembly.cpp.

References fragments_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_frags_from_bitmap().

00159 {
00160   return fragments_.count(FragKey(pub_id, seq));
00161 }

bool OpenDDS::DCPS::TransportReassembly::insert ( OPENDDS_LIST(FragRange)&  flist,
const SequenceRange seqRange,
ReceivedDataSample data 
) [static, private]

Definition at line 44 of file TransportReassembly.cpp.

References OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::join(), OpenDDS::DCPS::join_err(), OpenDDS::DCPS::OPENDDS_LIST(), OpenDDS::DCPS::ReceivedDataSample::sample_, and VDBG.

Referenced by data_unavailable(), and reassemble_i().

00047 {
00048   const SequenceNumber::Value prev = seqRange.first.getValue() - 1,
00049     next = seqRange.second.getValue() + 1;
00050 
00051   for (OPENDDS_LIST(FragRange)::iterator it = flist.begin(); it != flist.end(); ++it) {
00052     FragRange& fr = *it;
00053     if (next < fr.transport_seq_.first.getValue()) {
00054       // insert before 'it'
00055       flist.insert(it, FragRange(seqRange, data));
00056       VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00057         "inserted on left\n"));
00058       return true;
00059 
00060     } else if (next == fr.transport_seq_.first.getValue()) {
00061       // combine on left of fr
00062       DataSampleHeader joined;
00063       if (!DataSampleHeader::join(data.header_, fr.rec_ds_.header_, joined)) {
00064         join_err("left");
00065         return false;
00066       }
00067       fr.rec_ds_.header_ = joined;
00068       if (fr.rec_ds_.sample_ && !data.sample_) {
00069         ACE_Message_Block::release(fr.rec_ds_.sample_);
00070         fr.rec_ds_.sample_ = 0;
00071       } else if (!fr.rec_ds_.sample_) {
00072         ACE_Message_Block::release(data.sample_);
00073       } else {
00074         ACE_Message_Block* last;
00075         for (last = data.sample_; last->cont(); last = last->cont()) ;
00076         last->cont(fr.rec_ds_.sample_);
00077         fr.rec_ds_.sample_ = data.sample_;
00078       }
00079       data.sample_ = 0;
00080       fr.transport_seq_.first = seqRange.first;
00081       VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00082         "combined on left\n"));
00083       return true;
00084 
00085     } else if (prev == fr.transport_seq_.second.getValue()) {
00086       // combine on right of fr
00087       if (!fr.rec_ds_.sample_) {
00088         fr.rec_ds_.header_.more_fragments_ = true;
00089       }
00090       DataSampleHeader joined;
00091       if (!DataSampleHeader::join(fr.rec_ds_.header_, data.header_, joined)) {
00092         join_err("right");
00093         return false;
00094       }
00095       fr.rec_ds_.header_ = joined;
00096       if (fr.rec_ds_.sample_ && !data.sample_) {
00097         ACE_Message_Block::release(fr.rec_ds_.sample_);
00098         fr.rec_ds_.sample_ = 0;
00099       } else if (!fr.rec_ds_.sample_) {
00100         ACE_Message_Block::release(data.sample_);
00101       } else {
00102         ACE_Message_Block* last;
00103         for (last = fr.rec_ds_.sample_; last->cont(); last = last->cont()) ;
00104         last->cont(data.sample_);
00105       }
00106       data.sample_ = 0;
00107       fr.transport_seq_.second = seqRange.second;
00108       VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00109         "combined on right\n"));
00110 
00111       // check if the next FragRange in the list needs to be combined
00112       if (++it != flist.end()) {
00113         if (next == it->transport_seq_.first.getValue()) {
00114           if (!fr.rec_ds_.sample_) {
00115             fr.rec_ds_.header_.more_fragments_ = true;
00116           }
00117           if (!DataSampleHeader::join(fr.rec_ds_.header_, it->rec_ds_.header_,
00118                                       joined)) {
00119             join_err("combined next");
00120             return false;
00121           }
00122           fr.rec_ds_.header_ = joined;
00123           if (!it->rec_ds_.sample_) {
00124             ACE_Message_Block::release(fr.rec_ds_.sample_);
00125             fr.rec_ds_.sample_ = 0;
00126           } else {
00127             if (!fr.rec_ds_.sample_) {
00128               ACE_ERROR((LM_ERROR,
00129                          ACE_TEXT("(%P|%t) ERROR: ")
00130                          ACE_TEXT("OpenDDS::DCPS::TransportReassembly::insert, ")
00131                          ACE_TEXT("Cannot dereference null pointer fr.rec_ds_.sample_\n")));
00132               return false;
00133             }
00134             ACE_Message_Block* last;
00135             for (last = fr.rec_ds_.sample_; last->cont(); last = last->cont()) ;
00136             last->cont(it->rec_ds_.sample_);
00137             it->rec_ds_.sample_ = 0;
00138           }
00139           fr.transport_seq_.second = it->transport_seq_.second;
00140           flist.erase(it);
00141           VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00142             "coalesced on right\n"));
00143         }
00144       }
00145       return true;
00146     }
00147   }
00148 
00149   // add to end of list
00150   flist.push_back(FragRange(seqRange, data));
00151   VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00152     "inserted at end of list\n"));
00153   return true;
00154 }

typedef OpenDDS::DCPS::TransportReassembly::OPENDDS_MAP ( FragKey  ,
OPENDDS_LIST(FragRange  
) [private]

OpenDDS::DCPS::TransportReassembly::OPENDDS_SET ( FragKey   )  [private]

bool OpenDDS::DCPS::TransportReassembly::reassemble ( const SequenceRange seqRange,
ReceivedDataSample data 
)

Definition at line 216 of file TransportReassembly.cpp.

References reassemble_i().

00218 {
00219   return reassemble_i(seqRange, seqRange.first == 1, data);
00220 }

bool OpenDDS::DCPS::TransportReassembly::reassemble ( const SequenceNumber transportSeq,
bool  firstFrag,
ReceivedDataSample data 
)

Called by TransportReceiveStrategy if the fragmentation header flag is set. Returns true/false to indicate if data should be delivered to the datalink. The 'data' argument may be modified by this method.

Definition at line 223 of file TransportReassembly.cpp.

References reassemble_i().

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble(), and OpenDDS::DCPS::MulticastSession::reassemble().

00226 {
00227   return reassemble_i(SequenceRange(transportSeq, transportSeq),
00228                       firstFrag, data);
00229 }

bool OpenDDS::DCPS::TransportReassembly::reassemble_i ( const SequenceRange seqRange,
bool  firstFrag,
ReceivedDataSample data 
) [private]

Definition at line 232 of file TransportReassembly.cpp.

References fragments_, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::ReceivedDataSample::header_, insert(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::push_back(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::swap(), OpenDDS::DCPS::Transport_debug_level, and VDBG.

Referenced by reassemble().

00235 {
00236   if (Transport_debug_level > 5) {
00237     GuidConverter conv(data.header_.publication_id_);
00238     ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::reassemble() "
00239       "tseq %q-%q first %d dseq %q pub %C\n", seqRange.first.getValue(),
00240       seqRange.second.getValue(), firstFrag ? 1 : 0,
00241       data.header_.sequence_.getValue(), OPENDDS_STRING(conv).c_str()));
00242   }
00243 
00244   const FragKey key(data.header_.publication_id_, data.header_.sequence_);
00245 
00246   if (firstFrag) {
00247     have_first_.insert(key);
00248   }
00249 
00250   FragMap::iterator iter = fragments_.find(key);
00251   if (iter == fragments_.end()) {
00252     fragments_[key].push_back(FragRange(seqRange, data));
00253     // since this is the first fragment we've seen, it can't possibly be done
00254     VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::reassemble() "
00255       "stored first frag, returning false (incomplete)\n"));
00256     return false;
00257   }
00258 
00259   if (!insert(iter->second, seqRange, data)) {
00260     // error condition, already logged by insert()
00261     return false;
00262   }
00263 
00264   // We can deliver data if all three of these conditions are met:
00265   // 1. we've seen the "first fragment" flag  [first frag is here]
00266   // 2. all fragments have been coalesced     [no gaps in the seq numbers]
00267   // 3. the "more fragments" flag is not set  [last frag is here]
00268   if (have_first_.count(key)
00269       && iter->second.size() == 1
00270       && !iter->second.front().rec_ds_.header_.more_fragments_) {
00271     swap(data, iter->second.front().rec_ds_);
00272     fragments_.erase(iter);
00273     have_first_.erase(key);
00274     VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::reassemble() "
00275       "removed frag, returning %C\n", data.sample_ ? "true" : "false"));
00276     return data.sample_; // could be false if we had data_unavailable()
00277   }
00278 
00279   VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::reassemble() "
00280     "returning false (incomplete)\n"));
00281   return false;
00282 }


Member Data Documentation

FragMap OpenDDS::DCPS::TransportReassembly::fragments_ [private]

Definition at line 90 of file TransportReassembly.h.

Referenced by data_unavailable(), get_gaps(), has_frags(), and reassemble_i().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:31 2016 for OpenDDS by  doxygen 1.4.7