OpenDDS::DCPS::TransportReassembly Class Reference

#include <TransportReassembly.h>

List of all members.

Classes

struct  FragKey
struct  FragRange

Public Member Functions

bool reassemble (const SequenceNumber &transportSeq, bool firstFrag, ReceivedDataSample &data)
bool reassemble (const SequenceRange &seqRange, ReceivedDataSample &data)
void data_unavailable (const SequenceRange &transportSeqDropped)
void data_unavailable (const SequenceNumber &dataSampleSeq, const RepoId &pub_id)
bool has_frags (const SequenceNumber &seq, const RepoId &pub_id) const
CORBA::ULong get_gaps (const SequenceNumber &msg_seq, const RepoId &pub_id, CORBA::Long bitmap[], CORBA::ULong length, CORBA::ULong &numBits) const

Private Member Functions

bool reassemble_i (const SequenceRange &seqRange, bool firstFrag, ReceivedDataSample &data)
typedef OPENDDS_MAP (FragKey, OPENDDS_LIST(FragRange)) FragMap
 OPENDDS_SET (FragKey) have_first_

Static Private Member Functions

static bool insert (OPENDDS_LIST(FragRange)&flist, const SequenceRange &seqRange, ReceivedDataSample &data)

Private Attributes

FragMap fragments_

Detailed Description

Definition at line 21 of file TransportReassembly.h.


Member Function Documentation

void OpenDDS::DCPS::TransportReassembly::data_unavailable ( const SequenceNumber & dataSampleSeq,
const RepoId & pub_id 
)

Definition at line 331 of file TransportReassembly.cpp.

References fragments_.

00333 {
00334   const FragKey key(pub_id, dataSampleSeq);
00335   fragments_.erase(key);
00336   have_first_.erase(key);
00337 }

void OpenDDS::DCPS::TransportReassembly::data_unavailable ( const SequenceRange & transportSeqDropped  ) 

Called by TransportReceiveStrategy to indicate that we can stop tracking partially-reassembled messages when we know the remaining fragments are not expected to arrive.

Definition at line 279 of file TransportReassembly.cpp.

References OpenDDS::DCPS::TransportReassembly::FragKey::data_sample_seq_, fragments_, OpenDDS::DCPS::ReceivedDataSample::header_, insert(), LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::more_fragments_, OpenDDS::DCPS::OPENDDS_LIST(), OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::TransportReassembly::FragRange::transport_seq_, and VDBG.

Referenced by OpenDDS::DCPS::BestEffortSession::check_header(), OpenDDS::DCPS::ReliableSession::expire_naks(), OpenDDS::DCPS::ReliableSession::nakack_received(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_fragments().

00280 {
00281   VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::data_unavailable() "
00282     "dropped %q-%q\n", dropped.first.getValue(), dropped.second.getValue()));
00283   typedef OPENDDS_LIST(FragRange)::iterator list_iterator;
00284 
00285   for (FragMap::iterator iter = fragments_.begin(); iter != fragments_.end();
00286        ++iter) {
00287     const FragKey& key = iter->first;
00288     OPENDDS_LIST(FragRange)& flist = iter->second;
00289 
00290     ReceivedDataSample dummy(0);
00291     dummy.header_.sequence_ = key.data_sample_seq_;
00292 
00293     // check if we should expand the front element (only if !have_first)
00294     const SequenceNumber::Value prev =
00295       flist.front().transport_seq_.first.getValue() - 1;
00296     if (dropped.second.getValue() == prev && !have_first_.count(key)) {
00297       have_first_.insert(key);
00298       dummy.header_.more_fragments_ = true;
00299       insert(flist, dropped, dummy);
00300       continue;
00301     }
00302 
00303     // find a gap between list elements where "dropped" fits
00304     for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
00305       list_iterator it_next = it;
00306       ++it_next;
00307       if (it_next == flist.end()) {
00308         break;
00309       }
00310       FragRange& fr1 = *it;
00311       FragRange& fr2 = *it_next;
00312       if (dropped.first > fr1.transport_seq_.second
00313           && dropped.second < fr2.transport_seq_.first) {
00314         dummy.header_.more_fragments_ = true;
00315         insert(flist, dropped, dummy);
00316         break;
00317       }
00318     }
00319 
00320     // check if we should expand the last element
00321     const SequenceNumber next =
00322       ++SequenceNumber(flist.back().transport_seq_.second);
00323     if (dropped.first == next) {
00324       flist.back().rec_ds_.header_.more_fragments_ = true;
00325       insert(flist, dropped, dummy);
00326     }
00327   }
00328 }

Here is the call graph for this function:

Here is the caller graph for this function:

CORBA::ULong OpenDDS::DCPS::TransportReassembly::get_gaps ( const SequenceNumber & msg_seq,
const RepoId & pub_id,
CORBA::Long  bitmap[],
CORBA::ULong  length,
CORBA::ULong & numBits 
) const

Populates bitmap with the missing fragment sequence numbers, and sets numBits, for the given message sequence number and publisher ID.

Returns:
the base fragment sequence number for bit zero in the bitmap

Definition at line 158 of file TransportReassembly.cpp.

References OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), fragments_, OpenDDS::DCPS::SequenceNumber::getLow(), and OpenDDS::DCPS::OPENDDS_LIST().

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments().

00161 {
00162   // length is number of (allocated) words in bitmap, max of 8
00163   // numBits is number of valid bits in the bitmap, <= length * 32, to account for partial words
00164   const FragMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq));
00165   if (iter == fragments_.end() || length == 0) {
00166     // Nothing missing
00167     return 0;
00168   }
00169 
00170   // RTPS's FragmentNumbers are 32-bit values, so we'll only be using the
00171   // low 32 bits of the 64-bit generalized sequence numbers in
00172   // FragRange::transport_seq_.
00173 
00174   const OPENDDS_LIST(FragRange)& flist = iter->second;
00175   const SequenceNumber& first = flist.front().transport_seq_.first;
00176   const CORBA::ULong base = (first == 1)
00177     ? flist.front().transport_seq_.second.getLow() + 1
00178     : 1;
00179 
00180   if (first != 1) {
00181     // Represent the "gap" before the first list element.
00182     // base == 1 and the first 2 args to fill_bitmap_range() are deltas of base
00183     DisjointSequence::fill_bitmap_range(0, first.getLow() - 2,
00184                                         bitmap, length, numBits);
00185   } else if (flist.size() == 1) {
00186     // No gaps, but we know there is (at least 1) more_fragments
00187     DisjointSequence::fill_bitmap_range(0, 0,
00188                                         bitmap, length, numBits);
00189     // NOTE: this could send a nack for fragments that are in flight
00190     // need to defer setting bitmap till heartbeat extending logic
00191     // in RtpsUdpDataLink::generate_nack_frags
00192     return base;
00193   }
00194 
00195   typedef OPENDDS_LIST(FragRange)::const_iterator list_iterator;
00196   for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
00197     const list_iterator it_next = ++list_iterator(it);
00198     if (it_next == flist.end()) {
00199       break;
00200     }
00201     const CORBA::ULong low = it->transport_seq_.second.getLow() + 1 - base,
00202                        high = it_next->transport_seq_.first.getLow() - 1 - base;
00203     DisjointSequence::fill_bitmap_range(low, high, bitmap, length, numBits);
00204   }
00205 
00206   return base;
00207 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::TransportReassembly::has_frags ( const SequenceNumber & seq,
const RepoId & pub_id 
) const

Returns true if this object is storing fragments for the given DataSampleHeader sequence number from the given publication.

Definition at line 151 of file TransportReassembly.cpp.

References fragments_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_frags_from_bitmap().

00153 {
00154   return fragments_.count(FragKey(pub_id, seq));
00155 }

Here is the caller graph for this function:

bool OpenDDS::DCPS::TransportReassembly::insert ( OPENDDS_LIST(FragRange)&  flist,
const SequenceRange & seqRange,
ReceivedDataSample & data 
) [static, private]

Definition at line 46 of file TransportReassembly.cpp.

References ACE_TEXT(), ACE_Message_Block::cont(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::join(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::DataSampleHeader::more_fragments_, OpenDDS::DCPS::OPENDDS_LIST(), OpenDDS::DCPS::TransportReassembly::FragRange::rec_ds_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::TransportReassembly::FragRange::transport_seq_, and VDBG.

Referenced by data_unavailable(), and reassemble_i().

00049 {
00050   const SequenceNumber::Value prev = seqRange.first.getValue() - 1,
00051     next = seqRange.second.getValue() + 1;
00052 
00053   for (OPENDDS_LIST(FragRange)::iterator it = flist.begin(); it != flist.end(); ++it) {
00054     FragRange& fr = *it;
00055     if (next < fr.transport_seq_.first.getValue()) {
00056       // insert before 'it'
00057       flist.insert(it, FragRange(seqRange, data));
00058       VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00059         "inserted on left\n"));
00060       return true;
00061 
00062     } else if (next == fr.transport_seq_.first.getValue()) {
00063       // combine on left of fr
00064       DataSampleHeader joined;
00065       if (!DataSampleHeader::join(data.header_, fr.rec_ds_.header_, joined)) {
00066         join_err("left");
00067         return false;
00068       }
00069       fr.rec_ds_.header_ = joined;
00070       if (fr.rec_ds_.sample_ && data.sample_) {
00071         ACE_Message_Block* last;
00072         for (last = data.sample_.get(); last->cont(); last = last->cont()) ;
00073         last->cont(fr.rec_ds_.sample_.release());
00074         fr.rec_ds_.sample_.reset(data.sample_.release());
00075       }
00076       data.sample_.reset();
00077       fr.transport_seq_.first = seqRange.first;
00078       VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00079         "combined on left\n"));
00080       return true;
00081 
00082     } else if (prev == fr.transport_seq_.second.getValue()) {
00083       // combine on right of fr
00084       if (!fr.rec_ds_.sample_) {
00085         fr.rec_ds_.header_.more_fragments_ = true;
00086       }
00087       DataSampleHeader joined;
00088       if (!DataSampleHeader::join(fr.rec_ds_.header_, data.header_, joined)) {
00089         join_err("right");
00090         return false;
00091       }
00092       fr.rec_ds_.header_ = joined;
00093       if (fr.rec_ds_.sample_ && data.sample_) {
00094         ACE_Message_Block* last;
00095         for (last = fr.rec_ds_.sample_.get(); last->cont(); last = last->cont()) ;
00096         last->cont(data.sample_.release());
00097       }
00098       else {
00099         fr.rec_ds_.sample_.reset();
00100         data.sample_.reset();
00101       }
00102 
00103       fr.transport_seq_.second = seqRange.second;
00104       VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00105         "combined on right\n"));
00106 
00107       // check if the next FragRange in the list needs to be combined
00108       if (++it != flist.end()) {
00109         if (next == it->transport_seq_.first.getValue()) {
00110           if (!fr.rec_ds_.sample_) {
00111             fr.rec_ds_.header_.more_fragments_ = true;
00112           }
00113           if (!DataSampleHeader::join(fr.rec_ds_.header_, it->rec_ds_.header_,
00114                                       joined)) {
00115             join_err("combined next");
00116             return false;
00117           }
00118           fr.rec_ds_.header_ = joined;
00119           if (!it->rec_ds_.sample_) {
00120             fr.rec_ds_.sample_.reset();
00121           } else {
00122             if (!fr.rec_ds_.sample_) {
00123               ACE_ERROR((LM_ERROR,
00124                          ACE_TEXT("(%P|%t) ERROR: ")
00125                          ACE_TEXT("OpenDDS::DCPS::TransportReassembly::insert, ")
00126                          ACE_TEXT("Cannot dereference null pointer fr.rec_ds_.sample_\n")));
00127               return false;
00128             }
00129             ACE_Message_Block* last;
00130             for (last = fr.rec_ds_.sample_.get(); last->cont(); last = last->cont()) ;
00131             last->cont(it->rec_ds_.sample_.release());
00132           }
00133           fr.transport_seq_.second = it->transport_seq_.second;
00134           flist.erase(it);
00135           VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00136             "coalesced on right\n"));
00137         }
00138       }
00139       return true;
00140     }
00141   }
00142 
00143   // add to end of list
00144   flist.push_back(FragRange(seqRange, data));
00145   VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::insert() "
00146     "inserted at end of list\n"));
00147   return true;
00148 }

Here is the call graph for this function:

Here is the caller graph for this function:

typedef OpenDDS::DCPS::TransportReassembly::OPENDDS_MAP ( FragKey  ,
OPENDDS_LIST(FragRange)  
) [private]
OpenDDS::DCPS::TransportReassembly::OPENDDS_SET ( FragKey   )  [private]
bool OpenDDS::DCPS::TransportReassembly::reassemble ( const SequenceRange & seqRange,
ReceivedDataSample & data 
)

Definition at line 210 of file TransportReassembly.cpp.

References reassemble_i().

00212 {
00213   return reassemble_i(seqRange, seqRange.first == 1, data);
00214 }

Here is the call graph for this function:

bool OpenDDS::DCPS::TransportReassembly::reassemble ( const SequenceNumber & transportSeq,
bool  firstFrag,
ReceivedDataSample & data 
)

Called by TransportReceiveStrategy if the fragmentation header flag is set. Returns true/false to indicate if data should be delivered to the datalink. The 'data' argument may be modified by this method.

Definition at line 217 of file TransportReassembly.cpp.

References reassemble_i().

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble(), and OpenDDS::DCPS::MulticastSession::reassemble().

00220 {
00221   return reassemble_i(SequenceRange(transportSeq, transportSeq),
00222                       firstFrag, data);
00223 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::DCPS::TransportReassembly::reassemble_i ( const SequenceRange & seqRange,
bool  firstFrag,
ReceivedDataSample & data 
) [private]

Definition at line 226 of file TransportReassembly.cpp.

References fragments_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::ReceivedDataSample::header_, insert(), LM_DEBUG, OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::swap(), OpenDDS::DCPS::Transport_debug_level, and VDBG.

Referenced by reassemble().

00229 {
00230   if (Transport_debug_level > 5) {
00231     GuidConverter conv(data.header_.publication_id_);
00232     ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::reassemble() "
00233       "tseq %q-%q first %d dseq %q pub %C\n", seqRange.first.getValue(),
00234       seqRange.second.getValue(), firstFrag ? 1 : 0,
00235       data.header_.sequence_.getValue(), OPENDDS_STRING(conv).c_str()));
00236   }
00237 
00238   const FragKey key(data.header_.publication_id_, data.header_.sequence_);
00239 
00240   if (firstFrag) {
00241     have_first_.insert(key);
00242   }
00243 
00244   FragMap::iterator iter = fragments_.find(key);
00245   if (iter == fragments_.end()) {
00246     fragments_[key].push_back(FragRange(seqRange, data));
00247     // since this is the first fragment we've seen, it can't possibly be done
00248     VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::reassemble() "
00249       "stored first frag, returning false (incomplete)\n"));
00250     return false;
00251   }
00252 
00253   if (!insert(iter->second, seqRange, data)) {
00254     // error condition, already logged by insert()
00255     return false;
00256   }
00257 
00258   // We can deliver data if all three of these conditions are met:
00259   // 1. we've seen the "first fragment" flag  [first frag is here]
00260   // 2. all fragments have been coalesced     [no gaps in the seq numbers]
00261   // 3. the "more fragments" flag is not set  [last frag is here]
00262   if (have_first_.count(key)
00263       && iter->second.size() == 1
00264       && !iter->second.front().rec_ds_.header_.more_fragments_) {
00265     swap(data, iter->second.front().rec_ds_);
00266     fragments_.erase(iter);
00267     have_first_.erase(key);
00268     VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::reassemble() "
00269       "removed frag, returning %C\n", data.sample_ ? "true" : "false"));
00270     return data.sample_.get(); // could be false if we had data_unavailable()
00271   }
00272 
00273   VDBG((LM_DEBUG, "(%P|%t) DBG:   TransportReassembly::reassemble() "
00274     "returning false (incomplete)\n"));
00275   return false;
00276 }

Here is the call graph for this function:

Here is the caller graph for this function:


Member Data Documentation

FragMap OpenDDS::DCPS::TransportReassembly::fragments_ [private]

Definition at line 92 of file TransportReassembly.h.

Referenced by data_unavailable(), get_gaps(), has_frags(), and reassemble_i().


The documentation for this class was generated from the following files:

TransportReassembly.h
TransportReassembly.cpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1