#include <TransportReassembly.h>
Classes | |
struct | FragKey |
struct | FragRange |
Public Member Functions | |
bool | reassemble (const SequenceNumber &transportSeq, bool firstFrag, ReceivedDataSample &data) |
bool | reassemble (const SequenceRange &seqRange, ReceivedDataSample &data) |
void | data_unavailable (const SequenceRange &transportSeqDropped) |
void | data_unavailable (const SequenceNumber &dataSampleSeq, const RepoId &pub_id) |
bool | has_frags (const SequenceNumber &seq, const RepoId &pub_id) const |
CORBA::ULong | get_gaps (const SequenceNumber &msg_seq, const RepoId &pub_id, CORBA::Long bitmap[], CORBA::ULong length, CORBA::ULong &numBits) const |
Private Member Functions | |
bool | reassemble_i (const SequenceRange &seqRange, bool firstFrag, ReceivedDataSample &data) |
typedef | OPENDDS_MAP (FragKey, OPENDDS_LIST(FragRange)) FragMap |
OPENDDS_SET (FragKey) have_first_ | |
Static Private Member Functions | |
static bool | insert (OPENDDS_LIST(FragRange)&flist, const SequenceRange &seqRange, ReceivedDataSample &data) |
Private Attributes | |
FragMap | fragments_ |
Definition at line 21 of file TransportReassembly.h.
void OpenDDS::DCPS::TransportReassembly::data_unavailable | ( | const SequenceNumber & | dataSampleSeq, | |
const RepoId & | pub_id | |||
) |
Definition at line 331 of file TransportReassembly.cpp.
References fragments_.
// Stops tracking a single data sample: builds the FragKey from
// (pub_id, dataSampleSeq) and erases that key from both fragments_ and
// have_first_.
00333 { 00334 const FragKey key(pub_id, dataSampleSeq); 00335 fragments_.erase(key); 00336 have_first_.erase(key); 00337 }
void OpenDDS::DCPS::TransportReassembly::data_unavailable | ( | const SequenceRange & | transportSeqDropped | ) |
Called by TransportReceiveStrategy to indicate that we can stop tracking partially-reassembled messages when we know the remaining fragments are not expected to arrive.
Definition at line 279 of file TransportReassembly.cpp.
References OpenDDS::DCPS::TransportReassembly::FragKey::data_sample_seq_, fragments_, OpenDDS::DCPS::ReceivedDataSample::header_, insert(), LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::more_fragments_, OpenDDS::DCPS::OPENDDS_LIST(), OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::TransportReassembly::FragRange::transport_seq_, and VDBG.
Referenced by OpenDDS::DCPS::BestEffortSession::check_header(), OpenDDS::DCPS::ReliableSession::expire_naks(), OpenDDS::DCPS::ReliableSession::nakack_received(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_fragments().
// Records a dropped transport sequence range in every partially reassembled
// sample it is adjacent to. "dropped" is the SequenceRange argument
// (presumably the parameter the declaration names transportSeqDropped).
//
// For each entry of fragments_ a placeholder ReceivedDataSample is built with
// a constructor argument of 0 and the entry's data_sample_seq_ as its header
// sequence. It is passed to insert() in three situations:
//   1. "dropped" ends immediately before the entry's first stored range and
//      the key is not yet in have_first_: the key is added to have_first_,
//      the placeholder is flagged more_fragments_, and the loop moves on to
//      the next entry.
//   2. "dropped" lies strictly between two adjacent stored ranges: the
//      placeholder is flagged more_fragments_ and inserted.
//   3. "dropped" starts immediately after the entry's last stored range:
//      more_fragments_ is set on that last stored range (not on the
//      placeholder) before inserting.
// The bool returned by insert() is not checked here.
00280 { 00281 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::data_unavailable() " 00282 "dropped %q-%q\n", dropped.first.getValue(), dropped.second.getValue())); 00283 typedef OPENDDS_LIST(FragRange)::iterator list_iterator; 00284 00285 for (FragMap::iterator iter = fragments_.begin(); iter != fragments_.end(); 00286 ++iter) { 00287 const FragKey& key = iter->first; 00288 OPENDDS_LIST(FragRange)& flist = iter->second; 00289 00290 ReceivedDataSample dummy(0); 00291 dummy.header_.sequence_ = key.data_sample_seq_; 00292 00293 // check if we should expand the front element (only if !have_first) 00294 const SequenceNumber::Value prev = 00295 flist.front().transport_seq_.first.getValue() - 1; 00296 if (dropped.second.getValue() == prev && !have_first_.count(key)) { 00297 have_first_.insert(key); 00298 dummy.header_.more_fragments_ = true; 00299 insert(flist, dropped, dummy); 00300 continue; 00301 } 00302 00303 // find a gap between list elements where "dropped" fits 00304 for (list_iterator it = flist.begin(); it != flist.end(); ++it) { 00305 list_iterator it_next = it; 00306 ++it_next; 00307 if (it_next == flist.end()) { 00308 break; 00309 } 00310 FragRange& fr1 = *it; 00311 FragRange& fr2 = *it_next; 00312 if (dropped.first > fr1.transport_seq_.second 00313 && dropped.second < fr2.transport_seq_.first) { 00314 dummy.header_.more_fragments_ = true; 00315 insert(flist, dropped, dummy); 00316 break; 00317 } 00318 } 00319 00320 // check if we should expand the last element 00321 const SequenceNumber next = 00322 ++SequenceNumber(flist.back().transport_seq_.second); 00323 if (dropped.first == next) { 00324 flist.back().rec_ds_.header_.more_fragments_ = true; 00325 insert(flist, dropped, dummy); 00326 } 00327 } 00328 }
CORBA::ULong OpenDDS::DCPS::TransportReassembly::get_gaps | ( | const SequenceNumber & | msg_seq, | |
const RepoId & | pub_id, | |||
CORBA::Long | bitmap[], | |||
CORBA::ULong | length, | |||
CORBA::ULong & | numBits | |||
) | const |
Populates `bitmap` with the missing fragment sequence numbers, and sets `numBits`, for the given message sequence number and publisher ID.
Definition at line 158 of file TransportReassembly.cpp.
References OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), fragments_, OpenDDS::DCPS::SequenceNumber::getLow(), and OpenDDS::DCPS::OPENDDS_LIST().
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments().
// Fills "bitmap" with the fragment gaps known for the sample keyed by
// (pub_id, seq); "seq" is presumably the parameter the declaration names
// msg_seq.
//
// Returns 0 when no entry exists for the key or when length == 0. Otherwise
// returns "base", the value the offsets passed to fill_bitmap_range() are
// relative to: one past the end of the first stored range when that range
// starts at 1, and 1 otherwise.
//
// Gaps reported: the span before the first stored range (when it does not
// start at 1) and every span between two adjacent stored ranges. When the
// only stored range starts at 1, a single bit at offset 0 is reported and the
// function returns early. Nothing past the last stored range is reported in
// the other cases.
00161 { 00162 // length is number of (allocated) words in bitmap, max of 8 00163 // numBits is number of valid bits in the bitmap, <= length * 32, to account for partial words 00164 const FragMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq)); 00165 if (iter == fragments_.end() || length == 0) { 00166 // Nothing missing 00167 return 0; 00168 } 00169 00170 // RTPS's FragmentNumbers are 32-bit values, so we'll only be using the 00171 // low 32 bits of the 64-bit generalized sequence numbers in 00172 // FragRange::transport_seq_. 00173 00174 const OPENDDS_LIST(FragRange)& flist = iter->second; 00175 const SequenceNumber& first = flist.front().transport_seq_.first; 00176 const CORBA::ULong base = (first == 1) 00177 ? flist.front().transport_seq_.second.getLow() + 1 00178 : 1; 00179 00180 if (first != 1) { 00181 // Represent the "gap" before the first list element. 00182 // base == 1 and the first 2 args to fill_bitmap_range() are deltas of base 00183 DisjointSequence::fill_bitmap_range(0, first.getLow() - 2, 00184 bitmap, length, numBits); 00185 } else if (flist.size() == 1) { 00186 // No gaps, but we know there is (at least 1) more_fragments 00187 DisjointSequence::fill_bitmap_range(0, 0, 00188 bitmap, length, numBits); 00189 // NOTE: this could send a nack for fragments that are in flight 00190 // need to defer setting bitmap till heartbeat extending logic 00191 // in RtpsUdpDataLink::generate_nack_frags 00192 return base; 00193 } 00194 00195 typedef OPENDDS_LIST(FragRange)::const_iterator list_iterator; 00196 for (list_iterator it = flist.begin(); it != flist.end(); ++it) { 00197 const list_iterator it_next = ++list_iterator(it); 00198 if (it_next == flist.end()) { 00199 break; 00200 } 00201 const CORBA::ULong low = it->transport_seq_.second.getLow() + 1 - base, 00202 high = it_next->transport_seq_.first.getLow() - 1 - base; 00203 DisjointSequence::fill_bitmap_range(low, high, bitmap, length, numBits); 00204 } 00205 00206 return base; 00207 }
bool OpenDDS::DCPS::TransportReassembly::has_frags | ( | const SequenceNumber & | seq, | |
const RepoId & | pub_id | |||
) | const |
Returns true if this object is storing fragments for the given DataSampleHeader sequence number from the given publication.
Definition at line 151 of file TransportReassembly.cpp.
References fragments_.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_frags_from_bitmap().
// True when fragments_ contains an entry keyed by (pub_id, seq). The count()
// result is converted implicitly to the bool return type.
00153 { 00154 return fragments_.count(FragKey(pub_id, seq)); 00155 }
bool OpenDDS::DCPS::TransportReassembly::insert | ( | OPENDDS_LIST(FragRange)& | flist, | |
const SequenceRange & | seqRange, | |||
ReceivedDataSample & | data | |||
) | [static, private] |
Definition at line 46 of file TransportReassembly.cpp.
References ACE_TEXT(), ACE_Message_Block::cont(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::join(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::DataSampleHeader::more_fragments_, OpenDDS::DCPS::OPENDDS_LIST(), OpenDDS::DCPS::TransportReassembly::FragRange::rec_ds_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::TransportReassembly::FragRange::transport_seq_, and VDBG.
Referenced by data_unavailable(), and reassemble_i().
// Merges the fragment range "seqRange" (carried by "data") into "flist".
//
// The list is scanned front to back and the first element that matches one of
// these tests decides the outcome:
//   - seqRange ends more than one before the element: a new FragRange is
//     inserted ahead of it.
//   - seqRange ends exactly one before the element: the two are joined with
//     the new data in front; the element's range start becomes seqRange.first.
//   - seqRange starts exactly one after the element: the two are joined with
//     the new data behind; if that also closes the gap to the following
//     element, that element is joined in as well and erased from the list.
// If no element matches, a new FragRange is appended at the end.
//
// Headers are merged through DataSampleHeader::join(); message blocks are
// chained with ACE_Message_Block::cont(). data.sample_ is released or reset in
// the joining branches, so the caller's "data" may be left without a payload.
//
// Returns false when a header join fails (reported through join_err, which is
// defined outside this listing) or when the following element has a sample but
// the current one does not (logged with ACE_ERROR); returns true otherwise.
//
// NOTE(review): a seqRange that duplicates or overlaps a stored range matches
// none of the three tests for that element and ends up stored as a separate
// entry -- confirm callers never pass duplicates.
// NOTE(review): when either payload is missing, the combine-on-left branch
// only resets data.sample_ and keeps the stored sample, whereas the
// combine-on-right branch resets both -- confirm the asymmetry is intended.
00049 { 00050 const SequenceNumber::Value prev = seqRange.first.getValue() - 1, 00051 next = seqRange.second.getValue() + 1; 00052 00053 for (OPENDDS_LIST(FragRange)::iterator it = flist.begin(); it != flist.end(); ++it) { 00054 FragRange& fr = *it; 00055 if (next < fr.transport_seq_.first.getValue()) { 00056 // insert before 'it' 00057 flist.insert(it, FragRange(seqRange, data)); 00058 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() " 00059 "inserted on left\n")); 00060 return true; 00061 00062 } else if (next == fr.transport_seq_.first.getValue()) { 00063 // combine on left of fr 00064 DataSampleHeader joined; 00065 if (!DataSampleHeader::join(data.header_, fr.rec_ds_.header_, joined)) { 00066 join_err("left"); 00067 return false; 00068 } 00069 fr.rec_ds_.header_ = joined; 00070 if (fr.rec_ds_.sample_ && data.sample_) { 00071 ACE_Message_Block* last; 00072 for (last = data.sample_.get(); last->cont(); last = last->cont()) ; 00073 last->cont(fr.rec_ds_.sample_.release()); 00074 fr.rec_ds_.sample_.reset(data.sample_.release()); 00075 } 00076 data.sample_.reset(); 00077 fr.transport_seq_.first = seqRange.first; 00078 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() " 00079 "combined on left\n")); 00080 return true; 00081 00082 } else if (prev == fr.transport_seq_.second.getValue()) { 00083 // combine on right of fr 00084 if (!fr.rec_ds_.sample_) { 00085 fr.rec_ds_.header_.more_fragments_ = true; 00086 } 00087 DataSampleHeader joined; 00088 if (!DataSampleHeader::join(fr.rec_ds_.header_, data.header_, joined)) { 00089 join_err("right"); 00090 return false; 00091 } 00092 fr.rec_ds_.header_ = joined; 00093 if (fr.rec_ds_.sample_ && data.sample_) { 00094 ACE_Message_Block* last; 00095 for (last = fr.rec_ds_.sample_.get(); last->cont(); last = last->cont()) ; 00096 last->cont(data.sample_.release()); 00097 } 00098 else { 00099 fr.rec_ds_.sample_.reset(); 00100 data.sample_.reset(); 00101 } 00102 00103 fr.transport_seq_.second = seqRange.second; 
00104 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() " 00105 "combined on right\n")); 00106 00107 // check if the next FragRange in the list needs to be combined 00108 if (++it != flist.end()) { 00109 if (next == it->transport_seq_.first.getValue()) { 00110 if (!fr.rec_ds_.sample_) { 00111 fr.rec_ds_.header_.more_fragments_ = true; 00112 } 00113 if (!DataSampleHeader::join(fr.rec_ds_.header_, it->rec_ds_.header_, 00114 joined)) { 00115 join_err("combined next"); 00116 return false; 00117 } 00118 fr.rec_ds_.header_ = joined; 00119 if (!it->rec_ds_.sample_) { 00120 fr.rec_ds_.sample_.reset(); 00121 } else { 00122 if (!fr.rec_ds_.sample_) { 00123 ACE_ERROR((LM_ERROR, 00124 ACE_TEXT("(%P|%t) ERROR: ") 00125 ACE_TEXT("OpenDDS::DCPS::TransportReassembly::insert, ") 00126 ACE_TEXT("Cannot dereference null pointer fr.rec_ds_.sample_\n"))); 00127 return false; 00128 } 00129 ACE_Message_Block* last; 00130 for (last = fr.rec_ds_.sample_.get(); last->cont(); last = last->cont()) ; 00131 last->cont(it->rec_ds_.sample_.release()); 00132 } 00133 fr.transport_seq_.second = it->transport_seq_.second; 00134 flist.erase(it); 00135 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() " 00136 "coalesced on right\n")); 00137 } 00138 } 00139 return true; 00140 } 00141 } 00142 00143 // add to end of list 00144 flist.push_back(FragRange(seqRange, data)); 00145 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() " 00146 "inserted at end of list\n")); 00147 return true; 00148 }
typedef OpenDDS::DCPS::TransportReassembly::OPENDDS_MAP | ( | FragKey | , | |
OPENDDS_LIST(FragRange) | ||||
) | [private] |
OpenDDS::DCPS::TransportReassembly::OPENDDS_SET | ( | FragKey | ) | [private] |
bool OpenDDS::DCPS::TransportReassembly::reassemble | ( | const SequenceRange & | seqRange, | |
ReceivedDataSample & | data | |||
) |
Definition at line 210 of file TransportReassembly.cpp.
References reassemble_i().
// Range overload: forwards to reassemble_i(), passing firstFrag as true
// exactly when the range starts at sequence number 1.
00212 { 00213 return reassemble_i(seqRange, seqRange.first == 1, data); 00214 }
bool OpenDDS::DCPS::TransportReassembly::reassemble | ( | const SequenceNumber & | transportSeq, | |
bool | firstFrag, | |||
ReceivedDataSample & | data | |||
) |
Called by TransportReceiveStrategy if the fragmentation header flag is set. Returns true/false to indicate if data should be delivered to the datalink. The 'data' argument may be modified by this method.
Definition at line 217 of file TransportReassembly.cpp.
References reassemble_i().
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble(), and OpenDDS::DCPS::MulticastSession::reassemble().
// Single-fragment overload: wraps transportSeq in a SequenceRange whose first
// and second are both transportSeq, then forwards it together with the
// caller-supplied firstFrag flag to reassemble_i().
00220 { 00221 return reassemble_i(SequenceRange(transportSeq, transportSeq), 00222 firstFrag, data); 00223 }
bool OpenDDS::DCPS::TransportReassembly::reassemble_i | ( | const SequenceRange & | seqRange, | |
bool | firstFrag, | |||
ReceivedDataSample & | data | |||
) | [private] |
Definition at line 226 of file TransportReassembly.cpp.
References fragments_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::ReceivedDataSample::header_, insert(), LM_DEBUG, OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::swap(), OpenDDS::DCPS::Transport_debug_level, and VDBG.
Referenced by reassemble().
// Shared implementation behind both reassemble() overloads.
//
// The sample is keyed by FragKey(publication_id_, sequence_) taken from
// data.header_. When firstFrag is set the key is added to have_first_.
//   - No entry for the key yet: a new list holding FragRange(seqRange, data)
//     is created and false is returned.
//   - Otherwise the range is merged with insert(); false is returned if
//     insert() fails.
//   - If the key is in have_first_, the list has collapsed to one range, and
//     that range's header no longer has more_fragments_ set, the stored sample
//     is swapped into "data" and the key is erased from fragments_ and
//     have_first_. The result is then whether data.sample_ is non-null.
//   - In every other case false is returned (still incomplete).
// A debug line describing the call is logged when Transport_debug_level > 5.
00229 { 00230 if (Transport_debug_level > 5) { 00231 GuidConverter conv(data.header_.publication_id_); 00232 ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() " 00233 "tseq %q-%q first %d dseq %q pub %C\n", seqRange.first.getValue(), 00234 seqRange.second.getValue(), firstFrag ? 1 : 0, 00235 data.header_.sequence_.getValue(), OPENDDS_STRING(conv).c_str())); 00236 } 00237 00238 const FragKey key(data.header_.publication_id_, data.header_.sequence_); 00239 00240 if (firstFrag) { 00241 have_first_.insert(key); 00242 } 00243 00244 FragMap::iterator iter = fragments_.find(key); 00245 if (iter == fragments_.end()) { 00246 fragments_[key].push_back(FragRange(seqRange, data)); 00247 // since this is the first fragment we've seen, it can't possibly be done 00248 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() " 00249 "stored first frag, returning false (incomplete)\n")); 00250 return false; 00251 } 00252 00253 if (!insert(iter->second, seqRange, data)) { 00254 // error condition, already logged by insert() 00255 return false; 00256 } 00257 00258 // We can deliver data if all three of these conditions are met: 00259 // 1. we've seen the "first fragment" flag [first frag is here] 00260 // 2. all fragments have been coalesced [no gaps in the seq numbers] 00261 // 3. the "more fragments" flag is not set [last frag is here] 00262 if (have_first_.count(key) 00263 && iter->second.size() == 1 00264 && !iter->second.front().rec_ds_.header_.more_fragments_) { 00265 swap(data, iter->second.front().rec_ds_); 00266 fragments_.erase(iter); 00267 have_first_.erase(key); 00268 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() " 00269 "removed frag, returning %C\n", data.sample_ ? "true" : "false")); 00270 return data.sample_.get(); // could be false if we had data_unavailable() 00271 } 00272 00273 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() " 00274 "returning false (incomplete)\n")); 00275 return false; 00276 }
FragMap OpenDDS::DCPS::TransportReassembly::fragments_ [private] |
Definition at line 92 of file TransportReassembly.h.
Referenced by data_unavailable(), get_gaps(), has_frags(), and reassemble_i().