OpenDDS  Snapshot(2023/04/28-20:55)
ReliableSession.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "ReliableSession.h"
9 
10 #include "MulticastDataLink.h"
11 #include "MulticastInst.h"
13 
14 #include "ace/Global_Macros.h"
15 #include "ace/Truncate.h"
16 
17 #include "dds/DCPS/Serializer.h"
18 #include "dds/DCPS/GuidConverter.h"
19 #include "dds/DCPS/TimeTypes.h"
20 
21 #include <cstdlib>
22 
24 
25 namespace OpenDDS {
26 namespace DCPS {
27 
28 namespace {
29  const Encoding::Kind encoding_kind = Encoding::KIND_UNALIGNED_CDR;
30  const Encoding encoding_unaligned_native(encoding_kind);
31 }
32 
34  MulticastDataLink* link,
35  MulticastPeer remote_peer)
36  : MulticastSession(interceptor, link, remote_peer)
37  , nak_watchdog_(make_rch<Sporadic>(TheServiceParticipant->time_source(),
38  interceptor,
39  rchandle_from(this),
40  &ReliableSession::process_naks))
41 {}
42 
44 {
45  nak_watchdog_->cancel();
46 }
47 
48 bool
50 {
51  // Not from the remote peer for this session.
52  if (this->remote_peer_ != header.source_) return false;
53 
54  // Active sessions don't need to track nak_sequence_
55  if (this->active_) return true;
56 
57  // Update last seen sequence for remote peer; return false if we
58  // have already seen this datagram to prevent duplicate delivery
59  // Note: SN 2 is first SN recorded - fill in up to 2 when rcvd
60  return this->nak_sequence_.insert(SequenceRange(
61  header.sequence_ == 2 ? SequenceNumber() : header.sequence_,
62  header.sequence_));
63 }
64 
65 void
67 {
68  // Not from the remote peer for this session.
69  if (this->remote_peer_ != header.source_) return;
70 
71  // Active sessions don't need to track nak_sequence_
72  if (this->active_) return;
73 
74  // Update nak sequence for seen sequence from remote peer
75  this->nak_sequence_.insert(header.sequence_);
77 }
78 
79 
80 bool
82  const ReceivedDataSample& data)
83 {
84  if (!acked()
87  || (!nak_sequence_.empty() && nak_sequence_.low() > 1)
88  || (nak_sequence_.empty() && header.sequence_ > 1)) {
89 
90  if (Transport_debug_level > 5) {
91  LogGuid writer(data.header_.publication_id_);
92  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
93  ACE_TEXT(" tseq: %q data seq: %q from %C being WITHHELD because can't receive yet\n"),
94  header.sequence_.getValue(),
95  data.header_.sequence_.getValue(),
96  writer.c_str()));
97  }
98  {
100 
101  held_.insert(std::pair<const SequenceNumber, ReceivedDataSample>(header.sequence_, data));
102 
103  if (Transport_debug_level > 5) {
104  OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator it = held_.begin();
105  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
106  ACE_TEXT(" held_ data currently contains: %d samples\n"),
107  held_.size()));
108  while (it != held_.end()) {
109  LogGuid writer(it->second.header_.publication_id_);
110  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
111  ACE_TEXT(" held_ data currently contains: tseq: %q dseq: %q from %C HELD\n"),
112  it->first.getValue(),
113  it->second.header_.sequence_.getValue(),
114  writer.c_str()));
115  ++it;
116  }
117  }
118  }
120  return false;
121  } else {
122  if (Transport_debug_level > 5) {
123  LogGuid writer(data.header_.publication_id_);
124  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
125  ACE_TEXT(" tseq: %q data seq: %q from %C OK to deliver\n"),
126  header.sequence_.getValue(),
127  data.header_.sequence_.getValue(),
128  writer.c_str()));
129  }
130  return true;
131  }
132 }
133 
134 void
136 {
137  if (nak_sequence_.empty() || nak_sequence_.low() > 1) return;
138 
141 
142  {
144 
145  typedef OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator iter;
146  const iter end = this->held_.upper_bound(ca);
147  for (iter it = this->held_.begin(); it != end; /*increment in loop body*/) {
148  if (Transport_debug_level > 5) {
149  LogGuid writer(it->second.header_.publication_id_);
150  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::deliver_held_data -")
151  ACE_TEXT(" deliver tseq: %q dseq: %q from %C\n"),
152  it->first.getValue(),
153  it->second.header_.sequence_.getValue(),
154  writer.c_str()));
155  }
156  to_deliver.push_back(it->second);
157  this->held_.erase(it++);
158  }
159  }
160  for (size_t i = 0; i < to_deliver.size(); ++i) {
161  this->link_->data_received(to_deliver.at(i));
162  }
163 }
164 
165 void
167 {
169  if (!held_.empty()) {
170  OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample)::iterator it = held_.begin();
171  while (it != held_.end()) {
172  if (it->second.header_.publication_id_ == remote) {
173  held_.erase(it++);
174  } else {
175  it++;
176  }
177  }
178  }
179 }
180 
181 
182 bool
184  const Message_Block_Ptr& control)
185 {
186  if (MulticastSession::control_received(submessage_id, control)) {
187  return true; // base class handled message
188  }
189 
190  switch (submessage_id) {
191  case MULTICAST_NAK:
192  nak_received(control);
193  break;
194 
195  case MULTICAST_NAKACK:
196  nakack_received(control);
197  break;
198 
199  default:
201  ACE_TEXT("(%P|%t) WARNING: ")
202  ACE_TEXT("ReliableSession::control_received: ")
203  ACE_TEXT("unknown TRANSPORT_CONTROL submessage: 0x%x!\n"),
204  submessage_id));
205  break;
206  }
207  return true;
208 }
209 
210 void
212 {
213  const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges());
214  this->nak_sequence_.reset();
215  this->nak_sequence_.insert(seq);
216 
217  for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
218  iter != ranges.end(); ++iter) {
219  this->nak_sequence_.insert(SequenceRange(iter->first, iter->second));
220  }
221 }
222 
223 void
225 {
226  if (this->nak_requests_.empty()) return; // nothing to expire
227 
228  MulticastInst_rch cfg = link_->config();
229  const TimeDuration timeout = cfg ? cfg->nak_timeout_: TimeDuration::from_msec(MulticastInst::DEFAULT_NAK_TIMEOUT);
230  const MonotonicTimePoint deadline(MonotonicTimePoint::now() - timeout);
231  NakRequestMap::iterator first(this->nak_requests_.begin());
232  NakRequestMap::iterator last(this->nak_requests_.upper_bound(deadline));
233 
234  if (first == last) return; // nothing to expire
235 
236  // Skip unrecoverable datagrams to
237  // re-establish a baseline to detect future reception gaps.
238  SequenceNumber lastSeq = (last == this->nak_requests_.end())
239  ? this->nak_requests_.rbegin()->second
240  : last->second;
241 
242  std::vector<SequenceRange> dropped;
244  lastSeq), dropped)) {
245 
246  for (size_t i = 0; i < dropped.size(); ++i) {
247  const SequenceRange& sr = dropped[i];
248  reassembly_.data_unavailable(FragmentRange(sr.first.getValue(), sr.second.getValue()));
249  }
250 
251  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ReliableSession::expire_naks: ")
252  ACE_TEXT("timed out waiting on remote peer %#08x%08x to send missing samples: %q - %q!\n"),
253  (unsigned int)(this->remote_peer_ >> 32),
254  (unsigned int) this->remote_peer_,
255  this->nak_sequence_.low().getValue(),
256  lastSeq.getValue()));
257  }
258 
259  // Clear expired repair requests:
260  this->nak_requests_.erase(first, last);
262 }
263 
264 void
266 {
267  // Could get data samples before syn control message.
268  // No use nak'ing until syn control message is received and session is acked.
269  if (!this->acked()) {
270  if (DCPS_debug_level > 5) {
271  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
272  ACE_TEXT("remote %#08x%08x session NOT acked yet, don't send naks\n"),
273  (unsigned int)(this->link()->local_peer() >> 32),
274  (unsigned int) this->link()->local_peer(),
275  (unsigned int)(this->remote_peer_ >> 32),
276  (unsigned int) this->remote_peer_));
277  }
278  return;
279  }
280 
281  if (DCPS_debug_level > 5) {
282  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
283  ACE_TEXT("remote %#08x%08x nak request size %d\n"),
284  (unsigned int)(this->link()->local_peer() >> 32),
285  (unsigned int) this->link()->local_peer(),
286  (unsigned int)(this->remote_peer_ >> 32),
287  (unsigned int) this->remote_peer_,
288  this->nak_requests_.size()));
289  }
290 
291  if (!(this->nak_sequence_.low() > 1) && !this->nak_sequence_.disjoint()) {
292  if (DCPS_debug_level > 5) {
293  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
294  ACE_TEXT("remote %#08x%08x nak sequence not disjoint, don't send naks\n"),
295  (unsigned int)(this->link()->local_peer() >> 32),
296  (unsigned int) this->link()->local_peer(),
297  (unsigned int)(this->remote_peer_ >> 32),
298  (unsigned int) this->remote_peer_));
299  }
300 
301  if (DCPS_debug_level > 9) {
302  const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges());
303  for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
304  iter != ranges.end(); ++iter) {
305  ACE_DEBUG((LM_DEBUG, "(%P|%t) ReliableSession::send_naks - local %#08x%08x remote %#08x%08x nak_sequence includes: [%q - %q]\n",
306  (unsigned int)(this->link()->local_peer() >> 32),
307  (unsigned int) this->link()->local_peer(),
308  (unsigned int)(this->remote_peer_ >> 32),
309  (unsigned int) this->remote_peer_,
310  iter->first.getValue(),
311  iter->second.getValue()));
312  }
313  }
314  return; // nothing to send
315  }
316 
317  // Record low-water mark for this interval; this value will
318  // be used to reset the low-water mark in the event the remote
319  // peer becomes unresponsive:
321  if (this->nak_sequence_.low() > 1) {
322  this->nak_requests_[now] = SequenceNumber();
323  } else {
324  this->nak_requests_[now] = this->nak_sequence_.cumulative_ack();
325  }
326 
327  typedef std::vector<SequenceRange> RangeVector;
328  RangeVector ignored;
329 
330  /// The range first - second will be skipped (no naks sent for it).
331  SequenceNumber first;
332  SequenceNumber second;
333 
334  NakRequestMap::reverse_iterator itr(this->nak_requests_.rbegin());
335 
336  if (this->nak_requests_.size() > 1) {
337  // The sequences between rbegin - 1 and rbegin will not be ignored for naking.
338  ++itr;
339 
340  MulticastInst_rch cfg = link_->config();
341  size_t nak_delay_intervals = cfg ? cfg->nak_delay_intervals_ : MulticastInst::DEFAULT_NAK_DELAY_INTERVALS;
342  size_t nak_max = cfg ? cfg->nak_max_ : MulticastInst::DEFAULT_NAK_MAX;
343  size_t sz = nak_requests_.size();
344 
345  // Imagine i is the index of an element in nak_requests_ in reverse order.
346  // The index 0 sequence is the most recent high water mark.
347  // e.g. index: 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
348  // 0 (rbegin) is always skipped because missing samples between the 1 and 0 interval
349  // should always be nak'ed.
350  // if nak_delay_intervals=4, nak_max=3, any sequence between 5 - 1, 10 - 6, 15 - 11
351  // are skipped for naking due to nak_delay_intervals and 20 - 16 are skipped for
352  // naking due to nak_max.
353  for (size_t i = 1; i < sz; ++i) {
354  if ((i * 1.0) / (nak_delay_intervals + 1) > nak_max) {
355  if (first != SequenceNumber()) {
356  first = this->nak_requests_.begin()->second;
357  }
358  else {
359  ignored.push_back(std::make_pair(this->nak_requests_.begin()->second, itr->second));
360  }
361  break;
362  }
363 
364  if (i % (nak_delay_intervals + 1) == 1) {
365  second = itr->second;
366  }
367  if (second != SequenceNumber()) {
368  first = itr->second;
369  }
370 
371  if (i % (nak_delay_intervals + 1) == 0) {
372  first = itr->second;
373 
374  if (first != SequenceNumber() && second != SequenceNumber()) {
375  ignored.push_back(std::make_pair(first, second));
376  first = SequenceNumber();
377  second = SequenceNumber();
378  }
379  }
380 
381  ++itr;
382  }
383 
384  if (first != SequenceNumber() && second != SequenceNumber() && first != second) {
385  ignored.push_back(std::make_pair(first, second));
386  }
387  }
388 
389  // Take a copy to facilitate temporary suppression:
390  DisjointSequence received(this->nak_sequence_);
391  if (DCPS_debug_level > 0) {
392  received.dump();
393  }
394 
395  size_t sz = ignored.size();
396  for (size_t i = 0; i < sz; ++i) {
397 
398  if (ignored[i].second > received.cumulative_ack()) {
399  SequenceNumber high = ignored[i].second;
400  SequenceNumber low = ignored[i].first;
401  if (low < received.cumulative_ack()) {
402  low = received.cumulative_ack();
403  }
404 
405  if (DCPS_debug_level > 5) {
406  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
407  ACE_TEXT("remote %#08x%08x ignore missing [%q - %q]\n"),
408  (unsigned int)(this->link()->local_peer() >> 32),
409  (unsigned int) this->link()->local_peer(),
410  (unsigned int)(this->remote_peer_ >> 32),
411  (unsigned int) this->remote_peer_,
412  low.getValue(), high.getValue()));
413  }
414 
415  // Make contiguous between ignored sequences.
416  received.insert(SequenceRange(low, high));
417  }
418  }
419 
420  for (NakPeerSet::iterator it(this->nak_peers_.begin());
421  it != this->nak_peers_.end(); ++it) {
422  // Update sequence to temporarily suppress repair requests for
423  // ranges already requested by other peers for this interval:
424  received.insert(*it);
425  }
426  bool sending_naks = false;
427  if (received.low() > 1){
428  //Special case: nak from beginning to make sure no missing sequence
429  //number below the first received
430  sending_naks = true;
431  std::vector<SequenceRange> ranges;
432  ranges.push_back(SequenceRange(SequenceNumber(), received.low()));
433 
434  CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size());
435 
436  size_t len = sizeof(this->remote_peer_)
437  + sizeof(size)
438  + size * 2 * sizeof(SequenceNumber);
439 
440  Message_Block_Ptr data(new ACE_Message_Block(len));
441 
442  Serializer serializer(data.get(), encoding_unaligned_native);
443 
444  serializer << this->remote_peer_;
445  serializer << size;
446  for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
447  iter != ranges.end(); ++iter) {
448  serializer << iter->first;
449  serializer << iter->second;
450  if (DCPS_debug_level > 0) {
451  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks ")
452  ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
453  (unsigned int)(this->link()->local_peer() >> 32),
454  (unsigned int) this->link()->local_peer(),
455  (unsigned int)(this->remote_peer_ >> 32),
456  (unsigned int) this->remote_peer_,
457  iter->first.getValue(), iter->second.getValue()));
458  }
459  }
460  // Send control sample to remote peer:
462  }
463  if (received.disjoint()) {
464  sending_naks = true;
465  send_naks(received);
466  }
467 
468  if (!sending_naks && DCPS_debug_level > 5){
469  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
470  ACE_TEXT("remote %#08x%08x received sequence not disjoint, don't send naks\n"),
471  (unsigned int)(this->link()->local_peer() >> 32),
472  (unsigned int) this->link()->local_peer(),
473  (unsigned int)(this->remote_peer_ >> 32),
474  (unsigned int) this->remote_peer_));
475  }
476 
477  // Clear peer repair requests:
478  this->nak_peers_.clear();
479 }
480 
481 void
483 {
484  if (!this->active_) return; // sub send naks, then doesn't receive them.
485 
486  const TransportHeader& header =
488 
489  Serializer serializer(control.get(), encoding_kind, header.swap_bytes());
490 
491  MulticastPeer local_peer;
492  CORBA::ULong size = 0;
493  serializer >> local_peer; // sent as remote_peer
494  serializer >> size;
495 
496  std::vector<SequenceRange> ranges;
497 
498  for (CORBA::ULong i = 0; i < size; ++i) {
499  SequenceRange range;
500  serializer >> range.first;
501  serializer >> range.second;
502  ranges.push_back(range);
503  }
504 
505  // Ignore sample if not destined for us:
506  if ((local_peer != this->link_->local_peer()) // Not to us.
507  || (this->remote_peer_ != header.source_)) return; // Not from the remote peer for this session.
508 
509  SingleSendBuffer* send_buffer = this->link_->send_buffer();
510  // Broadcast a MULTICAST_NAKACK control sample before resending to suppress
511  // repair requests for unrecoverable samples by providing a
512  // new low-water mark for affected peers:
514  {
515  const SingleSendBuffer::Proxy proxy(*send_buffer);
516  if (!proxy.empty() && proxy.low() > ranges.begin()->first) {
519  ACE_TEXT ("(%P|%t) ReliableSession::nak_received")
520  ACE_TEXT (" local %#08x%08x remote %#08x%08x sending nakack for lowest available: %q\n"),
521  (unsigned int)(this->link()->local_peer() >> 32),
522  (unsigned int) this->link()->local_peer(),
523  (unsigned int)(this->remote_peer_ >> 32),
524  (unsigned int) this->remote_peer_,
525  proxy.low().getValue()));
526  }
527  sn = proxy.low();
528  }
529  }
531  send_nakack(sn);
532  }
533 
534  for (CORBA::ULong i = 0; i < size; ++i) {
535  bool ret = send_buffer->resend(ranges[i]);
538  ACE_TEXT ("(%P|%t) ReliableSession::nak_received")
539  ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q] resend result %C\n"),
540  (unsigned int)(this->link()->local_peer() >> 32),
541  (unsigned int) this->link()->local_peer(),
542  (unsigned int)(this->remote_peer_ >> 32),
543  (unsigned int) this->remote_peer_,
544  ranges[i].first.getValue(), ranges[i].second.getValue(),
545  ret ? "SUCCESS" : "FAILED"));
546  }
547  }
548 }
549 
550 void
552 {
553  const std::vector<SequenceRange> ranges(received.missing_sequence_ranges());
554 
555  CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size());
556 
557  size_t len = sizeof(this->remote_peer_)
558  + sizeof(size)
559  + size * 2 * sizeof(SequenceNumber);
560 
561  Message_Block_Ptr data(new ACE_Message_Block(len));
562 
563  Serializer serializer(data.get(), encoding_unaligned_native);
564 
565  serializer << this->remote_peer_;
566  serializer << size;
567  for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
568  iter != ranges.end(); ++iter) {
569  serializer << iter->first;
570  serializer << iter->second;
572  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks (Disjoint) ")
573  ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
574  (unsigned int)(this->link()->local_peer() >> 32),
575  (unsigned int) this->link()->local_peer(),
576  (unsigned int)(this->remote_peer_ >> 32),
577  (unsigned int) this->remote_peer_,
578  iter->first.getValue(), iter->second.getValue()));
579  }
580  }
581  // Send control sample to remote peer:
583 }
584 
585 
586 void
588 {
589  if (this->active_) return; // pub send nakack, doesn't receive them.
590 
591  const TransportHeader& header =
593 
594  // Not from the remote peer for this session.
595  if (this->remote_peer_ != header.source_) return;
596 
597  Serializer serializer(control.get(), encoding_kind, header.swap_bytes());
598 
599  SequenceNumber low;
600  serializer >> low;
601 
602  // MULTICAST_NAKACK control samples indicate data which cannot be
603  // repaired by a remote peer; if any values were needed below
604  // this value, then the sequence needs to be shifted:
605  std::vector<SequenceRange> dropped;
606  SequenceNumber range_low = SequenceNumber();
607  SequenceNumber range_high = low == SequenceNumber() ? SequenceNumber() : low.previous();
608 
609  if (range_low == SequenceNumber() && range_high == SequenceNumber()) {
610 
611  this->nak_sequence_.insert(range_low);
612 
613  } else if (this->nak_sequence_.insert(SequenceRange(range_low, range_high), dropped)) {
614 
615  for (size_t i = 0; i < dropped.size(); ++i) {
616  const SequenceRange& sr = dropped[i];
617  reassembly_.data_unavailable(FragmentRange(sr.first.getValue(), sr.second.getValue()));
618  }
619 
620  if (DCPS_debug_level > 0) {
622  ACE_TEXT("(%P|%t) WARNING: ReliableSession::nakack_received ")
623  ACE_TEXT("local %#08x%08x remote %#08x%08x with low [%q] ")
624  ACE_TEXT("- some ranges dropped.\n"),
625  (unsigned int)(this->link()->local_peer() >> 32),
626  (unsigned int) this->link()->local_peer(),
627  (unsigned int)(this->remote_peer_ >> 32),
628  (unsigned int) this->remote_peer_,
629  low.getValue()));
630  }
631  }
633 }
634 
635 void
637 {
638  size_t len = sizeof(low.getValue());
639 
640  Message_Block_Ptr data(new ACE_Message_Block(len));
641 
642  Serializer serializer(data.get(), encoding_unaligned_native);
643 
644  serializer << low;
645  // Broadcast control sample to all peers:
647 }
648 
649 bool
650 ReliableSession::start(bool active, bool acked)
651 {
652  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->start_lock_, false);
653 
654  if (this->started_) {
655  return true; // already started
656  }
657 
658  this->active_ = active;
659  {
660  //can't call accept_datalink while holding lock due to possible reactor deadlock with passive_connection
661  ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, this->reverse_start_lock_, false);
662 
663  // A watchdog timer is scheduled to periodically check for gaps in
664  // received data. If a gap is discovered, MULTICAST_NAK control
665  // samples will be sent to initiate repairs.
666  // Only subscriber send naks so just schedule for sub role.
667  if (!active) {
668  if (acked) {
669  this->set_acked();
670  }
671  this->nak_watchdog_->schedule(nak_delay());
672  }
673  } //Reacquire start_lock_ after releasing unlock_guard with release_start_lock_
674 
675  return this->started_ = true;
676 }
677 
678 void
680 {
682  this->nak_watchdog_->cancel();
683 }
684 
687 {
688  MulticastInst_rch cfg = link_->config();
689  TimeDuration interval = cfg ? cfg->nak_interval_ : TimeDuration::from_msec(MulticastInst::DEFAULT_NAK_INTERVAL);
690 
691  // Apply random backoff to minimize potential collisions:
692  interval *= static_cast<double>(std::rand()) /
693  static_cast<double>(RAND_MAX) + 1.0;
694 
695  return interval;
696 }
697 
698 void
700 {
701  // Expire outstanding repair requests that have not yet been
702  // fulfilled; this prevents NAK implosions due to remote
703  // peers becoming unresponsive:
704  expire_naks();
705 
706  // Initiate repairs by sending MULTICAST_NAK control samples
707  // to remote peers from which we are missing data:
708  send_naks();
709 
710  nak_watchdog_->schedule(nak_delay());
711 }
712 
713 } // namespace DCPS
714 } // namespace OpenDDS
715 
DataSampleHeader header_
The demarshalled sample header.
virtual bool control_received(char submessage_id, const Message_Block_Ptr &control)
#define ACE_DEBUG(X)
ACE_INT64 MulticastPeer
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
SequenceNumber cumulative_ack() const
virtual bool control_received(char submessage_id, const Message_Block_Ptr &control)
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690
RcHandle< Sporadic > nak_watchdog_
bool swap_bytes() const
Determine if the serializer should swap bytes.
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
const char * c_str() const
static const long DEFAULT_NAK_TIMEOUT
Definition: MulticastInst.h:34
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
void nak_received(const Message_Block_Ptr &control)
void nakack_received(const Message_Block_Ptr &control)
OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample) held_
static const long DEFAULT_NAK_INTERVAL
Definition: MulticastInst.h:31
virtual void syn_hook(const SequenceNumber &seq)
static const long DEFAULT_NAK_DELAY_INTERVALS
Definition: MulticastInst.h:32
virtual void record_header_received(const TransportHeader &header)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
LM_DEBUG
TO truncate_cast(FROM val)
virtual void send_nakack(SequenceNumber low)
ACE_CDR::ULong ULong
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
ReliableSession(RcHandle< ReactorInterceptor > interceptor, MulticastDataLink *link, MulticastPeer remote_peer)
static TimeDuration from_msec(const ACE_UINT64 &ms)
void data_unavailable(const FragmentRange &transportSeqDropped)
void send_control(char submessage_id, Message_Block_Ptr data)
std::pair< FragmentNumber, FragmentNumber > FragmentRange
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
Holds a data sample received by the transport.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual bool ready_to_deliver(const TransportHeader &header, const ReceivedDataSample &data)
virtual bool check_header(const TransportHeader &header)
Defines class that represents a transport packet header.
virtual bool start(bool active, bool acked)
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)
LM_WARNING
ACE_TEXT("TCP_Factory")
std::pair< SequenceNumber, SequenceNumber > SequenceRange
virtual void release_remote(const GUID_t &remote)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
Sequence number abstraction. Only allows positive 64 bit values.
bool resend(const SequenceRange &range, DisjointSequence *gaps=0)
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void process_naks(const MonotonicTimePoint &)
static const long DEFAULT_NAK_MAX
Definition: MulticastInst.h:33
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
#define TheServiceParticipant
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
MulticastReceiveStrategy * receive_strategy()