#include <TransportReassembly.h>
Public Member Functions | |
bool | reassemble (const SequenceNumber &transportSeq, bool firstFrag, ReceivedDataSample &data) |
bool | reassemble (const SequenceRange &seqRange, ReceivedDataSample &data) |
void | data_unavailable (const SequenceRange &transportSeqDropped) |
void | data_unavailable (const SequenceNumber &dataSampleSeq, const RepoId &pub_id) |
bool | has_frags (const SequenceNumber &seq, const RepoId &pub_id) const |
CORBA::ULong | get_gaps (const SequenceNumber &msg_seq, const RepoId &pub_id, CORBA::Long bitmap[], CORBA::ULong length, CORBA::ULong &numBits) const |
Private Member Functions | |
bool | reassemble_i (const SequenceRange &seqRange, bool firstFrag, ReceivedDataSample &data) |
typedef | OPENDDS_MAP (FragKey, OPENDDS_LIST(FragRange)) FragMap |
OPENDDS_SET (FragKey) have_first_ | |
Static Private Member Functions | |
static bool | insert (OPENDDS_LIST(FragRange)&flist, const SequenceRange &seqRange, ReceivedDataSample &data) |
Private Attributes | |
FragMap | fragments_ |
Classes | |
struct | FragKey |
struct | FragRange |
Definition at line 19 of file TransportReassembly.h.
void OpenDDS::DCPS::TransportReassembly::data_unavailable | ( | const SequenceNumber & | dataSampleSeq, | |
const RepoId & | pub_id | |||
) |
Definition at line 337 of file TransportReassembly.cpp.
References fragments_.
00339 { 00340 const FragKey key(pub_id, dataSampleSeq); 00341 fragments_.erase(key); 00342 have_first_.erase(key); 00343 }
void OpenDDS::DCPS::TransportReassembly::data_unavailable | ( | const SequenceRange & | transportSeqDropped | ) |
Called by TransportReceiveStrategy to indicate that we can stop tracking partially-reassembled messages when we know the remaining fragments are not expected to arrive.
Definition at line 285 of file TransportReassembly.cpp.
References dummy, fragments_, insert(), OpenDDS::DCPS::OPENDDS_LIST(), OpenDDS::DCPS::TransportReassembly::FragRange::transport_seq_, and VDBG.
Referenced by OpenDDS::DCPS::BestEffortSession::check_header(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_fragments().
00286 { 00287 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::data_unavailable() " 00288 "dropped %q-%q\n", dropped.first.getValue(), dropped.second.getValue())); 00289 typedef OPENDDS_LIST(FragRange)::iterator list_iterator; 00290 00291 for (FragMap::iterator iter = fragments_.begin(); iter != fragments_.end(); 00292 ++iter) { 00293 const FragKey& key = iter->first; 00294 OPENDDS_LIST(FragRange)& flist = iter->second; 00295 00296 ReceivedDataSample dummy(0); 00297 dummy.header_.sequence_ = key.data_sample_seq_; 00298 00299 // check if we should expand the front element (only if !have_first) 00300 const SequenceNumber::Value prev = 00301 flist.front().transport_seq_.first.getValue() - 1; 00302 if (dropped.second.getValue() == prev && !have_first_.count(key)) { 00303 have_first_.insert(key); 00304 dummy.header_.more_fragments_ = true; 00305 insert(flist, dropped, dummy); 00306 continue; 00307 } 00308 00309 // find a gap between list elements where "dropped" fits 00310 for (list_iterator it = flist.begin(); it != flist.end(); ++it) { 00311 list_iterator it_next = it; 00312 ++it_next; 00313 if (it_next == flist.end()) { 00314 break; 00315 } 00316 FragRange& fr1 = *it; 00317 FragRange& fr2 = *it_next; 00318 if (dropped.first > fr1.transport_seq_.second 00319 && dropped.second < fr2.transport_seq_.first) { 00320 dummy.header_.more_fragments_ = true; 00321 insert(flist, dropped, dummy); 00322 break; 00323 } 00324 } 00325 00326 // check if we should expand the last element 00327 const SequenceNumber next = 00328 ++SequenceNumber(flist.back().transport_seq_.second); 00329 if (dropped.first == next) { 00330 flist.back().rec_ds_.header_.more_fragments_ = true; 00331 insert(flist, dropped, dummy); 00332 } 00333 } 00334 }
CORBA::ULong OpenDDS::DCPS::TransportReassembly::get_gaps | ( | const SequenceNumber & | msg_seq, | |
const RepoId & | pub_id, | |||
CORBA::Long | bitmap[], | |||
CORBA::ULong | length, | |||
CORBA::ULong & | numBits | |||
) | const |
Populates bitmap with the missing fragment sequence numbers and sets numBits for the given message sequence and publisher ID.
Definition at line 164 of file TransportReassembly.cpp.
References OpenDDS::DCPS::DisjointSequence::fill_bitmap_range(), fragments_, OpenDDS::DCPS::SequenceNumber::getLow(), and OpenDDS::DCPS::OPENDDS_LIST().
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments().
00167 { 00168 // length is number of (allocated) words in bitmap, max of 8 00169 // numBits is number of valid bits in the bitmap, <= length * 32, to account for partial words 00170 const FragMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq)); 00171 if (iter == fragments_.end() || length == 0) { 00172 // Nothing missing 00173 return 0; 00174 } 00175 00176 // RTPS's FragmentNumbers are 32-bit values, so we'll only be using the 00177 // low 32 bits of the 64-bit generalized sequence numbers in 00178 // FragRange::transport_seq_. 00179 00180 const OPENDDS_LIST(FragRange)& flist = iter->second; 00181 const SequenceNumber& first = flist.front().transport_seq_.first; 00182 const CORBA::ULong base = (first == 1) 00183 ? flist.front().transport_seq_.second.getLow() + 1 00184 : 1; 00185 00186 if (first != 1) { 00187 // Represent the "gap" before the first list element. 00188 // base == 1 and the first 2 args to fill_bitmap_range() are deltas of base 00189 DisjointSequence::fill_bitmap_range(0, first.getLow() - 2, 00190 bitmap, length, numBits); 00191 } else if (flist.size() == 1) { 00192 // No gaps, but we know there is (at least 1) more_framents 00193 DisjointSequence::fill_bitmap_range(0, 0, 00194 bitmap, length, numBits); 00195 // NOTE: this could send a nack for fragments that are in flight 00196 // need to defer setting bitmap till heartbeat extending logic 00197 // in RtpsUdpDataLink::generate_nack_frags 00198 return base; 00199 } 00200 00201 typedef OPENDDS_LIST(FragRange)::const_iterator list_iterator; 00202 for (list_iterator it = flist.begin(); it != flist.end(); ++it) { 00203 const list_iterator it_next = ++list_iterator(it); 00204 if (it_next == flist.end()) { 00205 break; 00206 } 00207 const CORBA::ULong low = it->transport_seq_.second.getLow() + 1 - base, 00208 high = it_next->transport_seq_.first.getLow() - 1 - base; 00209 DisjointSequence::fill_bitmap_range(low, high, bitmap, length, numBits); 00210 } 00211 00212 return base; 00213 }
bool OpenDDS::DCPS::TransportReassembly::has_frags | ( | const SequenceNumber & | seq, | |
const RepoId & | pub_id | |||
) | const |
Returns true if this object is storing fragments for the given DataSampleHeader sequence number from the given publication.
Definition at line 157 of file TransportReassembly.cpp.
References fragments_.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::has_fragments(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::remove_frags_from_bitmap().
00159 { 00160 return fragments_.count(FragKey(pub_id, seq)); 00161 }
bool OpenDDS::DCPS::TransportReassembly::insert | ( | OPENDDS_LIST(FragRange)& | flist, | |
const SequenceRange & | seqRange, | |||
ReceivedDataSample & | data | |||
) | [static, private] |
Definition at line 44 of file TransportReassembly.cpp.
References OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::join(), OpenDDS::DCPS::join_err(), OpenDDS::DCPS::OPENDDS_LIST(), OpenDDS::DCPS::ReceivedDataSample::sample_, and VDBG.
Referenced by data_unavailable(), and reassemble_i().
00047 { 00048 const SequenceNumber::Value prev = seqRange.first.getValue() - 1, 00049 next = seqRange.second.getValue() + 1; 00050 00051 for (OPENDDS_LIST(FragRange)::iterator it = flist.begin(); it != flist.end(); ++it) { 00052 FragRange& fr = *it; 00053 if (next < fr.transport_seq_.first.getValue()) { 00054 // insert before 'it' 00055 flist.insert(it, FragRange(seqRange, data)); 00056 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() " 00057 "inserted on left\n")); 00058 return true; 00059 00060 } else if (next == fr.transport_seq_.first.getValue()) { 00061 // combine on left of fr 00062 DataSampleHeader joined; 00063 if (!DataSampleHeader::join(data.header_, fr.rec_ds_.header_, joined)) { 00064 join_err("left"); 00065 return false; 00066 } 00067 fr.rec_ds_.header_ = joined; 00068 if (fr.rec_ds_.sample_ && !data.sample_) { 00069 ACE_Message_Block::release(fr.rec_ds_.sample_); 00070 fr.rec_ds_.sample_ = 0; 00071 } else if (!fr.rec_ds_.sample_) { 00072 ACE_Message_Block::release(data.sample_); 00073 } else { 00074 ACE_Message_Block* last; 00075 for (last = data.sample_; last->cont(); last = last->cont()) ; 00076 last->cont(fr.rec_ds_.sample_); 00077 fr.rec_ds_.sample_ = data.sample_; 00078 } 00079 data.sample_ = 0; 00080 fr.transport_seq_.first = seqRange.first; 00081 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() " 00082 "combined on left\n")); 00083 return true; 00084 00085 } else if (prev == fr.transport_seq_.second.getValue()) { 00086 // combine on right of fr 00087 if (!fr.rec_ds_.sample_) { 00088 fr.rec_ds_.header_.more_fragments_ = true; 00089 } 00090 DataSampleHeader joined; 00091 if (!DataSampleHeader::join(fr.rec_ds_.header_, data.header_, joined)) { 00092 join_err("right"); 00093 return false; 00094 } 00095 fr.rec_ds_.header_ = joined; 00096 if (fr.rec_ds_.sample_ && !data.sample_) { 00097 ACE_Message_Block::release(fr.rec_ds_.sample_); 00098 fr.rec_ds_.sample_ = 0; 00099 } else if (!fr.rec_ds_.sample_) { 00100 
ACE_Message_Block::release(data.sample_); 00101 } else { 00102 ACE_Message_Block* last; 00103 for (last = fr.rec_ds_.sample_; last->cont(); last = last->cont()) ; 00104 last->cont(data.sample_); 00105 } 00106 data.sample_ = 0; 00107 fr.transport_seq_.second = seqRange.second; 00108 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() " 00109 "combined on right\n")); 00110 00111 // check if the next FragRange in the list needs to be combined 00112 if (++it != flist.end()) { 00113 if (next == it->transport_seq_.first.getValue()) { 00114 if (!fr.rec_ds_.sample_) { 00115 fr.rec_ds_.header_.more_fragments_ = true; 00116 } 00117 if (!DataSampleHeader::join(fr.rec_ds_.header_, it->rec_ds_.header_, 00118 joined)) { 00119 join_err("combined next"); 00120 return false; 00121 } 00122 fr.rec_ds_.header_ = joined; 00123 if (!it->rec_ds_.sample_) { 00124 ACE_Message_Block::release(fr.rec_ds_.sample_); 00125 fr.rec_ds_.sample_ = 0; 00126 } else { 00127 if (!fr.rec_ds_.sample_) { 00128 ACE_ERROR((LM_ERROR, 00129 ACE_TEXT("(%P|%t) ERROR: ") 00130 ACE_TEXT("OpenDDS::DCPS::TransportReassembly::insert, ") 00131 ACE_TEXT("Cannot dereference null pointer fr.rec_ds_.sample_\n"))); 00132 return false; 00133 } 00134 ACE_Message_Block* last; 00135 for (last = fr.rec_ds_.sample_; last->cont(); last = last->cont()) ; 00136 last->cont(it->rec_ds_.sample_); 00137 it->rec_ds_.sample_ = 0; 00138 } 00139 fr.transport_seq_.second = it->transport_seq_.second; 00140 flist.erase(it); 00141 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() " 00142 "coalesced on right\n")); 00143 } 00144 } 00145 return true; 00146 } 00147 } 00148 00149 // add to end of list 00150 flist.push_back(FragRange(seqRange, data)); 00151 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::insert() " 00152 "inserted at end of list\n")); 00153 return true; 00154 }
typedef OpenDDS::DCPS::TransportReassembly::OPENDDS_MAP | ( | FragKey | , | |
OPENDDS_LIST(FragRange) | ||||
) | [private] |
OpenDDS::DCPS::TransportReassembly::OPENDDS_SET | ( | FragKey | ) | [private] |
bool OpenDDS::DCPS::TransportReassembly::reassemble | ( | const SequenceRange & | seqRange, | |
ReceivedDataSample & | data | |||
) |
Definition at line 216 of file TransportReassembly.cpp.
References reassemble_i().
00218 { 00219 return reassemble_i(seqRange, seqRange.first == 1, data); 00220 }
bool OpenDDS::DCPS::TransportReassembly::reassemble | ( | const SequenceNumber & | transportSeq, | |
bool | firstFrag, | |||
ReceivedDataSample & | data | |||
) |
Called by TransportReceiveStrategy if the fragmentation header flag is set. Returns true/false to indicate if data should be delivered to the datalink. The 'data' argument may be modified by this method.
Definition at line 223 of file TransportReassembly.cpp.
References reassemble_i().
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble(), and OpenDDS::DCPS::MulticastSession::reassemble().
00226 { 00227 return reassemble_i(SequenceRange(transportSeq, transportSeq), 00228 firstFrag, data); 00229 }
bool OpenDDS::DCPS::TransportReassembly::reassemble_i | ( | const SequenceRange & | seqRange, | |
bool | firstFrag, | |||
ReceivedDataSample & | data | |||
) | [private] |
Definition at line 232 of file TransportReassembly.cpp.
References fragments_, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::ReceivedDataSample::header_, insert(), OPENDDS_STRING, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::push_back(), OpenDDS::DCPS::ReceivedDataSample::sample_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::swap(), OpenDDS::DCPS::Transport_debug_level, and VDBG.
Referenced by reassemble().
00235 { 00236 if (Transport_debug_level > 5) { 00237 GuidConverter conv(data.header_.publication_id_); 00238 ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() " 00239 "tseq %q-%q first %d dseq %q pub %C\n", seqRange.first.getValue(), 00240 seqRange.second.getValue(), firstFrag ? 1 : 0, 00241 data.header_.sequence_.getValue(), OPENDDS_STRING(conv).c_str())); 00242 } 00243 00244 const FragKey key(data.header_.publication_id_, data.header_.sequence_); 00245 00246 if (firstFrag) { 00247 have_first_.insert(key); 00248 } 00249 00250 FragMap::iterator iter = fragments_.find(key); 00251 if (iter == fragments_.end()) { 00252 fragments_[key].push_back(FragRange(seqRange, data)); 00253 // since this is the first fragment we've seen, it can't possibly be done 00254 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() " 00255 "stored first frag, returning false (incomplete)\n")); 00256 return false; 00257 } 00258 00259 if (!insert(iter->second, seqRange, data)) { 00260 // error condition, already logged by insert() 00261 return false; 00262 } 00263 00264 // We can deliver data if all three of these conditions are met: 00265 // 1. we've seen the "first fragment" flag [first frag is here] 00266 // 2. all fragments have been coalesced [no gaps in the seq numbers] 00267 // 3. the "more fragments" flag is not set [last frag is here] 00268 if (have_first_.count(key) 00269 && iter->second.size() == 1 00270 && !iter->second.front().rec_ds_.header_.more_fragments_) { 00271 swap(data, iter->second.front().rec_ds_); 00272 fragments_.erase(iter); 00273 have_first_.erase(key); 00274 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() " 00275 "removed frag, returning %C\n", data.sample_ ? "true" : "false")); 00276 return data.sample_; // could be false if we had data_unavailable() 00277 } 00278 00279 VDBG((LM_DEBUG, "(%P|%t) DBG: TransportReassembly::reassemble() " 00280 "returning false (incomplete)\n")); 00281 return false; 00282 }
FragMap OpenDDS::DCPS::TransportReassembly::fragments_ [private] |
Definition at line 90 of file TransportReassembly.h.
Referenced by data_unavailable(), get_gaps(), has_frags(), and reassemble_i().