OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::ReliableSession Class Reference

#include <ReliableSession.h>

Inheritance diagram for OpenDDS::DCPS::ReliableSession:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ReliableSession:
Collaboration graph
[legend]

Public Member Functions

 ReliableSession (RcHandle< ReactorInterceptor > interceptor, MulticastDataLink *link, MulticastPeer remote_peer)
 
 ~ReliableSession ()
 
virtual bool check_header (const TransportHeader &header)
 
virtual void record_header_received (const TransportHeader &header)
 
virtual bool ready_to_deliver (const TransportHeader &header, const ReceivedDataSample &data)
 
void deliver_held_data ()
 
virtual void release_remote (const GUID_t &remote)
 
virtual bool control_received (char submessage_id, const Message_Block_Ptr &control)
 
void expire_naks ()
 
void send_naks ()
 
void nak_received (const Message_Block_Ptr &control)
 
void send_naks (DisjointSequence &found)
 
void nakack_received (const Message_Block_Ptr &control)
 
virtual void send_nakack (SequenceNumber low)
 
virtual bool start (bool active, bool acked)
 
virtual void stop ()
 
virtual bool is_reliable ()
 
virtual void syn_hook (const SequenceNumber &seq)
 
- Public Member Functions inherited from OpenDDS::DCPS::MulticastSession
virtual ~MulticastSession ()
 
MulticastDataLink * link ()
 
MulticastPeer remote_peer () const
 
bool acked ()
 
void set_acked ()
 
void syn_received (const Message_Block_Ptr &control)
 
void send_all_syn (const MonotonicTimePoint &now)
 
void send_syn (const GUID_t &local_writer, const GUID_t &remote_reader)
 
void synack_received (const Message_Block_Ptr &control)
 
void send_synack (const GUID_t &local_reader, const GUID_t &remote_writer)
 
bool reassemble (ReceivedDataSample &data, const TransportHeader &header)
 
void add_remote (const GUID_t &local)
 
void add_remote (const GUID_t &local, const GUID_t &remote)
 
void remove_remote (const GUID_t &local, const GUID_t &remote)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Private Types

typedef PmfSporadicTask< ReliableSession > Sporadic
 
typedef SequenceNumber TransportHeaderSN
 

Private Member Functions

TimeDuration nak_delay ()
 
void process_naks (const MonotonicTimePoint &)
 
typedef OPENDDS_MAP (MonotonicTimePoint, SequenceNumber) NakRequestMap
 
 OPENDDS_MULTIMAP (TransportHeaderSN, ReceivedDataSample) held_
 
typedef OPENDDS_SET (SequenceRange) NakPeerSet
 

Private Attributes

RcHandle< Sporadic > nak_watchdog_
 
DisjointSequence nak_sequence_
 
NakRequestMap nak_requests_
 
ACE_Thread_Mutex held_lock_
 
NakPeerSet nak_peers_
 

Additional Inherited Members

- Protected Types inherited from OpenDDS::DCPS::MulticastSession
typedef ACE_Reverse_Lock< ACE_Thread_Mutex > Reverse_Lock_t
 
- Protected Member Functions inherited from OpenDDS::DCPS::MulticastSession
 MulticastSession (RcHandle< ReactorInterceptor > interceptor, MulticastDataLink *link, MulticastPeer remote_peer)
 
void send_control (char submessage_id, Message_Block_Ptr data)
 
void start_syn ()
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoIdSet, GUID_tKeyLessThan) PendingRemoteMap
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Attributes inherited from OpenDDS::DCPS::MulticastSession
MulticastDataLink * link_
 
MulticastPeer remote_peer_
 
ACE_Thread_Mutex start_lock_
 
Reverse_Lock_t reverse_start_lock_
 
bool started_
 
bool active_
 
TransportReassembly reassembly_
 
bool acked_
 
PendingRemoteMap pending_remote_map_
 

Detailed Description

Definition at line 27 of file ReliableSession.h.

Member Typedef Documentation

◆ Sporadic

Definition at line 63 of file ReliableSession.h.

◆ TransportHeaderSN

Definition at line 74 of file ReliableSession.h.

Constructor & Destructor Documentation

◆ ReliableSession()

OpenDDS::DCPS::ReliableSession::ReliableSession ( RcHandle< ReactorInterceptor > interceptor,
MulticastDataLink * link,
MulticastPeer  remote_peer 
)

Definition at line 33 of file ReliableSession.cpp.

36  : MulticastSession(interceptor, link, remote_peer)
37  , nak_watchdog_(make_rch<Sporadic>(TheServiceParticipant->time_source(),
38  interceptor,
39  rchandle_from(this),
41 {}
void process_naks(const MonotonicTimePoint &)
MulticastSession(RcHandle< ReactorInterceptor > interceptor, MulticastDataLink *link, MulticastPeer remote_peer)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
RcHandle< Sporadic > nak_watchdog_
MulticastPeer remote_peer() const
#define TheServiceParticipant

◆ ~ReliableSession()

OpenDDS::DCPS::ReliableSession::~ReliableSession ( )

Definition at line 43 of file ReliableSession.cpp.

References nak_watchdog_.

44 {
45  nak_watchdog_->cancel();
46 }
RcHandle< Sporadic > nak_watchdog_

Member Function Documentation

◆ check_header()

bool OpenDDS::DCPS::ReliableSession::check_header ( const TransportHeader & header)
virtual

Implements OpenDDS::DCPS::MulticastSession.

Definition at line 49 of file ReliableSession.cpp.

References OpenDDS::DCPS::MulticastSession::active_, OpenDDS::DCPS::DisjointSequence::insert(), nak_sequence_, OpenDDS::DCPS::MulticastSession::remote_peer_, OpenDDS::DCPS::TransportHeader::sequence_, and OpenDDS::DCPS::TransportHeader::source_.

50 {
51  // Not from the remote peer for this session.
52  if (this->remote_peer_ != header.source_) return false;
53 
54  // Active sessions don't need to track nak_sequence_
55  if (this->active_) return true;
56 
57  // Update last seen sequence for remote peer; return false if we
58  // have already seen this datagram to prevent duplicate delivery
59  // Note: SN 2 is first SN recorded - fill in up to 2 when rcvd
60  return this->nak_sequence_.insert(SequenceRange(
61  header.sequence_ == 2 ? SequenceNumber() : header.sequence_,
62  header.sequence_));
63 }
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ control_received()

bool OpenDDS::DCPS::ReliableSession::control_received ( char  submessage_id,
const Message_Block_Ptr & control 
)
virtual

Reimplemented from OpenDDS::DCPS::MulticastSession.

Definition at line 183 of file ReliableSession.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::MulticastSession::control_received(), LM_WARNING, OpenDDS::DCPS::MULTICAST_NAK, OpenDDS::DCPS::MULTICAST_NAKACK, nak_received(), and nakack_received().

185 {
186  if (MulticastSession::control_received(submessage_id, control)) {
187  return true; // base class handled message
188  }
189 
190  switch (submessage_id) {
191  case MULTICAST_NAK:
192  nak_received(control);
193  break;
194 
195  case MULTICAST_NAKACK:
196  nakack_received(control);
197  break;
198 
199  default:
200  ACE_ERROR((LM_WARNING,
201  ACE_TEXT("(%P|%t) WARNING: ")
202  ACE_TEXT("ReliableSession::control_received: ")
203  ACE_TEXT("unknown TRANSPORT_CONTROL submessage: 0x%x!\n"),
204  submessage_id));
205  break;
206  }
207  return true;
208 }
#define ACE_ERROR(X)
void nak_received(const Message_Block_Ptr &control)
virtual bool control_received(char submessage_id, const Message_Block_Ptr &control)
void nakack_received(const Message_Block_Ptr &control)
ACE_TEXT("TCP_Factory")

◆ deliver_held_data()

void OpenDDS::DCPS::ReliableSession::deliver_held_data ( )

Definition at line 135 of file ReliableSession.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DataLink::data_received(), OpenDDS::DCPS::DisjointSequence::empty(), held_lock_, OpenDDS::DCPS::MulticastSession::link_, LM_DEBUG, OpenDDS::DCPS::DisjointSequence::low(), nak_sequence_, OPENDDS_MULTIMAP(), OpenDDS::DCPS::OPENDDS_VECTOR(), and OpenDDS::DCPS::Transport_debug_level.

Referenced by expire_naks(), nakack_received(), ready_to_deliver(), and record_header_received().

136 {
137  if (nak_sequence_.empty() || nak_sequence_.low() > 1) return;
138 
139  OPENDDS_VECTOR(ReceivedDataSample) to_deliver;
140  const SequenceNumber ca = nak_sequence_.cumulative_ack();
141 
142  {
144 
145  typedef OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator iter;
146  const iter end = this->held_.upper_bound(ca);
147  for (iter it = this->held_.begin(); it != end; /*increment in loop body*/) {
148  if (Transport_debug_level > 5) {
149  LogGuid writer(it->second.header_.publication_id_);
150  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::deliver_held_data -")
151  ACE_TEXT(" deliver tseq: %q dseq: %q from %C\n"),
152  it->first.getValue(),
153  it->second.header_.sequence_.getValue(),
154  writer.c_str()));
155  }
156  to_deliver.push_back(it->second);
157  this->held_.erase(it++);
158  }
159  }
160  for (size_t i = 0; i < to_deliver.size(); ++i) {
161  this->link_->data_received(to_deliver.at(i));
162  }
163 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
SequenceNumber cumulative_ack() const
ACE_TEXT("TCP_Factory")
OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample) held_
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690

◆ expire_naks()

void OpenDDS::DCPS::ReliableSession::expire_naks ( )

Definition at line 224 of file ReliableSession.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::MulticastDataLink::config(), OpenDDS::DCPS::TransportReassembly::data_unavailable(), OpenDDS::DCPS::MulticastInst::DEFAULT_NAK_TIMEOUT, deliver_held_data(), OpenDDS::DCPS::TimeDuration::from_msec(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::MulticastSession::link_, LM_WARNING, OpenDDS::DCPS::DisjointSequence::low(), nak_requests_, nak_sequence_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OpenDDS::DCPS::MulticastSession::reassembly_, and OpenDDS::DCPS::MulticastSession::remote_peer_.

Referenced by process_naks().

225 {
226  if (this->nak_requests_.empty()) return; // nothing to expire
227 
228  MulticastInst_rch cfg = link_->config();
229  const TimeDuration timeout = cfg ? cfg->nak_timeout_: TimeDuration::from_msec(MulticastInst::DEFAULT_NAK_TIMEOUT);
230  const MonotonicTimePoint deadline(MonotonicTimePoint::now() - timeout);
231  NakRequestMap::iterator first(this->nak_requests_.begin());
232  NakRequestMap::iterator last(this->nak_requests_.upper_bound(deadline));
233 
234  if (first == last) return; // nothing to expire
235 
236  // Skip unrecoverable datagrams to
237  // re-establish a baseline to detect future reception gaps.
238  SequenceNumber lastSeq = (last == this->nak_requests_.end())
239  ? this->nak_requests_.rbegin()->second
240  : last->second;
241 
242  std::vector<SequenceRange> dropped;
244  lastSeq), dropped)) {
245 
246  for (size_t i = 0; i < dropped.size(); ++i) {
247  const SequenceRange& sr = dropped[i];
248  reassembly_.data_unavailable(FragmentRange(sr.first.getValue(), sr.second.getValue()));
249  }
250 
251  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ReliableSession::expire_naks: ")
252  ACE_TEXT("timed out waiting on remote peer %#08x%08x to send missing samples: %q - %q!\n"),
253  (unsigned int)(this->remote_peer_ >> 32),
254  (unsigned int) this->remote_peer_,
255  this->nak_sequence_.low().getValue(),
256  lastSeq.getValue()));
257  }
258 
259  // Clear expired repair requests:
260  this->nak_requests_.erase(first, last);
262 }
#define ACE_ERROR(X)
static const long DEFAULT_NAK_TIMEOUT
Definition: MulticastInst.h:34
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)
std::pair< FragmentNumber, FragmentNumber > FragmentRange
void data_unavailable(const FragmentRange &transportSeqDropped)
RcHandle< MulticastInst > MulticastInst_rch
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
static TimeDuration from_msec(const ACE_UINT64 &ms)
ACE_TEXT("TCP_Factory")
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ is_reliable()

virtual bool OpenDDS::DCPS::ReliableSession::is_reliable ( )
inlinevirtual

Reimplemented from OpenDDS::DCPS::MulticastSession.

Definition at line 58 of file ReliableSession.h.

58 { return true;}

◆ nak_delay()

TimeDuration OpenDDS::DCPS::ReliableSession::nak_delay ( )
private

Definition at line 686 of file ReliableSession.cpp.

References OpenDDS::DCPS::MulticastDataLink::config(), OpenDDS::DCPS::MulticastInst::DEFAULT_NAK_INTERVAL, OpenDDS::DCPS::TimeDuration::from_msec(), and OpenDDS::DCPS::MulticastSession::link_.

Referenced by process_naks(), and start().

687 {
688  MulticastInst_rch cfg = link_->config();
689  TimeDuration interval = cfg ? cfg->nak_interval_ : TimeDuration::from_msec(MulticastInst::DEFAULT_NAK_INTERVAL);
690 
691  // Apply random backoff to minimize potential collisions:
692  interval *= static_cast<double>(std::rand()) /
693  static_cast<double>(RAND_MAX) + 1.0;
694 
695  return interval;
696 }
RcHandle< MulticastInst > MulticastInst_rch
static TimeDuration from_msec(const ACE_UINT64 &ms)
static const long DEFAULT_NAK_INTERVAL
Definition: MulticastInst.h:31

◆ nak_received()

void OpenDDS::DCPS::ReliableSession::nak_received ( const Message_Block_Ptr & control)

Definition at line 482 of file ReliableSession.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::MulticastSession::active_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SingleSendBuffer::Proxy::empty(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::SequenceNumber::getValue(), header, OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastSession::link_, LM_DEBUG, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::SingleSendBuffer::Proxy::low(), OpenDDS::DCPS::MulticastDataLink::receive_strategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header(), OpenDDS::DCPS::MulticastSession::remote_peer_, OpenDDS::DCPS::SingleSendBuffer::resend(), OpenDDS::DCPS::MulticastDataLink::send_buffer(), send_nakack(), OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::TransportHeader::source_, and OpenDDS::DCPS::TransportHeader::swap_bytes().

Referenced by control_received().

483 {
484  if (!this->active_) return; // sub send naks, then doesn't receive them.
485 
486  const TransportHeader& header =
488 
489  Serializer serializer(control.get(), encoding_kind, header.swap_bytes());
490 
491  MulticastPeer local_peer;
492  CORBA::ULong size = 0;
493  serializer >> local_peer; // sent as remote_peer
494  serializer >> size;
495 
496  std::vector<SequenceRange> ranges;
497 
498  for (CORBA::ULong i = 0; i < size; ++i) {
499  SequenceRange range;
500  serializer >> range.first;
501  serializer >> range.second;
502  ranges.push_back(range);
503  }
504 
505  // Ignore sample if not destined for us:
506  if ((local_peer != this->link_->local_peer()) // Not to us.
507  || (this->remote_peer_ != header.source_)) return; // Not from the remote peer for this session.
508 
509  SingleSendBuffer* send_buffer = this->link_->send_buffer();
510  // Broadcast a MULTICAST_NAKACK control sample before resending to suppress
511  // repair requests for unrecoverable samples by providing a
512  // new low-water mark for affected peers:
513  SequenceNumber sn = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
514  {
515  const SingleSendBuffer::Proxy proxy(*send_buffer);
516  if (!proxy.empty() && proxy.low() > ranges.begin()->first) {
518  ACE_DEBUG ((LM_DEBUG,
519  ACE_TEXT ("(%P|%t) ReliableSession::nak_received")
520  ACE_TEXT (" local %#08x%08x remote %#08x%08x sending nakack for lowest available: %q\n"),
521  (unsigned int)(this->link()->local_peer() >> 32),
522  (unsigned int) this->link()->local_peer(),
523  (unsigned int)(this->remote_peer_ >> 32),
524  (unsigned int) this->remote_peer_,
525  proxy.low().getValue()));
526  }
527  sn = proxy.low();
528  }
529  }
531  send_nakack(sn);
532  }
533 
534  for (CORBA::ULong i = 0; i < size; ++i) {
535  bool ret = send_buffer->resend(ranges[i]);
537  ACE_DEBUG ((LM_DEBUG,
538  ACE_TEXT ("(%P|%t) ReliableSession::nak_received")
539  ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q] resend result %C\n"),
540  (unsigned int)(this->link()->local_peer() >> 32),
541  (unsigned int) this->link()->local_peer(),
542  (unsigned int)(this->remote_peer_ >> 32),
543  (unsigned int) this->remote_peer_,
544  ranges[i].first.getValue(), ranges[i].second.getValue(),
545  ret ? "SUCCESS" : "FAILED"));
546  }
547  }
548 }
#define ACE_DEBUG(X)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
ACE_CDR::ULong ULong
MulticastReceiveStrategy * receive_strategy()
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual void send_nakack(SequenceNumber low)
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
ACE_INT64 MulticastPeer
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ nakack_received()

void OpenDDS::DCPS::ReliableSession::nakack_received ( const Message_Block_Ptr & control)

Definition at line 587 of file ReliableSession.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::MulticastSession::active_, OpenDDS::DCPS::TransportReassembly::data_unavailable(), OpenDDS::DCPS::DCPS_debug_level, deliver_held_data(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header, OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastSession::link_, LM_WARNING, nak_sequence_, OpenDDS::DCPS::MulticastSession::reassembly_, OpenDDS::DCPS::MulticastDataLink::receive_strategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header(), OpenDDS::DCPS::MulticastSession::remote_peer_, OpenDDS::DCPS::TransportHeader::source_, and OpenDDS::DCPS::TransportHeader::swap_bytes().

Referenced by control_received().

588 {
589  if (this->active_) return; // pub send nakack, doesn't receive them.
590 
591  const TransportHeader& header =
593 
594  // Not from the remote peer for this session.
595  if (this->remote_peer_ != header.source_) return;
596 
597  Serializer serializer(control.get(), encoding_kind, header.swap_bytes());
598 
599  SequenceNumber low;
600  serializer >> low;
601 
602  // MULTICAST_NAKACK control samples indicate data which cannot be
603  // repaired by a remote peer; if any values were needed below
604  // this value, then the sequence needs to be shifted:
605  std::vector<SequenceRange> dropped;
606  SequenceNumber range_low = SequenceNumber();
607  SequenceNumber range_high = low == SequenceNumber() ? SequenceNumber() : low.previous();
608 
609  if (range_low == SequenceNumber() && range_high == SequenceNumber()) {
610 
611  this->nak_sequence_.insert(range_low);
612 
613  } else if (this->nak_sequence_.insert(SequenceRange(range_low, range_high), dropped)) {
614 
615  for (size_t i = 0; i < dropped.size(); ++i) {
616  const SequenceRange& sr = dropped[i];
617  reassembly_.data_unavailable(FragmentRange(sr.first.getValue(), sr.second.getValue()));
618  }
619 
620  if (DCPS_debug_level > 0) {
621  ACE_ERROR((LM_WARNING,
622  ACE_TEXT("(%P|%t) WARNING: ReliableSession::nakack_received ")
623  ACE_TEXT("local %#08x%08x remote %#08x%08x with low [%q] ")
624  ACE_TEXT("- some ranges dropped.\n"),
625  (unsigned int)(this->link()->local_peer() >> 32),
626  (unsigned int) this->link()->local_peer(),
627  (unsigned int)(this->remote_peer_ >> 32),
628  (unsigned int) this->remote_peer_,
629  low.getValue()));
630  }
631  }
633 }
#define ACE_ERROR(X)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)
std::pair< FragmentNumber, FragmentNumber > FragmentRange
void data_unavailable(const FragmentRange &transportSeqDropped)
MulticastReceiveStrategy * receive_strategy()
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ OPENDDS_MAP()

typedef OpenDDS::DCPS::ReliableSession::OPENDDS_MAP ( MonotonicTimePoint  ,
SequenceNumber   
)
private

◆ OPENDDS_MULTIMAP()

OpenDDS::DCPS::ReliableSession::OPENDDS_MULTIMAP ( TransportHeaderSN  ,
ReceivedDataSample   
)
private

◆ OPENDDS_SET()

typedef OpenDDS::DCPS::ReliableSession::OPENDDS_SET ( SequenceRange  )
private

◆ process_naks()

void OpenDDS::DCPS::ReliableSession::process_naks ( const MonotonicTimePoint & )
private

Definition at line 699 of file ReliableSession.cpp.

References expire_naks(), nak_delay(), nak_watchdog_, OPENDDS_END_VERSIONED_NAMESPACE_DECL, and send_naks().

700 {
701  // Expire outstanding repair requests that have not yet been
702  // fulfilled; this prevents NAK implosions due to remote
703  // peers becoming unresponsive:
704  expire_naks();
705 
706  // Initiate repairs by sending MULTICAST_NAK control samples
707  // to remote peers from which we are missing data:
708  send_naks();
709 
710  nak_watchdog_->schedule(nak_delay());
711 }
RcHandle< Sporadic > nak_watchdog_

◆ ready_to_deliver()

bool OpenDDS::DCPS::ReliableSession::ready_to_deliver ( const TransportHeader & header,
const ReceivedDataSample & data 
)
virtual

Implements OpenDDS::DCPS::MulticastSession.

Definition at line 81 of file ReliableSession.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::MulticastSession::acked(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), deliver_held_data(), OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::empty(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::ReceivedDataSample::header_, held_lock_, LM_DEBUG, OpenDDS::DCPS::DisjointSequence::low(), nak_sequence_, OPENDDS_MULTIMAP(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::TransportHeader::sequence_, OpenDDS::DCPS::DataSampleHeader::sequence_, and OpenDDS::DCPS::Transport_debug_level.

83 {
84  if (!acked()
86  || (!nak_sequence_.empty() && nak_sequence_.cumulative_ack() != header.sequence_)
87  || (!nak_sequence_.empty() && nak_sequence_.low() > 1)
88  || (nak_sequence_.empty() && header.sequence_ > 1)) {
89 
90  if (Transport_debug_level > 5) {
91  LogGuid writer(data.header_.publication_id_);
92  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
93  ACE_TEXT(" tseq: %q data seq: %q from %C being WITHHELD because can't receive yet\n"),
94  header.sequence_.getValue(),
95  data.header_.sequence_.getValue(),
96  writer.c_str()));
97  }
98  {
100 
101  held_.insert(std::pair<const SequenceNumber, ReceivedDataSample>(header.sequence_, data));
102 
103  if (Transport_debug_level > 5) {
104  OPENDDS_MULTIMAP(SequenceNumber, ReceivedDataSample)::iterator it = held_.begin();
105  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
106  ACE_TEXT(" held_ data currently contains: %d samples\n"),
107  held_.size()));
108  while (it != held_.end()) {
109  LogGuid writer(it->second.header_.publication_id_);
110  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
111  ACE_TEXT(" held_ data currently contains: tseq: %q dseq: %q from %C HELD\n"),
112  it->first.getValue(),
113  it->second.header_.sequence_.getValue(),
114  writer.c_str()));
115  ++it;
116  }
117  }
118  }
120  return false;
121  } else {
122  if (Transport_debug_level > 5) {
123  LogGuid writer(data.header_.publication_id_);
124  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::ready_to_deliver -")
125  ACE_TEXT(" tseq: %q data seq: %q from %C OK to deliver\n"),
126  header.sequence_.getValue(),
127  data.header_.sequence_.getValue(),
128  writer.c_str()));
129  }
130  return true;
131  }
132 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
SequenceNumber cumulative_ack() const
ACE_TEXT("TCP_Factory")
OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample) held_

◆ record_header_received()

void OpenDDS::DCPS::ReliableSession::record_header_received ( const TransportHeader & header)
virtual

Implements OpenDDS::DCPS::MulticastSession.

Definition at line 66 of file ReliableSession.cpp.

References OpenDDS::DCPS::MulticastSession::active_, deliver_held_data(), OpenDDS::DCPS::DisjointSequence::insert(), nak_sequence_, OpenDDS::DCPS::MulticastSession::remote_peer_, OpenDDS::DCPS::TransportHeader::sequence_, and OpenDDS::DCPS::TransportHeader::source_.

67 {
68  // Not from the remote peer for this session.
69  if (this->remote_peer_ != header.source_) return;
70 
71  // Active sessions don't need to track nak_sequence_
72  if (this->active_) return;
73 
74  // Update nak sequence for seen sequence from remote peer
75  this->nak_sequence_.insert(header.sequence_);
77 }
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)

◆ release_remote()

void OpenDDS::DCPS::ReliableSession::release_remote ( const GUID_t & remote)
virtual

Reimplemented from OpenDDS::DCPS::MulticastSession.

Definition at line 166 of file ReliableSession.cpp.

References ACE_GUARD, held_lock_, and OPENDDS_MULTIMAP().

167 {
169  if (!held_.empty()) {
170  OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample)::iterator it = held_.begin();
171  while (it != held_.end()) {
172  if (it->second.header_.publication_id_ == remote) {
173  held_.erase(it++);
174  } else {
175  it++;
176  }
177  }
178  }
179 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OPENDDS_MULTIMAP(TransportHeaderSN, ReceivedDataSample) held_

◆ send_nakack()

void OpenDDS::DCPS::ReliableSession::send_nakack ( SequenceNumber  low)
virtual

Definition at line 636 of file ReliableSession.cpp.

References OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::move(), OpenDDS::DCPS::MULTICAST_NAKACK, and OpenDDS::DCPS::MulticastSession::send_control().

Referenced by nak_received().

637 {
638  size_t len = sizeof(low.getValue());
639 
640  Message_Block_Ptr data(new ACE_Message_Block(len));
641 
642  Serializer serializer(data.get(), encoding_unaligned_native);
643 
644  serializer << low;
645  // Broadcast control sample to all peers:
647 }
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
void send_control(char submessage_id, Message_Block_Ptr data)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr

◆ send_naks() [1/2]

void OpenDDS::DCPS::ReliableSession::send_naks ( )
virtual

The range first - second will be skipped (no naks sent for it).

Reimplemented from OpenDDS::DCPS::MulticastSession.

Definition at line 265 of file ReliableSession.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::MulticastSession::acked(), OpenDDS::DCPS::MulticastDataLink::config(), OpenDDS::DCPS::DisjointSequence::cumulative_ack(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::MulticastInst::DEFAULT_NAK_DELAY_INTERVALS, OpenDDS::DCPS::MulticastInst::DEFAULT_NAK_MAX, OpenDDS::DCPS::DisjointSequence::disjoint(), OpenDDS::DCPS::DisjointSequence::dump(), OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::DisjointSequence::insert(), OpenDDS::DCPS::MulticastSession::link(), OpenDDS::DCPS::MulticastSession::link_, LM_DEBUG, OpenDDS::DCPS::DisjointSequence::low(), OpenDDS::DCPS::move(), OpenDDS::DCPS::MULTICAST_NAK, nak_peers_, nak_requests_, nak_sequence_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OpenDDS::DCPS::MulticastSession::remote_peer_, OpenDDS::DCPS::MulticastSession::send_control(), and ACE_Utils::truncate_cast().

Referenced by process_naks().

266 {
267  // Could get data samples before syn control message.
268  // No use nak'ing until syn control message is received and session is acked.
269  if (!this->acked()) {
270  if (DCPS_debug_level > 5) {
271  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
272  ACE_TEXT("remote %#08x%08x session NOT acked yet, don't send naks\n"),
273  (unsigned int)(this->link()->local_peer() >> 32),
274  (unsigned int) this->link()->local_peer(),
275  (unsigned int)(this->remote_peer_ >> 32),
276  (unsigned int) this->remote_peer_));
277  }
278  return;
279  }
280 
281  if (DCPS_debug_level > 5) {
282  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
283  ACE_TEXT("remote %#08x%08x nak request size %d\n"),
284  (unsigned int)(this->link()->local_peer() >> 32),
285  (unsigned int) this->link()->local_peer(),
286  (unsigned int)(this->remote_peer_ >> 32),
287  (unsigned int) this->remote_peer_,
288  this->nak_requests_.size()));
289  }
290 
291  if (!(this->nak_sequence_.low() > 1) && !this->nak_sequence_.disjoint()) {
292  if (DCPS_debug_level > 5) {
293  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
294  ACE_TEXT("remote %#08x%08x nak sequence not disjoint, don't send naks\n"),
295  (unsigned int)(this->link()->local_peer() >> 32),
296  (unsigned int) this->link()->local_peer(),
297  (unsigned int)(this->remote_peer_ >> 32),
298  (unsigned int) this->remote_peer_));
299  }
300 
301  if (DCPS_debug_level > 9) {
302  const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges());
303  for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
304  iter != ranges.end(); ++iter) {
305  ACE_DEBUG((LM_DEBUG, "(%P|%t) ReliableSession::send_naks - local %#08x%08x remote %#08x%08x nak_sequence includes: [%q - %q]\n",
306  (unsigned int)(this->link()->local_peer() >> 32),
307  (unsigned int) this->link()->local_peer(),
308  (unsigned int)(this->remote_peer_ >> 32),
309  (unsigned int) this->remote_peer_,
310  iter->first.getValue(),
311  iter->second.getValue()));
312  }
313  }
314  return; // nothing to send
315  }
316 
317  // Record low-water mark for this interval; this value will
318  // be used to reset the low-water mark in the event the remote
319  // peer becomes unresponsive:
321  if (this->nak_sequence_.low() > 1) {
322  this->nak_requests_[now] = SequenceNumber();
323  } else {
324  this->nak_requests_[now] = this->nak_sequence_.cumulative_ack();
325  }
326 
327  typedef std::vector<SequenceRange> RangeVector;
328  RangeVector ignored;
329 
330  /// The range first - second will be skipped (no naks sent for it).
331  SequenceNumber first;
332  SequenceNumber second;
333 
334  NakRequestMap::reverse_iterator itr(this->nak_requests_.rbegin());
335 
336  if (this->nak_requests_.size() > 1) {
337  // The sequences between rbegin - 1 and rbegin will not be ignored for naking.
338  ++itr;
339 
340  MulticastInst_rch cfg = link_->config();
341  size_t nak_delay_intervals = cfg ? cfg->nak_delay_intervals_ : MulticastInst::DEFAULT_NAK_DELAY_INTERVALS;
342  size_t nak_max = cfg ? cfg->nak_max_ : MulticastInst::DEFAULT_NAK_MAX;
343  size_t sz = nak_requests_.size();
344 
345  // Imagine i is the index of an element in nak_requests_ in reverse order.
346  // index 0 sequence is most recent high water mark.
347  // e.g index , 20, 19, 18, 17, 16, 15, 14, 13, 12, 11, 10, 9, 8, 7, 6, 5, 4, 3, 2, 1, 0
348  // 0 (rbegin) is always skipped because missing sample between 1 and 0 interval
349  // should always be naked.
350  // if nak_delay_intervals=4, nak_max=3, any sequence between 5 - 1, 10 - 6, 15 - 11
351  // are skipped for naking due to nak_delay_intervals and 20 - 16 are skipped for
352  // naking due to nak_max.
353  for (size_t i = 1; i < sz; ++i) {
354  if ((i * 1.0) / (nak_delay_intervals + 1) > nak_max) {
355  if (first != SequenceNumber()) {
356  first = this->nak_requests_.begin()->second;
357  }
358  else {
359  ignored.push_back(std::make_pair(this->nak_requests_.begin()->second, itr->second));
360  }
361  break;
362  }
363 
364  if (i % (nak_delay_intervals + 1) == 1) {
365  second = itr->second;
366  }
367  if (second != SequenceNumber()) {
368  first = itr->second;
369  }
370 
371  if (i % (nak_delay_intervals + 1) == 0) {
372  first = itr->second;
373 
374  if (first != SequenceNumber() && second != SequenceNumber()) {
375  ignored.push_back(std::make_pair(first, second));
376  first = SequenceNumber();
377  second = SequenceNumber();
378  }
379  }
380 
381  ++itr;
382  }
383 
384  if (first != SequenceNumber() && second != SequenceNumber() && first != second) {
385  ignored.push_back(std::make_pair(first, second));
386  }
387  }
388 
389  // Take a copy to facilitate temporary suppression:
390  DisjointSequence received(this->nak_sequence_);
391  if (DCPS_debug_level > 0) {
392  received.dump();
393  }
394 
395  size_t sz = ignored.size();
396  for (size_t i = 0; i < sz; ++i) {
397 
398  if (ignored[i].second > received.cumulative_ack()) {
399  SequenceNumber high = ignored[i].second;
400  SequenceNumber low = ignored[i].first;
401  if (low < received.cumulative_ack()) {
402  low = received.cumulative_ack();
403  }
404 
405  if (DCPS_debug_level > 5) {
406  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
407  ACE_TEXT("remote %#08x%08x ignore missing [%q - %q]\n"),
408  (unsigned int)(this->link()->local_peer() >> 32),
409  (unsigned int) this->link()->local_peer(),
410  (unsigned int)(this->remote_peer_ >> 32),
411  (unsigned int) this->remote_peer_,
412  low.getValue(), high.getValue()));
413  }
414 
415  // Make contiguous between ignored sequences.
416  received.insert(SequenceRange(low, high));
417  }
418  }
419 
420  for (NakPeerSet::iterator it(this->nak_peers_.begin());
421  it != this->nak_peers_.end(); ++it) {
422  // Update sequence to temporarily suppress repair requests for
423  // ranges already requested by other peers for this interval:
424  received.insert(*it);
425  }
426  bool sending_naks = false;
427  if (received.low() > 1){
428  //Special case: nak from beginning to make sure no missing sequence
429  //number below the first received
430  sending_naks = true;
431  std::vector<SequenceRange> ranges;
432  ranges.push_back(SequenceRange(SequenceNumber(), received.low()));
433 
434  CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size());
435 
436  size_t len = sizeof(this->remote_peer_)
437  + sizeof(size)
438  + size * 2 * sizeof(SequenceNumber);
439 
440  Message_Block_Ptr data(new ACE_Message_Block(len));
441 
442  Serializer serializer(data.get(), encoding_unaligned_native);
443 
444  serializer << this->remote_peer_;
445  serializer << size;
446  for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
447  iter != ranges.end(); ++iter) {
448  serializer << iter->first;
449  serializer << iter->second;
450  if (DCPS_debug_level > 0) {
451  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks ")
452  ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
453  (unsigned int)(this->link()->local_peer() >> 32),
454  (unsigned int) this->link()->local_peer(),
455  (unsigned int)(this->remote_peer_ >> 32),
456  (unsigned int) this->remote_peer_,
457  iter->first.getValue(), iter->second.getValue()));
458  }
459  }
460  // Send control sample to remote peer:
462  }
463  if (received.disjoint()) {
464  sending_naks = true;
465  send_naks(received);
466  }
467 
468  if (!sending_naks && DCPS_debug_level > 5){
469  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ReliableSession::send_naks local %#08x%08x ")
470  ACE_TEXT("remote %#08x%08x received sequence not disjoint, don't send naks\n"),
471  (unsigned int)(this->link()->local_peer() >> 32),
472  (unsigned int) this->link()->local_peer(),
473  (unsigned int)(this->remote_peer_ >> 32),
474  (unsigned int) this->remote_peer_));
475  }
476 
477  // Clear peer repair requests:
478  this->nak_peers_.clear();
479 }
#define ACE_DEBUG(X)
static const long DEFAULT_NAK_DELAY_INTERVALS
Definition: MulticastInst.h:32
static const long DEFAULT_NAK_MAX
Definition: MulticastInst.h:33
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
TO truncate_cast(FROM val)
ACE_CDR::ULong ULong
SequenceNumber cumulative_ack() const
RcHandle< MulticastInst > MulticastInst_rch
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
ACE_TEXT("TCP_Factory")
void send_control(char submessage_id, Message_Block_Ptr data)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ send_naks() [2/2]

void OpenDDS::DCPS::ReliableSession::send_naks ( DisjointSequence found)

Definition at line 551 of file ReliableSession.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::MulticastSession::link(), LM_DEBUG, OpenDDS::DCPS::move(), OpenDDS::DCPS::MULTICAST_NAK, OpenDDS::DCPS::MulticastSession::remote_peer_, OpenDDS::DCPS::MulticastSession::send_control(), and ACE_Utils::truncate_cast().

552 {
553  const std::vector<SequenceRange> ranges(received.missing_sequence_ranges());
554 
555  CORBA::ULong size = ACE_Utils::truncate_cast<CORBA::ULong>(ranges.size());
556 
557  size_t len = sizeof(this->remote_peer_)
558  + sizeof(size)
559  + size * 2 * sizeof(SequenceNumber);
560 
561  Message_Block_Ptr data(new ACE_Message_Block(len));
562 
563  Serializer serializer(data.get(), encoding_unaligned_native);
564 
565  serializer << this->remote_peer_;
566  serializer << size;
567  for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
568  iter != ranges.end(); ++iter) {
569  serializer << iter->first;
570  serializer << iter->second;
572  ACE_DEBUG ((LM_DEBUG, ACE_TEXT ("(%P|%t) ReliableSession::send_naks (Disjoint) ")
573  ACE_TEXT (" local %#08x%08x remote %#08x%08x [%q - %q]\n"),
574  (unsigned int)(this->link()->local_peer() >> 32),
575  (unsigned int) this->link()->local_peer(),
576  (unsigned int)(this->remote_peer_ >> 32),
577  (unsigned int) this->remote_peer_,
578  iter->first.getValue(), iter->second.getValue()));
579  }
580  }
581  // Send control sample to remote peer:
583 }
#define ACE_DEBUG(X)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
TO truncate_cast(FROM val)
ACE_CDR::ULong ULong
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void send_control(char submessage_id, Message_Block_Ptr data)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr

◆ start()

bool OpenDDS::DCPS::ReliableSession::start ( bool  active,
bool  acked 
)
virtual

Implements OpenDDS::DCPS::MulticastSession.

Definition at line 650 of file ReliableSession.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::MulticastSession::active_, nak_delay(), nak_watchdog_, OpenDDS::DCPS::MulticastSession::reverse_start_lock_, OpenDDS::DCPS::MulticastSession::set_acked(), OpenDDS::DCPS::MulticastSession::start_lock_, and OpenDDS::DCPS::MulticastSession::started_.

651 {
652  ACE_GUARD_RETURN(ACE_Thread_Mutex, guard, this->start_lock_, false);
653 
654  if (this->started_) {
655  return true; // already started
656  }
657 
658  this->active_ = active;
659  {
660  //can't call accept_datalink while holding lock due to possible reactor deadlock with passive_connection
661  ACE_GUARD_RETURN(Reverse_Lock_t, unlock_guard, this->reverse_start_lock_, false);
662 
663  // A watchdog timer is scheduled to periodically check for gaps in
664  // received data. If a gap is discovered, MULTICAST_NAK control
665  // samples will be sent to initiate repairs.
666  // Only subscribers send naks, so schedule the watchdog only for the subscriber role.
667  if (!active) {
668  if (acked) {
669  this->set_acked();
670  }
671  this->nak_watchdog_->schedule(nak_delay());
672  }
673  } //Reacquire start_lock_ after releasing unlock_guard with reverse_start_lock_
674 
675  return this->started_ = true;
676 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Reverse_Lock< ACE_Thread_Mutex > Reverse_Lock_t
RcHandle< Sporadic > nak_watchdog_

◆ stop()

void OpenDDS::DCPS::ReliableSession::stop ( void  )
virtual

Reimplemented from OpenDDS::DCPS::MulticastSession.

Definition at line 679 of file ReliableSession.cpp.

References nak_watchdog_, and OpenDDS::DCPS::MulticastSession::stop().

680 {
682  this->nak_watchdog_->cancel();
683 }
RcHandle< Sporadic > nak_watchdog_

◆ syn_hook()

void OpenDDS::DCPS::ReliableSession::syn_hook ( const SequenceNumber seq)
virtual

Reimplemented from OpenDDS::DCPS::MulticastSession.

Definition at line 211 of file ReliableSession.cpp.

References OpenDDS::DCPS::DisjointSequence::insert(), nak_sequence_, and OpenDDS::DCPS::DisjointSequence::reset().

212 {
213  const std::vector<SequenceRange> ranges(this->nak_sequence_.present_sequence_ranges());
214  this->nak_sequence_.reset();
215  this->nak_sequence_.insert(seq);
216 
217  for (std::vector<SequenceRange>::const_iterator iter = ranges.begin();
218  iter != ranges.end(); ++iter) {
219  this->nak_sequence_.insert(SequenceRange(iter->first, iter->second));
220  }
221 }
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)
std::pair< SequenceNumber, SequenceNumber > SequenceRange

Member Data Documentation

◆ held_lock_

ACE_Thread_Mutex OpenDDS::DCPS::ReliableSession::held_lock_
private

Definition at line 73 of file ReliableSession.h.

Referenced by deliver_held_data(), ready_to_deliver(), and release_remote().

◆ nak_peers_

NakPeerSet OpenDDS::DCPS::ReliableSession::nak_peers_
private

Definition at line 78 of file ReliableSession.h.

Referenced by send_naks().

◆ nak_requests_

NakRequestMap OpenDDS::DCPS::ReliableSession::nak_requests_
private

Definition at line 71 of file ReliableSession.h.

Referenced by expire_naks(), and send_naks().

◆ nak_sequence_

DisjointSequence OpenDDS::DCPS::ReliableSession::nak_sequence_
private

◆ nak_watchdog_

RcHandle<Sporadic> OpenDDS::DCPS::ReliableSession::nak_watchdog_
private

Definition at line 64 of file ReliableSession.h.

Referenced by process_naks(), start(), stop(), and ~ReliableSession().


The documentation for this class was generated from the following files: