OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Protected Types | Protected Member Functions | Protected Attributes | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::MulticastSession Class Referenceabstract

#include <MulticastSession.h>

Inheritance diagram for OpenDDS::DCPS::MulticastSession:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::MulticastSession:
Collaboration graph
[legend]

Public Member Functions

virtual ~MulticastSession ()
 
MulticastDataLinklink ()
 
MulticastPeer remote_peer () const
 
bool acked ()
 
void set_acked ()
 
virtual bool is_reliable ()
 
void syn_received (const Message_Block_Ptr &control)
 
void send_all_syn (const MonotonicTimePoint &now)
 
void send_syn (const GUID_t &local_writer, const GUID_t &remote_reader)
 
void synack_received (const Message_Block_Ptr &control)
 
void send_synack (const GUID_t &local_reader, const GUID_t &remote_writer)
 
virtual void send_naks ()
 
virtual bool check_header (const TransportHeader &header)=0
 
virtual void record_header_received (const TransportHeader &header)=0
 
virtual bool ready_to_deliver (const TransportHeader &header, const ReceivedDataSample &data)=0
 
virtual void release_remote (const GUID_t &)
 
virtual bool control_received (char submessage_id, const Message_Block_Ptr &control)
 
virtual bool start (bool active, bool acked)=0
 
virtual void stop ()
 
bool reassemble (ReceivedDataSample &data, const TransportHeader &header)
 
void add_remote (const GUID_t &local)
 
void add_remote (const GUID_t &local, const GUID_t &remote)
 
void remove_remote (const GUID_t &local, const GUID_t &remote)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 

Protected Types

typedef ACE_Reverse_Lock< ACE_Thread_MutexReverse_Lock_t
 

Protected Member Functions

 MulticastSession (RcHandle< ReactorInterceptor > interceptor, MulticastDataLink *link, MulticastPeer remote_peer)
 
void send_control (char submessage_id, Message_Block_Ptr data)
 
void start_syn ()
 
virtual void syn_hook (const SequenceNumber &)
 
typedef OPENDDS_MAP_CMP (GUID_t, RepoIdSet, GUID_tKeyLessThan) PendingRemoteMap
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Protected Attributes

MulticastDataLinklink_
 
MulticastPeer remote_peer_
 
ACE_Thread_Mutex start_lock_
 
Reverse_Lock_t reverse_start_lock_
 
bool started_
 
bool active_
 
TransportReassembly reassembly_
 
bool acked_
 
PendingRemoteMap pending_remote_map_
 

Private Types

typedef PmfSporadicTask< MulticastSessionSporadic
 

Private Member Functions

void remove_remote_i (const GUID_t &local, const GUID_t &remote)
 

Private Attributes

ACE_Thread_Mutex ack_lock_
 
RcHandle< Sporadicsyn_watchdog_
 
TimeDuration syn_delay_
 
const TimeDuration initial_syn_delay_
 
String config_name
 

Detailed Description

Definition at line 34 of file MulticastSession.h.

Member Typedef Documentation

◆ Reverse_Lock_t

Definition at line 96 of file MulticastSession.h.

◆ Sporadic

Definition at line 125 of file MulticastSession.h.

Constructor & Destructor Documentation

◆ ~MulticastSession()

OpenDDS::DCPS::MulticastSession::~MulticastSession ( )
virtual

Definition at line 49 of file MulticastSession.cpp.

References syn_watchdog_.

50 {
51  syn_watchdog_->cancel();
52 }
RcHandle< Sporadic > syn_watchdog_

◆ MulticastSession()

OpenDDS::DCPS::MulticastSession::MulticastSession ( RcHandle< ReactorInterceptor interceptor,
MulticastDataLink link,
MulticastPeer  remote_peer 
)
protected

Definition at line 31 of file MulticastSession.cpp.

34  : link_(link)
37  , started_(false)
38  , active_(true)
39  , reassembly_(link->config()->fragment_reassembly_timeout_)
40  , acked_(false)
41  , syn_watchdog_(make_rch<Sporadic>(TheServiceParticipant->time_source(),
42  interceptor,
43  rchandle_from(this),
45  , initial_syn_delay_(link->config()->syn_interval_)
46  , config_name(link->config()->name())
47 {}
const TimeDuration initial_syn_delay_
void send_all_syn(const MonotonicTimePoint &now)
RcHandle< Sporadic > syn_watchdog_
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
MulticastPeer remote_peer() const
#define TheServiceParticipant

Member Function Documentation

◆ acked()

bool OpenDDS::DCPS::MulticastSession::acked ( )

Definition at line 55 of file MulticastSession.cpp.

References ACE_GUARD_RETURN, ACE_SYNCH_MUTEX, ack_lock_, and acked_.

Referenced by OpenDDS::DCPS::ReliableSession::ready_to_deliver(), and OpenDDS::DCPS::ReliableSession::send_naks().

56 {
57  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->ack_lock_, false);
58  return this->acked_;
59 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ add_remote() [1/2]

void OpenDDS::DCPS::MulticastSession::add_remote ( const GUID_t local)

Definition at line 335 of file MulticastSession.cpp.

References OpenDDS::DCPS::GuidConverter::isWriter(), and start_syn().

336 {
337  const GuidConverter conv(local);
338  if (conv.isWriter()) {
339  // Active peers schedule a watchdog timer to initiate a 2-way
340  // handshake to verify that passive endpoints can send/receive
341  // data reliably. This process must be executed using the
342  // transport reactor thread to prevent blocking.
343  // Only publisher send syn so just schedule for pub role.
344  this->start_syn();
345  }
346 }

◆ add_remote() [2/2]

void OpenDDS::DCPS::MulticastSession::add_remote ( const GUID_t local,
const GUID_t remote 
)

Definition at line 349 of file MulticastSession.cpp.

References ACE_GUARD, ACE_SYNCH_MUTEX, ack_lock_, OpenDDS::DCPS::GuidConverter::isWriter(), pending_remote_map_, and start_syn().

351 {
352  const GuidConverter conv(local);
353 
354  {
355  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
356  pending_remote_map_[local].insert(remote);
357  }
358 
359  if (conv.isWriter()) {
360  // Active peers schedule a watchdog timer to initiate a 2-way
361  // handshake to verify that passive endpoints can send/receive
362  // data reliably. This process must be executed using the
363  // transport reactor thread to prevent blocking.
364  // Only publisher send syn so just schedule for pub role.
365  this->start_syn();
366  }
367 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)

◆ check_header()

virtual bool OpenDDS::DCPS::MulticastSession::check_header ( const TransportHeader header)
pure virtual

◆ control_received()

bool OpenDDS::DCPS::MulticastSession::control_received ( char  submessage_id,
const Message_Block_Ptr control 
)
virtual

Reimplemented in OpenDDS::DCPS::ReliableSession.

Definition at line 102 of file MulticastSession.cpp.

References OpenDDS::DCPS::MULTICAST_SYN, OpenDDS::DCPS::MULTICAST_SYNACK, syn_received(), and synack_received().

Referenced by OpenDDS::DCPS::ReliableSession::control_received().

104 {
105  switch (submessage_id) {
106  case MULTICAST_SYN:
107  syn_received(control);
108  break;
109 
110  case MULTICAST_SYNACK:
111  synack_received(control);
112  break;
113 
114  default:
115  return false;
116  }
117 
118  return true;
119 }
void syn_received(const Message_Block_Ptr &control)
void synack_received(const Message_Block_Ptr &control)

◆ is_reliable()

virtual bool OpenDDS::DCPS::MulticastSession::is_reliable ( )
inlinevirtual

Reimplemented in OpenDDS::DCPS::ReliableSession, and OpenDDS::DCPS::BestEffortSession.

Definition at line 45 of file MulticastSession.h.

45 { return false;}

◆ link()

ACE_INLINE MulticastDataLink * OpenDDS::DCPS::MulticastSession::link ( void  )

◆ OPENDDS_MAP_CMP()

typedef OpenDDS::DCPS::MulticastSession::OPENDDS_MAP_CMP ( GUID_t  ,
RepoIdSet  ,
GUID_tKeyLessThan   
)
protected

◆ ready_to_deliver()

virtual bool OpenDDS::DCPS::MulticastSession::ready_to_deliver ( const TransportHeader header,
const ReceivedDataSample data 
)
pure virtual

◆ reassemble()

bool OpenDDS::DCPS::MulticastSession::reassemble ( ReceivedDataSample data,
const TransportHeader header 
)

Definition at line 326 of file MulticastSession.cpp.

References OpenDDS::DCPS::TransportHeader::first_fragment_, OpenDDS::DCPS::TransportReassembly::reassemble(), reassembly_, and OpenDDS::DCPS::TransportHeader::sequence_.

328 {
329  return this->reassembly_.reassemble(header.sequence_,
330  header.first_fragment_,
331  data);
332 }
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
bool reassemble(const SequenceNumber &transportSeq, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags=0)

◆ record_header_received()

virtual void OpenDDS::DCPS::MulticastSession::record_header_received ( const TransportHeader header)
pure virtual

◆ release_remote()

virtual void OpenDDS::DCPS::MulticastSession::release_remote ( const GUID_t )
inlinevirtual

Reimplemented in OpenDDS::DCPS::ReliableSession.

Definition at line 60 of file MulticastSession.h.

60 {};

◆ remote_peer()

ACE_INLINE MulticastPeer OpenDDS::DCPS::MulticastSession::remote_peer ( ) const

Definition at line 20 of file MulticastSession.inl.

References OPENDDS_END_VERSIONED_NAMESPACE_DECL, and remote_peer_.

21 {
22  return this->remote_peer_;
23 }

◆ remove_remote()

void OpenDDS::DCPS::MulticastSession::remove_remote ( const GUID_t local,
const GUID_t remote 
)

Definition at line 370 of file MulticastSession.cpp.

References ACE_GUARD, ACE_SYNCH_MUTEX, ack_lock_, and remove_remote_i().

372 {
373  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
374  remove_remote_i(local, remote);
375 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void remove_remote_i(const GUID_t &local, const GUID_t &remote)

◆ remove_remote_i()

void OpenDDS::DCPS::MulticastSession::remove_remote_i ( const GUID_t local,
const GUID_t remote 
)
private

Definition at line 378 of file MulticastSession.cpp.

References OpenDDS::DCPS::GuidConverter::isWriter(), OPENDDS_END_VERSIONED_NAMESPACE_DECL, pending_remote_map_, and syn_watchdog_.

Referenced by remove_remote(), and synack_received().

380 {
381  const GuidConverter conv(local);
382 
383  const bool empty_before = pending_remote_map_.empty();
384  pending_remote_map_[local].erase(remote);
385  if (pending_remote_map_[local].empty()) {
386  pending_remote_map_.erase(local);
387  }
388  const bool empty = pending_remote_map_.empty() && !empty_before;
389 
390  if (conv.isWriter() && empty && this->syn_watchdog_) {
391  this->syn_watchdog_->cancel();
392  }
393 }
RcHandle< Sporadic > syn_watchdog_

◆ send_all_syn()

void OpenDDS::DCPS::MulticastSession::send_all_syn ( const MonotonicTimePoint now)

Definition at line 192 of file MulticastSession.cpp.

References ACE_GUARD, ACE_SYNCH_MUTEX, ack_lock_, pending_remote_map_, send_syn(), syn_delay_, and syn_watchdog_.

193 {
194  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
195  for (PendingRemoteMap::const_iterator pos1 = pending_remote_map_.begin(), limit = pending_remote_map_.end();
196  pos1 != limit; ++pos1) {
197  const GUID_t& local_writer = pos1->first;
198  for (RepoIdSet::const_iterator pos2 = pos1->second.begin(), limit = pos1->second.end(); pos2 != limit; ++pos2) {
199  const GUID_t& remote_reader = *pos2;
200  send_syn(local_writer, remote_reader);
201  }
202  }
203 
204  // Exponential back-off.
205  syn_delay_ *= 2;
206  syn_watchdog_->schedule(syn_delay_);
207 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void send_syn(const GUID_t &local_writer, const GUID_t &remote_reader)
RcHandle< Sporadic > syn_watchdog_

◆ send_control()

void OpenDDS::DCPS::MulticastSession::send_control ( char  submessage_id,
Message_Block_Ptr  data 
)
protected

Definition at line 77 of file MulticastSession.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::DataLink::create_control(), header, link_, LM_ERROR, OpenDDS::DCPS::move(), OpenDDS::DCPS::DataLink::send_control(), and OpenDDS::DCPS::SEND_CONTROL_OK.

Referenced by OpenDDS::DCPS::ReliableSession::send_nakack(), OpenDDS::DCPS::ReliableSession::send_naks(), send_syn(), and send_synack().

78 {
79  DataSampleHeader header;
80  Message_Block_Ptr control(
81  this->link_->create_control(submessage_id, header, move(data)));
82  if (!control) {
83  ACE_ERROR((LM_ERROR,
84  ACE_TEXT("(%P|%t) ERROR: ")
85  ACE_TEXT("MulticastSession::send_control: ")
86  ACE_TEXT("create_control failed!\n")));
87  return;
88  }
89 
90  int error = this->link_->send_control(header, move(control));
91  if (error != SEND_CONTROL_OK) {
92  ACE_ERROR((LM_ERROR,
93  ACE_TEXT("(%P|%t) ERROR: ")
94  ACE_TEXT("MulticastSession::send_control: ")
95  ACE_TEXT("send_control failed: %d!\n"),
96  error));
97  return;
98  }
99 }
#define ACE_ERROR(X)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
ACE_Message_Block * create_control(char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
Definition: DataLink.cpp:628
ACE_TEXT("TCP_Factory")
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr data)
Definition: DataLink.cpp:668

◆ send_naks()

virtual void OpenDDS::DCPS::MulticastSession::send_naks ( )
inlinevirtual

Reimplemented in OpenDDS::DCPS::ReliableSession.

Definition at line 54 of file MulticastSession.h.

References header.

Referenced by send_synack().

54 {}

◆ send_syn()

void OpenDDS::DCPS::MulticastSession::send_syn ( const GUID_t local_writer,
const GUID_t remote_reader 
)

Definition at line 210 of file MulticastSession.cpp.

References OpenDDS::DCPS::LogGuid::c_str(), config_name, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), link(), LM_DEBUG, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::move(), OpenDDS::DCPS::MULTICAST_SYN, remote_peer_, send_control(), and VDBG_LVL.

Referenced by send_all_syn().

212 {
213  size_t len = sizeof(this->remote_peer_) + 2 * sizeof(GUID_t);
214 
215  Message_Block_Ptr data( new ACE_Message_Block(len));
216 
217  Serializer serializer(data.get(), encoding_kind);
218 
219  serializer << this->remote_peer_;
220  serializer.write_octet_array(reinterpret_cast<const ACE_CDR::Octet*>(&local_writer), sizeof(local_writer));
221  serializer.write_octet_array(reinterpret_cast<const ACE_CDR::Octet*>(&remote_reader), sizeof(remote_reader));
222 
223  VDBG_LVL((LM_DEBUG,
224  "(%P|%t) MulticastSession[%C]::send_syn "
225  "local %#08x%08x %C remote %#08x%08x %C\n",
226  config_name.c_str(),
227  (unsigned int)(this->link()->local_peer() >> 32),
228  (unsigned int) this->link()->local_peer(),
229  LogGuid(local_writer).c_str(),
230  (unsigned int)(this->remote_peer_ >> 32),
231  (unsigned int) this->remote_peer_,
232  LogGuid(remote_reader).c_str()),
233  2);
234 
235  // Send control sample to remote peer:
237 }
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
void send_control(char submessage_id, Message_Block_Ptr data)
#define VDBG_LVL(DBG_ARGS, LEVEL)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr

◆ send_synack()

void OpenDDS::DCPS::MulticastSession::send_synack ( const GUID_t local_reader,
const GUID_t remote_writer 
)

Definition at line 287 of file MulticastSession.cpp.

References active_, OpenDDS::DCPS::LogGuid::c_str(), config_name, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), link(), LM_DEBUG, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::move(), OpenDDS::DCPS::MULTICAST_SYNACK, remote_peer_, send_control(), send_naks(), and VDBG_LVL.

Referenced by syn_received().

289 {
290  size_t len = sizeof(this->remote_peer_) + 2 * sizeof(GUID_t);
291 
292  Message_Block_Ptr data(new ACE_Message_Block(len));
293 
294  Serializer serializer(data.get(), encoding_kind);
295 
296  serializer << this->remote_peer_;
297  serializer.write_octet_array(reinterpret_cast<const ACE_CDR::Octet*>(&local_reader), sizeof(local_reader));
298  serializer.write_octet_array(reinterpret_cast<const ACE_CDR::Octet*>(&remote_writer), sizeof(remote_writer));
299 
300  VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_synack "
301  "local %#08x%08x %C remote %#08x%08x %C active %d\n",
302  config_name.c_str(),
303  (unsigned int)(this->link()->local_peer() >> 32),
304  (unsigned int) this->link()->local_peer(),
305  LogGuid(local_reader).c_str(),
306  (unsigned int)(this->remote_peer_ >> 32),
307  (unsigned int) this->remote_peer_,
308  LogGuid(remote_writer).c_str(),
309  this->active_ ? 1 : 0), 2);
310 
311  // Send control sample to remote peer:
313 
314  // Send naks before sending synack to
315  // reduce wait time for resends from remote.
316  send_naks();
317 }
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
void send_control(char submessage_id, Message_Block_Ptr data)
#define VDBG_LVL(DBG_ARGS, LEVEL)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr

◆ set_acked()

void OpenDDS::DCPS::MulticastSession::set_acked ( )

Definition at line 62 of file MulticastSession.cpp.

References ACE_GUARD, ACE_SYNCH_MUTEX, ack_lock_, and acked_.

Referenced by OpenDDS::DCPS::BestEffortSession::start(), and OpenDDS::DCPS::ReliableSession::start().

63 {
64  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
65  this->acked_ = true;
66 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)

◆ start()

virtual bool OpenDDS::DCPS::MulticastSession::start ( bool  active,
bool  acked 
)
pure virtual

◆ start_syn()

void OpenDDS::DCPS::MulticastSession::start_syn ( )
protected

Definition at line 69 of file MulticastSession.cpp.

References initial_syn_delay_, syn_delay_, and syn_watchdog_.

Referenced by add_remote().

70 {
71  syn_watchdog_->cancel();
73  syn_watchdog_->schedule(TimeDuration(0));
74 }
const TimeDuration initial_syn_delay_
RcHandle< Sporadic > syn_watchdog_

◆ stop()

void OpenDDS::DCPS::MulticastSession::stop ( void  )
virtual

Reimplemented in OpenDDS::DCPS::ReliableSession.

Definition at line 320 of file MulticastSession.cpp.

References syn_watchdog_.

Referenced by OpenDDS::DCPS::ReliableSession::stop().

321 {
322  this->syn_watchdog_->cancel();
323 }
RcHandle< Sporadic > syn_watchdog_

◆ syn_hook()

virtual void OpenDDS::DCPS::MulticastSession::syn_hook ( const SequenceNumber )
inlineprotectedvirtual

Reimplemented in OpenDDS::DCPS::ReliableSession.

Definition at line 93 of file MulticastSession.h.

Referenced by syn_received().

93 {}

◆ syn_received()

void OpenDDS::DCPS::MulticastSession::syn_received ( const Message_Block_Ptr control)

Definition at line 122 of file MulticastSession.cpp.

References ACE_GUARD, ACE_SYNCH_MUTEX, ack_lock_, acked_, active_, OpenDDS::DCPS::LogGuid::c_str(), config_name, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header, link(), link_, LM_DEBUG, OpenDDS::DCPS::MulticastDataLink::local_peer(), pending_remote_map_, OpenDDS::DCPS::MulticastDataLink::receive_strategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header(), remote_peer_, send_synack(), OpenDDS::DCPS::TransportHeader::sequence_, OpenDDS::DCPS::TransportHeader::source_, OpenDDS::DCPS::TransportHeader::swap_bytes(), syn_hook(), OpenDDS::DCPS::MulticastDataLink::transport(), and VDBG_LVL.

Referenced by control_received().

123 {
124  if (this->active_) return; // pub send syn, then doesn't receive them.
125 
126  const TransportHeader& header =
128 
129  // Not from the remote peer for this session.
130  if (this->remote_peer_ != header.source_) return;
131 
132  Serializer serializer(control.get(), encoding_kind, header.swap_bytes());
133 
134  MulticastPeer local_peer;
135  GUID_t remote_writer;
136  GUID_t local_reader;
137  serializer >> local_peer; // sent as remote_peer
138  serializer.read_octet_array(reinterpret_cast<ACE_CDR::Octet*>(&remote_writer), sizeof(remote_writer));
139  serializer.read_octet_array(reinterpret_cast<ACE_CDR::Octet*>(&local_reader), sizeof(local_reader));
140 
141  // Ignore sample if not destined for us:
142  if (local_peer != this->link_->local_peer()) return;
143 
144  bool call_passive_connection = false;
145  bool call_send_synack = true;
146  {
147  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
148  PendingRemoteMap::const_iterator pos1 = pending_remote_map_.find(local_reader);
149  if (pos1 == pending_remote_map_.end()) {
150  call_send_synack = false;
151  } else {
152  RepoIdSet::const_iterator pos2 = pos1->second.find(remote_writer);
153  if (pos2 == pos1->second.end()) {
154  call_send_synack = false;
155  }
156  }
157 
158  VDBG_LVL((LM_DEBUG,
159  "(%P|%t) MulticastSession[%C]::syn_received "
160  "local %#08x%08x %C remote %#08x%08x %C\n",
161  config_name.c_str(),
162  (unsigned int)(this->link()->local_peer() >> 32),
163  (unsigned int) this->link()->local_peer(),
164  LogGuid(local_reader).c_str(),
165  (unsigned int)(this->remote_peer_ >> 32),
166  (unsigned int) this->remote_peer_,
167  LogGuid(remote_writer).c_str()),
168  2);
169 
170  if (!this->acked_) {
171  this->acked_ = true;
172  syn_hook(header.sequence_);
173  call_passive_connection = true;
174  }
175  }
176 
177  if (call_passive_connection) {
178  MulticastTransport_rch transport = link_->transport();
179  if (transport) {
180  transport->passive_connection(link_->local_peer(), remote_peer_);
181  }
182  }
183 
184  // MULTICAST_SYN control samples are always positively
185  // acknowledged by a matching remote peer:
186  if (call_send_synack) {
187  send_synack(local_reader, remote_writer);
188  }
189 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual void syn_hook(const SequenceNumber &)
MulticastTransport_rch transport()
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
MulticastReceiveStrategy * receive_strategy()
#define VDBG_LVL(DBG_ARGS, LEVEL)
RcHandle< MulticastTransport > MulticastTransport_rch
void send_synack(const GUID_t &local_reader, const GUID_t &remote_writer)
ACE_INT64 MulticastPeer

◆ synack_received()

void OpenDDS::DCPS::MulticastSession::synack_received ( const Message_Block_Ptr control)

Definition at line 240 of file MulticastSession.cpp.

References ACE_GUARD, ACE_SYNCH_MUTEX, ack_lock_, acked_, active_, OpenDDS::DCPS::LogGuid::c_str(), config_name, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header, OpenDDS::DCPS::DataLink::invoke_on_start_callbacks(), link(), link_, LM_DEBUG, OpenDDS::DCPS::MulticastDataLink::local_peer(), OpenDDS::DCPS::MulticastDataLink::receive_strategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header(), remote_peer_, remove_remote_i(), OpenDDS::DCPS::TransportHeader::source_, OpenDDS::DCPS::TransportHeader::swap_bytes(), and VDBG_LVL.

Referenced by control_received().

241 {
242  if (!this->active_) return; // sub send synack, then doesn't receive them.
243 
244  // Already received ack.
245  //if (this->acked()) return;
246 
247  const TransportHeader& header =
249 
250  // Not from the remote peer for this session.
251  if (this->remote_peer_ != header.source_) return;
252 
253  Serializer serializer(control.get(), encoding_kind, header.swap_bytes());
254 
255  MulticastPeer local_peer;
256  GUID_t remote_reader;
257  GUID_t local_writer;
258  serializer >> local_peer; // sent as remote_peer
259  serializer.read_octet_array(reinterpret_cast<ACE_CDR::Octet*>(&remote_reader), sizeof(remote_reader));
260  serializer.read_octet_array(reinterpret_cast<ACE_CDR::Octet*>(&local_writer), sizeof(local_writer));
261 
262  // Ignore sample if not destined for us:
263  if (local_peer != this->link_->local_peer()) return;
264 
265  VDBG_LVL((LM_DEBUG,
266  "(%P|%t) MulticastSession[%C]::synack_received "
267  "local %#08x%08x %C remote %#08x%08x %C\n",
268  config_name.c_str(),
269  (unsigned int)(this->link()->local_peer() >> 32),
270  (unsigned int) this->link()->local_peer(),
271  LogGuid(local_writer).c_str(),
272  (unsigned int)(this->remote_peer_ >> 32),
273  (unsigned int) this->remote_peer_,
274  LogGuid(remote_reader).c_str()),
275  2);
276 
277  {
278  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
279  this->acked_ = true;
280  remove_remote_i(local_writer, remote_reader);
281  }
282 
283  this->link_->invoke_on_start_callbacks(local_writer, remote_reader, true);
284 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
void remove_remote_i(const GUID_t &local, const GUID_t &remote)
MulticastReceiveStrategy * receive_strategy()
#define VDBG_LVL(DBG_ARGS, LEVEL)
ACE_INT64 MulticastPeer

Member Data Documentation

◆ ack_lock_

ACE_Thread_Mutex OpenDDS::DCPS::MulticastSession::ack_lock_
private

◆ acked_

bool OpenDDS::DCPS::MulticastSession::acked_
protected

Definition at line 110 of file MulticastSession.h.

Referenced by acked(), set_acked(), syn_received(), and synack_received().

◆ active_

bool OpenDDS::DCPS::MulticastSession::active_
protected

◆ config_name

String OpenDDS::DCPS::MulticastSession::config_name
private

Definition at line 129 of file MulticastSession.h.

Referenced by send_syn(), send_synack(), syn_received(), and synack_received().

◆ initial_syn_delay_

const TimeDuration OpenDDS::DCPS::MulticastSession::initial_syn_delay_
private

Definition at line 128 of file MulticastSession.h.

Referenced by start_syn().

◆ link_

MulticastDataLink* OpenDDS::DCPS::MulticastSession::link_
protected

◆ pending_remote_map_

PendingRemoteMap OpenDDS::DCPS::MulticastSession::pending_remote_map_
protected

Definition at line 116 of file MulticastSession.h.

Referenced by add_remote(), remove_remote_i(), send_all_syn(), and syn_received().

◆ reassembly_

TransportReassembly OpenDDS::DCPS::MulticastSession::reassembly_
protected

◆ remote_peer_

MulticastPeer OpenDDS::DCPS::MulticastSession::remote_peer_
protected

◆ reverse_start_lock_

Reverse_Lock_t OpenDDS::DCPS::MulticastSession::reverse_start_lock_
protected

Definition at line 97 of file MulticastSession.h.

Referenced by OpenDDS::DCPS::ReliableSession::start().

◆ start_lock_

ACE_Thread_Mutex OpenDDS::DCPS::MulticastSession::start_lock_
protected

◆ started_

bool OpenDDS::DCPS::MulticastSession::started_
protected

◆ syn_delay_

TimeDuration OpenDDS::DCPS::MulticastSession::syn_delay_
private

Definition at line 127 of file MulticastSession.h.

Referenced by send_all_syn(), and start_syn().

◆ syn_watchdog_

RcHandle<Sporadic> OpenDDS::DCPS::MulticastSession::syn_watchdog_
private

Definition at line 126 of file MulticastSession.h.

Referenced by remove_remote_i(), send_all_syn(), start_syn(), stop(), and ~MulticastSession().


The documentation for this class was generated from the following files: