OpenDDS  Snapshot(2023/04/28-20:55)
TransportReassembly.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 #include "TransportReassembly.h"
10 #include "TransportDebug.h"
11 
12 #include "dds/DCPS/GuidConverter.h"
14 
16 
17 namespace OpenDDS {
18 namespace DCPS {
19 
// Builds the key used to index in-progress reassemblies: the publication GUID
// plus the sequence number of the data sample being reassembled. Both
// arguments are copied into the corresponding members.
20 FragKey::FragKey(const GUID_t& pubId,
21  const SequenceNumber& dataSampleSeq)
22  : publication_(pubId)
23  , data_sample_seq_(dataSampleSeq)
24 {
25 }
26 
28 
// Constructor tail: stores the fragment range in frag_range_ and a copy of the
// received sample in rec_ds_.
// NOTE(review): the declarator line that names this constructor and its first
// parameter (fragRange) is not present in this listing.
30  const ReceivedDataSample& data)
31  : frag_range_(fragRange)
32  , rec_ds_(data)
33 {
34 }
35 
// Constructor tail: remembers the timeout in timeout_. reassemble_i() adds
// timeout_ to the current time to compute each entry's expiration.
// NOTE(review): the declarator line is not present in this listing.
37  : timeout_(timeout)
38 {
39 }
40 
// Reports whether fragments are currently stored for the sample identified by
// (pub_id, seq).
//  - total_frags [out]: set to the entry's total_frags_ when an entry exists;
//    left unmodified otherwise.
//  - returns true when an entry exists in fragments_, false otherwise.
// NOTE(review): the line naming the function and declaring 'seq' is not present
// in this listing; the name has_frags is inferred, not shown.
41 bool
43  const GUID_t& pub_id,
44  ACE_UINT32& total_frags) const
45 {
47  const FragInfoMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq));
48  if (iter != fragments_.end()) {
49  total_frags = iter->second.total_frags_;
50  return true;
51  }
52  return false;
53 }
54 
// Drops the record of already-completed sequence numbers kept for pub_id in
// completed_. reassemble_i() consults completed_ to ignore fragments of samples
// that were already delivered, so after this call such fragments are no longer
// filtered for this publication.
// NOTE(review): the declarator line is not present in this listing.
55 void
57 {
59  completed_.erase(pub_id);
60 }
61 
// Fills 'bitmap' with the fragment numbers still missing for the sample
// identified by (pub_id, seq).
//  - bitmap/length: caller-provided words to fill (see the comments below).
//  - numBits [in,out]: passed through to DisjointSequence::fill_bitmap_range().
//  - returns 0 when length == 0 or when no fragments are stored for the sample;
//    otherwise returns 'base', the fragment number that bit offsets handed to
//    fill_bitmap_range() are relative to.
// NOTE(review): the declarator line (function name, 'seq', 'pub_id') is not
// present in this listing.
64  CORBA::Long bitmap[], CORBA::ULong length,
65  CORBA::ULong& numBits) const
66 {
68  // length is number of (allocated) words in bitmap, max of 8
69  // numBits is number of valid bits in the bitmap, <= length * 32, to account for partial words
70  if (length == 0) {
71  return 0;
72  }
73 
74  const FragInfoMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq));
75  if (iter == fragments_.end()) {
76  // Nothing missing
77  return 0;
78  }
79 
80  // RTPS's FragmentNumbers are 32-bit values, so we'll only be using the
81  // low 32 bits of the 64-bit generalized sequence numbers in
82  // FragSample::frag_range_.
83 
84  const OPENDDS_LIST(FragSample)& flist = iter->second.sample_list_;
85  const FragmentNumber first = flist.front().frag_range_.first;
   // If fragment 1 has been received, the bitmap starts right after the first
   // contiguous run; otherwise it starts at fragment 1.
86  const CORBA::ULong base = static_cast<CORBA::ULong>((first == 1)
87  ? flist.front().frag_range_.second + 1
88  : 1);
89 
90  if (first != 1) {
91  // Represent the "gap" before the first list element.
92  // base == 1 and the first 2 args to fill_bitmap_range() are deltas of base
93  ACE_CDR::ULong bits_added = 0;
94  DisjointSequence::fill_bitmap_range(0, static_cast<CORBA::ULong>(first - 2),
95  bitmap, length, numBits, bits_added);
   // No early return here: gaps between list elements are added by the loop below.
96  } else if (flist.size() == 1) {
97  // No gaps, but we know there are (at least 1) more_fragments
98  if (iter->second.total_frags_ == 0) {
   // Total is unknown: request only the single fragment at 'base'.
99  ACE_CDR::ULong bits_added = 0;
100  DisjointSequence::fill_bitmap_range(0, 0, bitmap, length, numBits, bits_added);
101  } else {
   // Total is known: the upper delta is total_frags_ minus the larger of
   // 'base' and 'rlimit'.
   // NOTE(review): the line that begins the call consuming 'ulimit' is not
   // present in this listing; presumably a fill_bitmap_range() call.
102  const size_t rlimit = static_cast<size_t>(flist.back().frag_range_.second - 1);
103  const CORBA::ULong ulimit = static_cast<CORBA::ULong>(iter->second.total_frags_ - (base < rlimit ? rlimit : base));
104  ACE_CDR::ULong bits_added = 0;
106  ulimit,
107  bitmap, length, numBits, bits_added);
108  }
109  // NOTE: this could send a nack for fragments that are in flight
110  // need to defer setting bitmap till heartbeat extending logic
111  // in RtpsUdpDataLink::generate_nack_frags
112  return base;
113  }
114 
   // Walk adjacent pairs of stored ranges; each pair bounds one gap, expressed
   // as [low, high] offsets from 'base'.
115  typedef OPENDDS_LIST(FragSample)::const_iterator list_iterator;
116  for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
117  const list_iterator it_next = ++list_iterator(it);
118  if (it_next == flist.end()) {
119  break;
120  }
121  const CORBA::ULong low = static_cast<CORBA::ULong>(it->frag_range_.second + 1 - base),
122  high = static_cast<CORBA::ULong>(it_next->frag_range_.first - 1 - base);
123  ACE_CDR::ULong bits_added = 0;
124  DisjointSequence::fill_bitmap_range(low, high, bitmap, length, numBits, bits_added);
125  }
126 
127  return base;
128 }
129 
// Range-based entry point: forwards to reassemble_i(), treating the range as
// containing the first fragment exactly when it starts at fragment number 1.
// Returns whatever reassemble_i() returns.
// NOTE(review): the declarator line (function name, 'fragRange') is not present
// in this listing.
130 bool
132  ReceivedDataSample& data,
133  ACE_UINT32 total_frags)
134 {
136  return reassemble_i(fragRange, fragRange.first == 1, data, total_frags);
137 }
138 
// Single-sequence-number entry point: wraps transportSeq in a one-element
// FragmentRange (first == second == transportSeq.getValue()) and forwards to
// reassemble_i() with the caller-supplied firstFrag flag.
// NOTE(review): the declarator line (function name, 'transportSeq') is not
// present in this listing.
139 bool
141  bool firstFrag,
142  ReceivedDataSample& data,
143  ACE_UINT32 total_frags)
144 {
146  return reassemble_i(FragmentRange(transportSeq.getValue(), transportSeq.getValue()),
147  firstFrag, data, total_frags);
148 }
149 
// Core reassembly step: records the fragment(s) in 'fragRange' for the sample
// described by data.header_ and reports whether the sample is now complete.
//  - firstFrag: true when this range includes the sample's first fragment.
//  - data [in,out]: the received fragment; on completion it is swapped with the
//    fully reassembled sample.
//  - total_frags: total fragment count if known; the stored value is only ever
//    raised, never lowered.
//  - returns true only when the sample is complete and 'data' holds data.
// NOTE(review): the declarator line and the declarations of 'now' and 'key' are
// not present in this listing.
150 bool
152  bool firstFrag,
153  ReceivedDataSample& data,
154  ACE_UINT32 total_frags)
155 {
156  if (Transport_debug_level > 5) {
157  LogGuid logger(data.header_.publication_id_);
158  ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::reassemble_i: "
159  "tseq %q-%q first %d dseq %q pub %C\n", fragRange.first,
160  fragRange.second, firstFrag ? 1 : 0,
161  data.header_.sequence_.getValue(), logger.c_str()));
162  }
163 
   // Purge timed-out entries before looking anything up.
165  check_expirations(now);
166 
168  FragInfoMap::iterator iter = fragments_.find(key);
169  const MonotonicTimePoint expiration = now + timeout_;
170 
171  if (iter == fragments_.end()) {
   // First fragment seen for this key: create the entry, store the fragment,
   // and queue the entry for expiration.
172  FragInfo& finfo = fragments_[key];
173  finfo = FragInfo(firstFrag, FragInfo::FragSampleList(), total_frags, expiration);
174  finfo.insert(fragRange, data);
175  expiration_queue_.push_back(std::make_pair(expiration, key));
176  data.clear();
177  // since this is the first fragment we've seen, it can't possibly be done
   // NOTE(review): the condition guarding this debug output is on a line not
   // present in this listing.
179  ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::reassemble_i: "
180  "stored first frag, returning false (incomplete) with %B fragments\n",
181  fragments_.size()));
182  }
183  return false;
184  } else {
185  const CompletedMap::const_iterator citer = completed_.find(key.publication_);
186  if (citer != completed_.end() && citer->second.contains(key.data_sample_seq_)) {
187  // already completed, not storing or delivering this message
188  return false;
189  }
190  if (firstFrag) {
191  iter->second.have_first_ = true;
192  }
193  if (iter->second.total_frags_ < total_frags) {
194  iter->second.total_frags_ = total_frags;
195  }
   // Extend the entry's lifetime. No new queue element is pushed here;
   // check_expirations() re-queues entries whose expiration_ moved forward.
196  iter->second.expiration_ = expiration;
197  }
198 
199  if (!iter->second.insert(fragRange, data)) {
200  // error condition, already logged by insert()
201  return false;
202  }
203 
204  // We can deliver data if all three of these conditions are met:
205  // 1. we've seen the "first fragment" flag [first frag is here]
206  // 2. all fragments have been coalesced [no gaps in the seq numbers]
207  // 3. the "more fragments" flag is not set [last frag is here]
208  if (iter->second.have_first_
209  && iter->second.sample_list_.size() == 1
210  && !iter->second.sample_list_.front().rec_ds_.header_.more_fragments_) {
   // Hand the reassembled sample to the caller, drop the entry, and remember
   // the sequence number so late duplicates are ignored.
211  std::swap(data, iter->second.sample_list_.front().rec_ds_);
212  fragments_.erase(iter);
213  completed_[key.publication_].insert(key.data_sample_seq_);
   // NOTE(review): the condition guarding this debug output is on a line not
   // present in this listing.
215  ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::reassemble_i: "
216  "removed frag, returning %C with %B fragments\n",
217  data.has_data() ? "true (complete)" : "false (incomplete)", fragments_.size()));
218  }
219  return data.has_data(); // could be false if we had data_unavailable()
220  }
221 
222  VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::reassemble_i: "
223  "returning false (incomplete)\n"));
224  return false;
225 }
226 
// Records that the fragment range 'dropped' will never arrive. For every sample
// under reassembly, if 'dropped' sits directly before the first stored range,
// inside a gap between stored ranges, or directly after the last stored range,
// a payload-less placeholder sample is inserted for it.
// FragInfo::insert() clears the payload of a stored range when either side of a
// join has no data, which is how a sample touched by a dropped range can end up
// "complete" but without data.
// NOTE(review): the declarator line (function name, 'dropped') is not present
// in this listing.
227 void
229 {
231  VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::data_unavailable(): "
232  "dropped %q-%q\n", dropped.first, dropped.second));
233  typedef OPENDDS_LIST(FragSample)::iterator list_iterator;
234 
235  for (FragInfoMap::iterator iter = fragments_.begin(); iter != fragments_.end();
236  ++iter) {
237  const FragKey& key = iter->first;
238  FragInfo& finfo = iter->second;
239  FragInfo::FragSampleList& flist = finfo.sample_list_;
240 
   // Placeholder: default-constructed sample carrying only this entry's
   // data-sample sequence number in its header.
241  ReceivedDataSample dummy;
242  dummy.header_.sequence_ = key.data_sample_seq_;
243 
244  // check if we should expand the front element (only if !have_first)
245  const FragmentNumber prev = flist.front().frag_range_.first - 1;
246  if (dropped.second == prev && !finfo.have_first_) {
247  finfo.have_first_ = true;
248  dummy.header_.more_fragments_ = true;
249  finfo.insert(dropped, dummy);
250  continue;
251  }
252 
253  // find a gap between list elements where "dropped" fits
254  for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
255  list_iterator it_next = it;
256  ++it_next;
257  if (it_next == flist.end()) {
258  break;
259  }
260  FragSample& fr1 = *it;
261  FragSample& fr2 = *it_next;
262  if (dropped.first > fr1.frag_range_.second
263  && dropped.second < fr2.frag_range_.first) {
264  dummy.header_.more_fragments_ = true;
265  finfo.insert(dropped, dummy);
   // insert() may have modified the list; stop iterating over it.
266  break;
267  }
268  }
269 
270  // check if we should expand the last element
271  const FragmentNumber next = flist.back().frag_range_.second + 1;
272  if (dropped.first == next) {
   // Mark the current last range as having more fragments before joining the
   // placeholder onto its right side.
273  flist.back().rec_ds_.header_.more_fragments_ = true;
274  finfo.insert(dropped, dummy);
275  }
276  }
277 }
278 
// Discards any partially reassembled state for the sample identified by
// (pub_id, dataSampleSeq) by erasing its entry from fragments_, and emits a
// debug message when an entry was actually removed.
// NOTE(review): the declarator line and the second half of the 'if' condition
// (presumably a debug-level check) are not present in this listing.
279 void
281  const GUID_t& pub_id)
282 {
284  if (fragments_.erase(FragKey(pub_id, dataSampleSeq)) &&
286  ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::data_unavailable: "
287  "removed leaving %B fragments\n", fragments_.size()));
288  }
289 }
290 
// Processes expiration_queue_ from the front while the front element's time is
// <= now. For each such element:
//  - if its key is no longer in fragments_, the element is simply dropped;
//  - if the entry's current expiration_ has passed, the entry is erased;
//  - otherwise the entry was refreshed after being queued, so it is re-queued
//    at the back with its current expiration_.
// The processed front element is popped in all cases.
// NOTE(review): the declarator line and the condition guarding the debug output
// are not present in this listing.
292 {
293  while (!expiration_queue_.empty() && expiration_queue_.front().first <= now) {
294  FragInfoMap::iterator iter = fragments_.find(expiration_queue_.front().second);
295  if (iter != fragments_.end()) {
296  // FragInfo::expiration_ may have changed after insertion into expiration_queue_
297  if (iter->second.expiration_ <= now) {
298  fragments_.erase(iter);
300  ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::check_expirations: "
301  "purge expired leaving %B fragments\n", fragments_.size()));
302  }
303  } else {
304  expiration_queue_.push_back(std::make_pair(iter->second.expiration_, iter->first));
305  }
306  }
307  expiration_queue_.pop_front();
308  }
309 }
310 
// Constructor tail: no first fragment seen yet and total fragment count unknown
// (0). The remaining members are left to their own default construction.
// NOTE(review): the declarator line is not present in this listing; presumably
// the default constructor.
312  : have_first_(false)
313  , total_frags_(0)
314 {}
315 
// Builds a FragInfo from an existing list of fragment samples.
//  - hf: whether the first fragment has been seen.
//  - rl: fragment samples, copied into sample_list_.
//  - tf: total fragment count (0 is used elsewhere in this file to mean unknown).
//  - expiration: time after which check_expirations() may purge this entry.
// Also derives the lookup structures from the copied list: sample_finder_ maps
// each range's last fragment number to its list iterator, gap_list_ holds the
// range between each pair of consecutive samples, and gap_finder_ maps each
// gap's last fragment number to its list iterator.
316 TransportReassembly::FragInfo::FragInfo(bool hf, const FragSampleList& rl, ACE_UINT32 tf, const MonotonicTimePoint& expiration)
317  : have_first_(hf)
318  , sample_list_(rl)
319  , total_frags_(tf)
320  , expiration_(expiration)
321 {
322  for (FragSampleList::iterator it = sample_list_.begin(), prev = it; it != sample_list_.end(); ++it) {
323  sample_finder_[it->frag_range_.second] = it;
   // From the second element on, record the gap between 'prev' and 'it'.
324  if (it != sample_list_.begin()) {
325  gap_list_.push_back(FragmentRange(prev->frag_range_.second + 1, it->frag_range_.first - 1));
326  }
327  prev = it;
328  }
   // Index the gaps only after gap_list_ is fully built.
329  for (FragGapList::iterator it = gap_list_.begin(); it != gap_list_.end(); ++it) {
330  gap_finder_[it->second] = it;
331  }
332 }
333 
// Constructor body that delegates to the assignment operator, so the
// iterator-holding lookup maps are rebuilt by operator= rather than copied.
// NOTE(review): the declarator line is not present in this listing; presumably
// the copy constructor taking 'val'.
335 {
336  *this = val;
337 }
338 
// Assignment body: copies state from rhs, then rebuilds sample_finder_ and
// gap_finder_ from this object's own lists, so the stored iterators point into
// this object's sample_list_ and gap_list_ rather than into rhs's. Self-assignment is a no-op.
// NOTE(review): the declarator line and two statements in the copy section are
// not present in this listing; presumably they copy sample_list_ and
// total_frags_ — confirm against the original file.
341 {
342  if (this != &rhs) {
343  have_first_ = rhs.have_first_;
345  gap_list_ = rhs.gap_list_;
347  expiration_ = rhs.expiration_;
   // Never copy the finder maps directly: their values are list iterators.
348  sample_finder_.clear();
349  gap_finder_.clear();
350  for (FragSampleList::iterator it = sample_list_.begin(); it != sample_list_.end(); ++it) {
351  sample_finder_[it->frag_range_.second] = it;
352  }
353  for (FragGapList::iterator it = gap_list_.begin(); it != gap_list_.end(); ++it) {
354  gap_finder_[it->second] = it;
355  }
356  }
357  return *this;
358 }
359 
// File-local helper used by FragInfo::insert() to report that two
// DataSampleHeaders could not be joined. 'detail' names the side of the join
// ("left" or "right" at the call sites in this file).
// NOTE(review): the line opening the logging call is not present in this
// listing.
360 namespace {
361  inline void join_err(const char* detail)
362  {
364  ACE_TEXT("(%P|%t) ERROR: TransportReassembly::FragInfo::insert: ")
365  ACE_TEXT("DataSampleHeaders could not be joined: %C\n"), detail));
366  }
367 }
368 
// Merges the fragment range 'fragRange' (payload in 'data') into sample_list_,
// an ordered list of disjoint received ranges, and keeps sample_finder_ (keyed
// by each stored range's last fragment number) in step with the list.
//  - returns true when the range was stored or combined with a neighbour;
//    false when DataSampleHeader::join() fails or the range is already fully
//    covered; in the split cases, the result of the recursive call(s).
//  - 'data' is cleared on the paths that store it as a new list element and on
//    the split paths.
// NOTE(review): the declarator line and the declarations of 'sn' (used only in
// log output) and 'copy' are not present in this listing.
369 bool
371 {
372  const FragmentNumber prev = fragRange.first - 1, next = fragRange.second + 1;
373 
   // Choose where to start scanning: the first stored range whose end is at or
   // after 'prev' (stepping back one element unless it ends exactly at 'prev'),
   // or the last element when no such range exists. 'fit' is kept pointing at
   // the sample_finder_ entry for the same element as 'start'.
374  FragSampleList::iterator start = sample_list_.begin();
375  FragSampleListIterMap::iterator fit = sample_finder_.begin();
376  if (!sample_list_.empty()) {
377  fit = sample_finder_.lower_bound(prev);
378  if (fit != sample_finder_.end()) {
379  start = fit->second;
380  if (start->frag_range_.second != prev && start != sample_list_.begin()) {
381  --start;
382  --fit;
383  }
384  } else {
385  start = sample_list_.end();
386  --start;
387  --fit;
388  }
389  }
390 
392 
393  for (FragSampleList::iterator it = start; it != sample_list_.end(); ++it) {
394  FragSample& fr = *it;
395  if (next < fr.frag_range_.first) {
396  // insert before 'it'
397  sample_finder_[fragRange.second] = sample_list_.insert(it, FragSample(fragRange, data));
398  data.clear();
399  VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) inserted %q-%q on the left of %q-%q\n", sn, fragRange.first, fragRange.second, fr.frag_range_.first, fr.frag_range_.second));
400  return true;
401 
402  } else if (next == fr.frag_range_.first) {
403  // combine on left of fr
404  DataSampleHeader joined;
405  if (!DataSampleHeader::join(data.header_, fr.rec_ds_.header_, joined)) {
406  join_err("left");
407  return false;
408  }
409  fr.rec_ds_.header_ = joined;
   // If either side has no payload, the combined range keeps no payload.
410  if (fr.rec_ds_.has_data() && data.has_data()) {
411  fr.rec_ds_.prepend(data);
412  } else {
413  fr.rec_ds_.clear();
414  data.clear();
415  }
416  VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) combined %q-%q with %q-%q on the left\n", sn, fragRange.first, fragRange.second, fr.frag_range_.first, fr.frag_range_.second));
   // Only the range start changes; sample_finder_ is keyed by the range end,
   // so it needs no update here.
417  fr.frag_range_.first = fragRange.first;
418  return true;
419 
420  } else if (fragRange.first < fr.frag_range_.first) {
421  // split and recursively insert both parts
422  VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) splitting %q-%q into %q-%q and %q-%q and recursively inserting both\n", sn, fragRange.first, fragRange.second, fragRange.first, fr.frag_range_.first - 1, fr.frag_range_.first, fragRange.second));
   // get_fragment_range() arguments here are zero-based offsets from
   // fragRange.first, with an inclusive end.
423  ReceivedDataSample front_split = data.get_fragment_range(0, fr.frag_range_.first - fragRange.first - 1);
424  ReceivedDataSample back_split = data.get_fragment_range(fr.frag_range_.first - fragRange.first);
425  data.clear();
426  const bool r1 = insert(FragmentRange(fragRange.first, fr.frag_range_.first - 1), front_split);
427  const bool r2 = insert(FragmentRange(fr.frag_range_.first, fragRange.second), back_split);
428  return r1 || r2; // r1 will likely always be true, but check both
429 
   // NOTE(review): the first comparison below is strict, so a range that begins
   // exactly at fr.frag_range_.second and extends past it matches none of the
   // branches for this element — confirm that is intended.
430  } else if (fragRange.first < fr.frag_range_.second && fr.frag_range_.second < fragRange.second) {
431  // split and recursively insert just the back
432  VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) splitting %q-%q in order to recursively insert %q-%q\n", sn, fragRange.first, fragRange.second, fr.frag_range_.second + 1, fragRange.second));
   // NOTE(review): by the offset convention used in the branch above, the
   // fragment fr.frag_range_.second + 1 sits at offset
   // fr.frag_range_.second + 1 - fragRange.first; the offset passed here is one
   // less, so back_split appears to start one fragment earlier than the range
   // handed to insert() — confirm against get_fragment_range().
433  ReceivedDataSample back_split = data.get_fragment_range(fr.frag_range_.second - fragRange.first);
434  data.clear();
435  return insert(FragmentRange(fr.frag_range_.second + 1, fragRange.second), back_split);
436 
437  } else if (prev == fr.frag_range_.second) {
438  // combine on right of fr
439  if (!fr.rec_ds_.has_data()) {
440  fr.rec_ds_.header_.more_fragments_ = true;
441  }
442  DataSampleHeader joined;
443  if (!DataSampleHeader::join(fr.rec_ds_.header_, data.header_, joined)) {
444  join_err("right");
445  return false;
446  }
447  fr.rec_ds_.header_ = joined;
448  if (fr.rec_ds_.has_data() && data.has_data()) {
449  fr.rec_ds_.append(data);
450  } else {
451  fr.rec_ds_.clear();
452  data.clear();
453  }
454 
455  VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) combined %q-%q with %q-%q on the right, removing and recursingly inserting\n", sn, fragRange.first, fragRange.second, fr.frag_range_.first, fr.frag_range_.second));
456 
   // The combined range is removed and re-inserted so that it can also merge
   // with whatever follows it in the list.
457  FragmentRange range(fr.frag_range_.first, fragRange.second);
459 
460  sample_list_.erase(it);
461  sample_finder_.erase(fit);
462 
463  return insert(range, copy);
464 
465  } else if (fr.frag_range_.first <= fragRange.first && fr.frag_range_.second >= fragRange.second) {
466  VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) duplicate fragment range %q-%q, dropping\n", sn, fragRange.first, fragRange.second));
467  return false;
468  }
   // Keep 'fit' aligned with 'it' for the erase in the combine-right branch.
469  ++fit;
470  }
471 
472  // add to end of list
473  sample_finder_[fragRange.second] = sample_list_.insert(sample_list_.end(), FragSample(fragRange, data));
474  VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) inserting %q-%q at the end of the fragment buffer list\n", sn, fragRange.first, fragRange.second));
475  data.clear();
476  return true;
477 }
478 
479 }
480 }
481 
DataSampleHeader header_
The demarshalled sample header.
void swap(MessageBlock &lhs, MessageBlock &rhs)
#define ACE_DEBUG(X)
ACE_CDR::Long Long
#define ACE_ERROR(X)
FragKey(const GUID_t &pubId, const SequenceNumber &dataSampleSeq)
bool has_data() const
true if at least one Data Block is stored (even if it has 0 useable bytes)
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
ReceivedDataSample get_fragment_range(FragmentNumber start_frag, FragmentNumber end_frag=INVALID_FRAGMENT)
sequence< octet > key
TransportReassembly(const TimeDuration &timeout=TimeDuration(300))
const char * c_str() const
void check_expirations(const MonotonicTimePoint &now)
bool has_frags(const SequenceNumber &seq, const GUID_t &pub_id) const
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
typedef OPENDDS_LIST(ElementType) ExpirationQueue
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
LM_DEBUG
ACE_CDR::ULong ULong
#define VDBG(DBG_ARGS)
void data_unavailable(const FragmentRange &transportSeqDropped)
std::pair< FragmentNumber, FragmentNumber > FragmentRange
Holds a data sample received by the transport.
SequenceNumber::Value FragmentNumber
bool more_fragments_
The current "Data Sample" needs reassembly before further processing.
ACE_UINT32 ULong
static bool join(const DataSampleHeader &first, const DataSampleHeader &second, DataSampleHeader &result)
ACE_TEXT("TCP_Factory")
void clear_completed(const GUID_t &pub_id)
SequenceNumber data_sample_seq_
Sequence number abstraction. Only allows positive 64 bit values.
bool reassemble(const SequenceNumber &transportSeq, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags=0)
DDS::ReturnCode_t copy(DDS::DynamicData_ptr dest, DDS::DynamicData_ptr src)
static GUID_tKeyLessThan compare_
FragSample(const FragmentRange &fragRange, const ReceivedDataSample &data)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
CORBA::ULong get_gaps(const SequenceNumber &msg_seq, const GUID_t &pub_id, CORBA::Long bitmap[], CORBA::ULong length, CORBA::ULong &numBits) const
bool insert(const FragmentRange &fragRange, ReceivedDataSample &data)
static bool fill_bitmap_range(ACE_CDR::ULong low, ACE_CDR::ULong high, ACE_CDR::Long bitmap[], ACE_CDR::ULong length, ACE_CDR::ULong &num_bits, ACE_CDR::ULong &cumulative_bits_added)
Set the bits in range [low, high] in the bitmap, updating num_bits.
void append(ReceivedDataSample &suffix)
Update this ReceivedDataSample's data payload to include the suffix's data payload after any existing...
void prepend(ReceivedDataSample &prefix)
Update this ReceivedDataSample's data payload to include the prefix's data payload before any existin...
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
bool reassemble_i(const FragmentRange &fragRange, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags)