Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 : #include "TransportReassembly.h"
10 : #include "TransportDebug.h"
11 :
12 : #include "dds/DCPS/GuidConverter.h"
13 : #include "dds/DCPS/DisjointSequence.h"
14 :
15 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
16 :
17 : namespace OpenDDS {
18 : namespace DCPS {
19 :
20 733 : FragKey::FragKey(const GUID_t& pubId,
21 733 : const SequenceNumber& dataSampleSeq)
22 733 : : publication_(pubId)
23 733 : , data_sample_seq_(dataSampleSeq)
24 : {
25 733 : }
26 :
27 : GUID_tKeyLessThan FragKey::compare_;
28 :
29 433 : TransportReassembly::FragSample::FragSample(const FragmentRange& fragRange,
30 433 : const ReceivedDataSample& data)
31 433 : : frag_range_(fragRange)
32 433 : , rec_ds_(data)
33 : {
34 433 : }
35 :
36 152 : TransportReassembly::TransportReassembly(const TimeDuration& timeout)
37 152 : : timeout_(timeout)
38 : {
39 152 : }
40 :
41 : bool
42 3 : TransportReassembly::has_frags(const SequenceNumber& seq,
43 : const GUID_t& pub_id,
44 : ACE_UINT32& total_frags) const
45 : {
46 3 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
47 3 : const FragInfoMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq));
48 3 : if (iter != fragments_.end()) {
49 2 : total_frags = iter->second.total_frags_;
50 2 : return true;
51 : }
52 1 : return false;
53 3 : }
54 :
55 : void
56 1 : TransportReassembly::clear_completed(const GUID_t& pub_id)
57 : {
58 1 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
59 1 : completed_.erase(pub_id);
60 1 : }
61 :
62 : CORBA::ULong
63 14 : TransportReassembly::get_gaps(const SequenceNumber& seq, const GUID_t& pub_id,
64 : CORBA::Long bitmap[], CORBA::ULong length,
65 : CORBA::ULong& numBits) const
66 : {
67 14 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
68 : // length is number of (allocated) words in bitmap, max of 8
69 : // numBits is number of valid bits in the bitmap, <= length * 32, to account for partial words
70 14 : if (length == 0) {
71 0 : return 0;
72 : }
73 :
74 14 : const FragInfoMap::const_iterator iter = fragments_.find(FragKey(pub_id, seq));
75 14 : if (iter == fragments_.end()) {
76 : // Nothing missing
77 3 : return 0;
78 : }
79 :
80 : // RTPS's FragmentNumbers are 32-bit values, so we'll only be using the
81 : // low 32 bits of the 64-bit generalized sequence numbers in
82 : // FragSample::frag_range_.
83 :
84 11 : const OPENDDS_LIST(FragSample)& flist = iter->second.sample_list_;
85 11 : const FragmentNumber first = flist.front().frag_range_.first;
86 : const CORBA::ULong base = static_cast<CORBA::ULong>((first == 1)
87 10 : ? flist.front().frag_range_.second + 1
88 21 : : 1);
89 :
90 11 : if (first != 1) {
91 : // Represent the "gap" before the first list element.
92 : // base == 1 and the first 2 args to fill_bitmap_range() are deltas of base
93 1 : ACE_CDR::ULong bits_added = 0;
94 1 : DisjointSequence::fill_bitmap_range(0, static_cast<CORBA::ULong>(first - 2),
95 : bitmap, length, numBits, bits_added);
96 10 : } else if (flist.size() == 1) {
97 : // No gaps, but we know there are (at least 1) more_fragments
98 3 : if (iter->second.total_frags_ == 0) {
99 3 : ACE_CDR::ULong bits_added = 0;
100 3 : DisjointSequence::fill_bitmap_range(0, 0, bitmap, length, numBits, bits_added);
101 : } else {
102 0 : const size_t rlimit = static_cast<size_t>(flist.back().frag_range_.second - 1);
103 0 : const CORBA::ULong ulimit = static_cast<CORBA::ULong>(iter->second.total_frags_ - (base < rlimit ? rlimit : base));
104 0 : ACE_CDR::ULong bits_added = 0;
105 0 : DisjointSequence::fill_bitmap_range(0,
106 : ulimit,
107 : bitmap, length, numBits, bits_added);
108 : }
109 : // NOTE: this could send a nack for fragments that are in flight
110 : // need to defer setting bitmap till heartbeat extending logic
111 : // in RtpsUdpDataLink::generate_nack_frags
112 3 : return base;
113 : }
114 :
115 : typedef OPENDDS_LIST(FragSample)::const_iterator list_iterator;
116 16 : for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
117 16 : const list_iterator it_next = ++list_iterator(it);
118 16 : if (it_next == flist.end()) {
119 8 : break;
120 : }
121 8 : const CORBA::ULong low = static_cast<CORBA::ULong>(it->frag_range_.second + 1 - base),
122 8 : high = static_cast<CORBA::ULong>(it_next->frag_range_.first - 1 - base);
123 8 : ACE_CDR::ULong bits_added = 0;
124 8 : DisjointSequence::fill_bitmap_range(low, high, bitmap, length, numBits, bits_added);
125 : }
126 :
127 8 : return base;
128 14 : }
129 :
130 : bool
131 629 : TransportReassembly::reassemble(const FragmentRange& fragRange,
132 : ReceivedDataSample& data,
133 : ACE_UINT32 total_frags)
134 : {
135 629 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
136 1258 : return reassemble_i(fragRange, fragRange.first == 1, data, total_frags);
137 629 : }
138 :
139 : bool
140 87 : TransportReassembly::reassemble(const SequenceNumber& transportSeq,
141 : bool firstFrag,
142 : ReceivedDataSample& data,
143 : ACE_UINT32 total_frags)
144 : {
145 87 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
146 87 : return reassemble_i(FragmentRange(transportSeq.getValue(), transportSeq.getValue()),
147 174 : firstFrag, data, total_frags);
148 87 : }
149 :
150 : bool
151 716 : TransportReassembly::reassemble_i(const FragmentRange& fragRange,
152 : bool firstFrag,
153 : ReceivedDataSample& data,
154 : ACE_UINT32 total_frags)
155 : {
156 716 : if (Transport_debug_level > 5) {
157 0 : LogGuid logger(data.header_.publication_id_);
158 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::reassemble_i: "
159 : "tseq %q-%q first %d dseq %q pub %C\n", fragRange.first,
160 : fragRange.second, firstFrag ? 1 : 0,
161 : data.header_.sequence_.getValue(), logger.c_str()));
162 0 : }
163 :
164 716 : const MonotonicTimePoint now = MonotonicTimePoint::now();
165 716 : check_expirations(now);
166 :
167 716 : const FragKey key(data.header_.publication_id_, data.header_.sequence_);
168 716 : FragInfoMap::iterator iter = fragments_.find(key);
169 716 : const MonotonicTimePoint expiration = now + timeout_;
170 :
171 716 : if (iter == fragments_.end()) {
172 153 : FragInfo& finfo = fragments_[key];
173 153 : finfo = FragInfo(firstFrag, FragInfo::FragSampleList(), total_frags, expiration);
174 153 : finfo.insert(fragRange, data);
175 153 : expiration_queue_.push_back(std::make_pair(expiration, key));
176 153 : data.clear();
177 : // since this is the first fragment we've seen, it can't possibly be done
178 153 : if (Transport_debug_level > 5 || transport_debug.log_fragment_storage) {
179 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::reassemble_i: "
180 : "stored first frag, returning false (incomplete) with %B fragments\n",
181 : fragments_.size()));
182 : }
183 153 : return false;
184 : } else {
185 563 : const CompletedMap::const_iterator citer = completed_.find(key.publication_);
186 563 : if (citer != completed_.end() && citer->second.contains(key.data_sample_seq_)) {
187 : // already completed, not storing or delivering this message
188 3 : return false;
189 : }
190 560 : if (firstFrag) {
191 128 : iter->second.have_first_ = true;
192 : }
193 560 : if (iter->second.total_frags_ < total_frags) {
194 0 : iter->second.total_frags_ = total_frags;
195 : }
196 560 : iter->second.expiration_ = expiration;
197 : }
198 :
199 560 : if (!iter->second.insert(fragRange, data)) {
200 : // error condition, already logged by insert()
201 15 : return false;
202 : }
203 :
204 : // We can deliver data if all three of these conditions are met:
205 : // 1. we've seen the "first fragment" flag [first frag is here]
206 : // 2. all fragments have been coalesced [no gaps in the seq numbers]
207 : // 3. the "more fragments" flag is not set [last frag is here]
208 545 : if (iter->second.have_first_
209 394 : && iter->second.sample_list_.size() == 1
210 939 : && !iter->second.sample_list_.front().rec_ds_.header_.more_fragments_) {
211 145 : std::swap(data, iter->second.sample_list_.front().rec_ds_);
212 145 : fragments_.erase(iter);
213 145 : completed_[key.publication_].insert(key.data_sample_seq_);
214 145 : if (Transport_debug_level > 5 || transport_debug.log_fragment_storage) {
215 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::reassemble_i: "
216 : "removed frag, returning %C with %B fragments\n",
217 : data.has_data() ? "true (complete)" : "false (incomplete)", fragments_.size()));
218 : }
219 145 : return data.has_data(); // could be false if we had data_unavailable()
220 : }
221 :
222 400 : VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::reassemble_i: "
223 : "returning false (incomplete)\n"));
224 400 : return false;
225 716 : }
226 :
227 : void
228 3 : TransportReassembly::data_unavailable(const FragmentRange& dropped)
229 : {
230 3 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
231 3 : VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::data_unavailable(): "
232 : "dropped %q-%q\n", dropped.first, dropped.second));
233 : typedef OPENDDS_LIST(FragSample)::iterator list_iterator;
234 :
235 6 : for (FragInfoMap::iterator iter = fragments_.begin(); iter != fragments_.end();
236 3 : ++iter) {
237 3 : const FragKey& key = iter->first;
238 3 : FragInfo& finfo = iter->second;
239 3 : FragInfo::FragSampleList& flist = finfo.sample_list_;
240 :
241 3 : ReceivedDataSample dummy;
242 3 : dummy.header_.sequence_ = key.data_sample_seq_;
243 :
244 : // check if we should expand the front element (only if !have_first)
245 3 : const FragmentNumber prev = flist.front().frag_range_.first - 1;
246 3 : if (dropped.second == prev && !finfo.have_first_) {
247 1 : finfo.have_first_ = true;
248 1 : dummy.header_.more_fragments_ = true;
249 1 : finfo.insert(dropped, dummy);
250 1 : continue;
251 : }
252 :
253 : // find a gap between list elements where "dropped" fits
254 2 : for (list_iterator it = flist.begin(); it != flist.end(); ++it) {
255 2 : list_iterator it_next = it;
256 2 : ++it_next;
257 2 : if (it_next == flist.end()) {
258 2 : break;
259 : }
260 0 : FragSample& fr1 = *it;
261 0 : FragSample& fr2 = *it_next;
262 0 : if (dropped.first > fr1.frag_range_.second
263 0 : && dropped.second < fr2.frag_range_.first) {
264 0 : dummy.header_.more_fragments_ = true;
265 0 : finfo.insert(dropped, dummy);
266 0 : break;
267 : }
268 : }
269 :
270 : // check if we should expand the last element
271 2 : const FragmentNumber next = flist.back().frag_range_.second + 1;
272 2 : if (dropped.first == next) {
273 2 : flist.back().rec_ds_.header_.more_fragments_ = true;
274 2 : finfo.insert(dropped, dummy);
275 : }
276 3 : }
277 3 : }
278 :
279 : void
280 0 : TransportReassembly::data_unavailable(const SequenceNumber& dataSampleSeq,
281 : const GUID_t& pub_id)
282 : {
283 0 : ACE_Guard<ACE_Thread_Mutex> guard(mutex_);
284 0 : if (fragments_.erase(FragKey(pub_id, dataSampleSeq)) &&
285 0 : (Transport_debug_level > 5 || transport_debug.log_fragment_storage)) {
286 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::data_unavailable: "
287 : "removed leaving %B fragments\n", fragments_.size()));
288 : }
289 0 : }
290 :
291 716 : void TransportReassembly::check_expirations(const MonotonicTimePoint& now)
292 : {
293 716 : while (!expiration_queue_.empty() && expiration_queue_.front().first <= now) {
294 0 : FragInfoMap::iterator iter = fragments_.find(expiration_queue_.front().second);
295 0 : if (iter != fragments_.end()) {
296 : // FragInfo::expiration_ may have changed after insertion into expiration_queue_
297 0 : if (iter->second.expiration_ <= now) {
298 0 : fragments_.erase(iter);
299 0 : if (Transport_debug_level > 5 || transport_debug.log_fragment_storage) {
300 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportReassembly::check_expirations: "
301 : "purge expired leaving %B fragments\n", fragments_.size()));
302 : }
303 : } else {
304 0 : expiration_queue_.push_back(std::make_pair(iter->second.expiration_, iter->first));
305 : }
306 : }
307 0 : expiration_queue_.pop_front();
308 : }
309 716 : }
310 :
311 153 : TransportReassembly::FragInfo::FragInfo()
312 153 : : have_first_(false)
313 153 : , total_frags_(0)
314 153 : {}
315 :
316 153 : TransportReassembly::FragInfo::FragInfo(bool hf, const FragSampleList& rl, ACE_UINT32 tf, const MonotonicTimePoint& expiration)
317 153 : : have_first_(hf)
318 153 : , sample_list_(rl)
319 153 : , total_frags_(tf)
320 153 : , expiration_(expiration)
321 : {
322 153 : for (FragSampleList::iterator it = sample_list_.begin(), prev = it; it != sample_list_.end(); ++it) {
323 0 : sample_finder_[it->frag_range_.second] = it;
324 0 : if (it != sample_list_.begin()) {
325 0 : gap_list_.push_back(FragmentRange(prev->frag_range_.second + 1, it->frag_range_.first - 1));
326 : }
327 0 : prev = it;
328 : }
329 153 : for (FragGapList::iterator it = gap_list_.begin(); it != gap_list_.end(); ++it) {
330 0 : gap_finder_[it->second] = it;
331 : }
332 153 : }
333 :
334 0 : TransportReassembly::FragInfo::FragInfo(const FragInfo& val)
335 : {
336 0 : *this = val;
337 0 : }
338 :
339 : TransportReassembly::FragInfo&
340 153 : TransportReassembly::FragInfo::operator=(const FragInfo& rhs)
341 : {
342 153 : if (this != &rhs) {
343 153 : have_first_ = rhs.have_first_;
344 153 : sample_list_ = rhs.sample_list_;
345 153 : gap_list_ = rhs.gap_list_;
346 153 : total_frags_ = rhs.total_frags_;
347 153 : expiration_ = rhs.expiration_;
348 153 : sample_finder_.clear();
349 153 : gap_finder_.clear();
350 153 : for (FragSampleList::iterator it = sample_list_.begin(); it != sample_list_.end(); ++it) {
351 0 : sample_finder_[it->frag_range_.second] = it;
352 : }
353 153 : for (FragGapList::iterator it = gap_list_.begin(); it != gap_list_.end(); ++it) {
354 0 : gap_finder_[it->second] = it;
355 : }
356 : }
357 153 : return *this;
358 : }
359 :
360 : namespace {
361 0 : inline void join_err(const char* detail)
362 : {
363 0 : ACE_ERROR((LM_ERROR,
364 : ACE_TEXT("(%P|%t) ERROR: TransportReassembly::FragInfo::insert: ")
365 : ACE_TEXT("DataSampleHeaders could not be joined: %C\n"), detail));
366 0 : }
367 : }
368 :
369 : bool
370 1002 : TransportReassembly::FragInfo::insert(const FragmentRange& fragRange, ReceivedDataSample& data)
371 : {
372 1002 : const FragmentNumber prev = fragRange.first - 1, next = fragRange.second + 1;
373 :
374 1002 : FragSampleList::iterator start = sample_list_.begin();
375 1002 : FragSampleListIterMap::iterator fit = sample_finder_.begin();
376 1002 : if (!sample_list_.empty()) {
377 756 : fit = sample_finder_.lower_bound(prev);
378 756 : if (fit != sample_finder_.end()) {
379 662 : start = fit->second;
380 662 : if (start->frag_range_.second != prev && start != sample_list_.begin()) {
381 44 : --start;
382 44 : --fit;
383 : }
384 : } else {
385 94 : start = sample_list_.end();
386 94 : --start;
387 94 : --fit;
388 : }
389 : }
390 :
391 1002 : const SequenceNumber::Value sn = data.header_.sequence_.getValue();
392 :
393 1140 : for (FragSampleList::iterator it = start; it != sample_list_.end(); ++it) {
394 800 : FragSample& fr = *it;
395 800 : if (next < fr.frag_range_.first) {
396 : // insert before 'it'
397 93 : sample_finder_[fragRange.second] = sample_list_.insert(it, FragSample(fragRange, data));
398 93 : data.clear();
399 93 : VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) inserted %q-%q on the left of %q-%q\n", sn, fragRange.first, fragRange.second, fr.frag_range_.first, fr.frag_range_.second));
400 662 : return true;
401 :
402 707 : } else if (next == fr.frag_range_.first) {
403 : // combine on left of fr
404 270 : DataSampleHeader joined;
405 270 : if (!DataSampleHeader::join(data.header_, fr.rec_ds_.header_, joined)) {
406 0 : join_err("left");
407 0 : return false;
408 : }
409 270 : fr.rec_ds_.header_ = joined;
410 270 : if (fr.rec_ds_.has_data() && data.has_data()) {
411 261 : fr.rec_ds_.prepend(data);
412 : } else {
413 9 : fr.rec_ds_.clear();
414 9 : data.clear();
415 : }
416 270 : VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) combined %q-%q with %q-%q on the left\n", sn, fragRange.first, fragRange.second, fr.frag_range_.first, fr.frag_range_.second));
417 270 : fr.frag_range_.first = fragRange.first;
418 270 : return true;
419 :
420 707 : } else if (fragRange.first < fr.frag_range_.first) {
421 : // split and recursively insert both parts
422 3 : VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) splitting %q-%q into %q-%q and %q-%q and recursively inserting both\n", sn, fragRange.first, fragRange.second, fragRange.first, fr.frag_range_.first - 1, fr.frag_range_.first, fragRange.second));
423 3 : ReceivedDataSample front_split = data.get_fragment_range(0, fr.frag_range_.first - fragRange.first - 1);
424 3 : ReceivedDataSample back_split = data.get_fragment_range(fr.frag_range_.first - fragRange.first);
425 3 : data.clear();
426 3 : const bool r1 = insert(FragmentRange(fragRange.first, fr.frag_range_.first - 1), front_split);
427 3 : const bool r2 = insert(FragmentRange(fr.frag_range_.first, fragRange.second), back_split);
428 3 : return r1 || r2; // r1 will likely always be true, but check both
429 :
430 437 : } else if (fragRange.first < fr.frag_range_.second && fr.frag_range_.second < fragRange.second) {
431 : // split and recursively insert just the back
432 3 : VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) splitting %q-%q in order to recursively insert %q-%q\n", sn, fragRange.first, fragRange.second, fr.frag_range_.second + 1, fragRange.second));
433 3 : ReceivedDataSample back_split = data.get_fragment_range(fr.frag_range_.second - fragRange.first);
434 3 : data.clear();
435 3 : return insert(FragmentRange(fr.frag_range_.second + 1, fragRange.second), back_split);
436 :
437 434 : } else if (prev == fr.frag_range_.second) {
438 : // combine on right of fr
439 277 : if (!fr.rec_ds_.has_data()) {
440 4 : fr.rec_ds_.header_.more_fragments_ = true;
441 : }
442 277 : DataSampleHeader joined;
443 277 : if (!DataSampleHeader::join(fr.rec_ds_.header_, data.header_, joined)) {
444 0 : join_err("right");
445 0 : return false;
446 : }
447 277 : fr.rec_ds_.header_ = joined;
448 277 : if (fr.rec_ds_.has_data() && data.has_data()) {
449 269 : fr.rec_ds_.append(data);
450 : } else {
451 8 : fr.rec_ds_.clear();
452 8 : data.clear();
453 : }
454 :
455 277 : VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) combined %q-%q with %q-%q on the right, removing and recursingly inserting\n", sn, fragRange.first, fragRange.second, fr.frag_range_.first, fr.frag_range_.second));
456 :
457 277 : FragmentRange range(fr.frag_range_.first, fragRange.second);
458 277 : ReceivedDataSample copy(fr.rec_ds_);
459 :
460 277 : sample_list_.erase(it);
461 277 : sample_finder_.erase(fit);
462 :
463 277 : return insert(range, copy);
464 :
465 431 : } else if (fr.frag_range_.first <= fragRange.first && fr.frag_range_.second >= fragRange.second) {
466 16 : VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) duplicate fragment range %q-%q, dropping\n", sn, fragRange.first, fragRange.second));
467 16 : return false;
468 : }
469 138 : ++fit;
470 : }
471 :
472 : // add to end of list
473 340 : sample_finder_[fragRange.second] = sample_list_.insert(sample_list_.end(), FragSample(fragRange, data));
474 340 : VDBG((LM_DEBUG, "(%P|%t) TransportReassembly::insert: (SN: %q) inserting %q-%q at the end of the fragment buffer list\n", sn, fragRange.first, fragRange.second));
475 340 : data.clear();
476 340 : return true;
477 : }
478 :
479 : }
480 : }
481 :
482 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|