OpenDDS  Snapshot(2023/04/28-20:55)
MulticastSession.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "MulticastSession.h"
10 
11 #include <dds/DCPS/GuidConverter.h>
12 
13 #include <ace/Log_Msg.h>
14 
15 #include <cmath>
16 
17 
18 #ifndef __ACE_INLINE__
19 # include "MulticastSession.inl"
20 #endif /* __ACE_INLINE__ */
21 
23 
24 namespace OpenDDS {
25 namespace DCPS {
26 
27 namespace {
// File-local encoding kind handed to every Serializer constructed in this
// file when reading or writing SYN / SYNACK control payloads.
28  const Encoding::Kind encoding_kind = Encoding::KIND_UNALIGNED_CDR;
29 }
30 
// Remaining constructor parameters followed by the member-initializer list.
// `link` is dereferenced below (link->config()), so it must not be null.
32  MulticastDataLink* link,
33  MulticastPeer remote_peer)
34  : link_(link)
35  , remote_peer_(remote_peer)
36  , reverse_start_lock_(start_lock_)
37  , started_(false)
// A new session starts out active and not yet acknowledged.
38  , active_(true)
39  , reassembly_(link->config()->fragment_reassembly_timeout_)
40  , acked_(false)
// The watchdog is built from the participant's time source, the supplied
// interceptor, a handle to this session and send_all_syn -- presumably the
// member that runs each time the watchdog fires (confirm against Sporadic).
41  , syn_watchdog_(make_rch<Sporadic>(TheServiceParticipant->time_source(),
42  interceptor,
43  rchandle_from(this),
44  &MulticastSession::send_all_syn))
// initial_syn_delay_ is taken from the link configuration's syn_interval_.
45  , initial_syn_delay_(link->config()->syn_interval_)
46  , config_name(link->config()->name())
47 {}
48 
50 {
// The only work done here is cancelling the SYN watchdog.
51  syn_watchdog_->cancel();
52 }
53 
// Reports the acked_ flag, read while holding ack_lock_.
54 bool
56 {
// ACE_GUARD_RETURN makes the function return its last argument (false)
// when the lock cannot be acquired.
57  ACE_GUARD_RETURN(ACE_SYNCH_MUTEX, guard, this->ack_lock_, false);
58  return this->acked_;
59 }
60 
// Sets the acked_ flag to true while holding ack_lock_.
61 void
63 {
// ACE_GUARD returns from the function without touching acked_ when the
// lock cannot be acquired.
64  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
65  this->acked_ = true;
66 }
67 
// Restarts the SYN watchdog: cancel it, then schedule it again with a zero
// delay.
68 void
70 {
71  syn_watchdog_->cancel();
// NOTE(review): presumably syn_delay_ is reset to initial_syn_delay_ around
// here before rescheduling (send_all_syn doubles syn_delay_ on every run) --
// confirm against the repository.
73  syn_watchdog_->schedule(TimeDuration(0));
74 }
75 
// Builds a control message around `data` via the link and sends it.
// Failures are only logged; nothing is reported to the caller (void).
76 void
78 {
// Ask the link to wrap `data` (ownership moved in) into a control message
// for `submessage_id`; a null result is treated as failure.
80  Message_Block_Ptr control(
81  this->link_->create_control(submessage_id, header, move(data)));
82  if (!control) {
84  ACE_TEXT("(%P|%t) ERROR: ")
85  ACE_TEXT("MulticastSession::send_control: ")
86  ACE_TEXT("create_control failed!\n")));
87  return;
88  }
89 
// Hand the control message to the link; any status other than
// SEND_CONTROL_OK is logged together with its numeric value.
90  int error = this->link_->send_control(header, move(control));
91  if (error != SEND_CONTROL_OK) {
93  ACE_TEXT("(%P|%t) ERROR: ")
94  ACE_TEXT("MulticastSession::send_control: ")
95  ACE_TEXT("send_control failed: %d!\n"),
96  error));
97  return;
98  }
99 }
100 
// Dispatches a received control message by submessage id.
// Returns true when the id was MULTICAST_SYN or MULTICAST_SYNACK (and the
// matching handler was called), false for every other id.
101 bool
103  const Message_Block_Ptr& control)
104 {
105  switch (submessage_id) {
106  case MULTICAST_SYN:
107  syn_received(control);
108  break;
109 
110  case MULTICAST_SYNACK:
111  synack_received(control);
112  break;
113 
// Not a handshake submessage: report it as unhandled.
114  default:
115  return false;
116  }
117 
118  return true;
119 }
120 
// Handles a received MULTICAST_SYN on a passive (non-active) session:
// validates sender and destination, marks the session acked on the first
// SYN, notifies the transport, and replies with a SYNACK when the
// (local_reader, remote_writer) pair is registered in pending_remote_map_.
121 void
123 {
124  if (this->active_) return; // Active (publisher-side) sessions send SYNs and ignore any they receive.
125 
126  const TransportHeader& header =
128 
129  // Ignore a SYN that did not come from this session's remote peer.
130  if (this->remote_peer_ != header.source_) return;
131 
132  Serializer serializer(control.get(), encoding_kind, header.swap_bytes());
133 
// Payload order mirrors send_syn: peer id, then the sender's writer GUID,
// then the reader GUID it targets.
// NOTE(review): the results of these reads are not checked, so a short
// payload would appear to leave the locals unset -- confirm.
134  MulticastPeer local_peer;
135  GUID_t remote_writer;
136  GUID_t local_reader;
137  serializer >> local_peer; // the sender wrote this field as its remote_peer_
138  serializer.read_octet_array(reinterpret_cast<ACE_CDR::Octet*>(&remote_writer), sizeof(remote_writer));
139  serializer.read_octet_array(reinterpret_cast<ACE_CDR::Octet*>(&local_reader), sizeof(local_reader));
140 
141  // Ignore the sample if it is not addressed to this link's local peer:
142  if (local_peer != this->link_->local_peer()) return;
143 
// Decisions are made under ack_lock_; the resulting calls (transport
// notification, SYNACK) are made after the lock scope ends.
144  bool call_passive_connection = false;
145  bool call_send_synack = true;
146  {
147  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
// A SYNACK is sent only if remote_writer is registered for local_reader.
148  PendingRemoteMap::const_iterator pos1 = pending_remote_map_.find(local_reader);
149  if (pos1 == pending_remote_map_.end()) {
150  call_send_synack = false;
151  } else {
152  RepoIdSet::const_iterator pos2 = pos1->second.find(remote_writer);
153  if (pos2 == pos1->second.end()) {
154  call_send_synack = false;
155  }
156  }
157 
// Debug trace of the local/remote peer ids and GUIDs.
159  "(%P|%t) MulticastSession[%C]::syn_received "
160  "local %#08x%08x %C remote %#08x%08x %C\n",
161  config_name.c_str(),
162  (unsigned int)(this->link()->local_peer() >> 32),
163  (unsigned int) this->link()->local_peer(),
164  LogGuid(local_reader).c_str(),
165  (unsigned int)(this->remote_peer_ >> 32),
166  (unsigned int) this->remote_peer_,
167  LogGuid(remote_writer).c_str()),
168  2);
169 
// First SYN for this session: mark it acked and pass the header's
// sequence number to syn_hook.
170  if (!this->acked_) {
171  this->acked_ = true;
172  syn_hook(header.sequence_);
173  call_passive_connection = true;
174  }
175  }
176 
// Tell the transport about the passive connection, if it is still available.
177  if (call_passive_connection) {
178  MulticastTransport_rch transport = link_->transport();
179  if (transport) {
180  transport->passive_connection(link_->local_peer(), remote_peer_);
181  }
182  }
183 
184  // A MULTICAST_SYN is positively acknowledged only when the
185  // (local_reader, remote_writer) pair was found in pending_remote_map_:
186  if (call_send_synack) {
187  send_synack(local_reader, remote_writer);
188  }
189 }
190 
// Sends one SYN for every (local writer, remote reader) pair currently in
// pending_remote_map_, then doubles syn_delay_ and reschedules the watchdog.
// ack_lock_ is held for the whole function, including the send_syn calls.
191 void
193 {
194  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
195  for (PendingRemoteMap::const_iterator pos1 = pending_remote_map_.begin(), limit = pending_remote_map_.end();
196  pos1 != limit; ++pos1) {
197  const GUID_t& local_writer = pos1->first;
// The inner `limit` shadows the outer one declared in the enclosing for.
198  for (RepoIdSet::const_iterator pos2 = pos1->second.begin(), limit = pos1->second.end(); pos2 != limit; ++pos2) {
199  const GUID_t& remote_reader = *pos2;
200  send_syn(local_writer, remote_reader);
201  }
202  }
203 
204  // Exponential back-off: the delay doubles on every run. No upper bound is
205  // applied here, and the watchdog is rescheduled even when the map is empty.
205  syn_delay_ *= 2;
206  syn_watchdog_->schedule(syn_delay_);
207 }
208 
// Builds the MULTICAST_SYN payload for one (local_writer, remote_reader)
// pair: this session's remote peer id followed by the two GUIDs as raw octets.
//   local_writer  - GUID written second in the payload
//   remote_reader - GUID written third in the payload
209 void
210 MulticastSession::send_syn(const GUID_t& local_writer,
211  const GUID_t& remote_reader)
212 {
// Exactly enough room for the peer id plus two GUIDs.
213  size_t len = sizeof(this->remote_peer_) + 2 * sizeof(GUID_t);
214 
215  Message_Block_Ptr data( new ACE_Message_Block(len));
216 
217  Serializer serializer(data.get(), encoding_kind);
218 
// Field order must match what syn_received reads: peer id, writer, reader.
219  serializer << this->remote_peer_;
220  serializer.write_octet_array(reinterpret_cast<const ACE_CDR::Octet*>(&local_writer), sizeof(local_writer));
221  serializer.write_octet_array(reinterpret_cast<const ACE_CDR::Octet*>(&remote_reader), sizeof(remote_reader));
222 
// Debug trace of the local/remote peer ids and GUIDs.
224  "(%P|%t) MulticastSession[%C]::send_syn "
225  "local %#08x%08x %C remote %#08x%08x %C\n",
226  config_name.c_str(),
227  (unsigned int)(this->link()->local_peer() >> 32),
228  (unsigned int) this->link()->local_peer(),
229  LogGuid(local_writer).c_str(),
230  (unsigned int)(this->remote_peer_ >> 32),
231  (unsigned int) this->remote_peer_,
232  LogGuid(remote_reader).c_str()),
233  2);
234 
235  // Send the control sample to the remote peer:
237 }
238 
// Handles a received MULTICAST_SYNACK on an active session: validates sender
// and destination, marks the session acked, drops the acknowledged
// (local_writer, remote_reader) pair from the pending set, and invokes the
// link's on-start callbacks for that pair with success = true.
239 void
241 {
242  if (!this->active_) return; // Passive (subscriber-side) sessions send SYNACKs and ignore any they receive.
243 
244  // Disabled early-out for a session that is already acked:
245  //if (this->acked()) return;
246 
247  const TransportHeader& header =
249 
250  // Ignore a SYNACK that did not come from this session's remote peer.
251  if (this->remote_peer_ != header.source_) return;
252 
253  Serializer serializer(control.get(), encoding_kind, header.swap_bytes());
254 
// Payload order mirrors send_synack: peer id, then the sender's reader
// GUID, then the writer GUID it acknowledges.
// NOTE(review): the results of these reads are not checked -- confirm that
// a short payload cannot reach this point.
255  MulticastPeer local_peer;
256  GUID_t remote_reader;
257  GUID_t local_writer;
258  serializer >> local_peer; // the sender wrote this field as its remote_peer_
259  serializer.read_octet_array(reinterpret_cast<ACE_CDR::Octet*>(&remote_reader), sizeof(remote_reader));
260  serializer.read_octet_array(reinterpret_cast<ACE_CDR::Octet*>(&local_writer), sizeof(local_writer));
261 
262  // Ignore the sample if it is not addressed to this link's local peer:
263  if (local_peer != this->link_->local_peer()) return;
264 
// Debug trace of the local/remote peer ids and GUIDs.
266  "(%P|%t) MulticastSession[%C]::synack_received "
267  "local %#08x%08x %C remote %#08x%08x %C\n",
268  config_name.c_str(),
269  (unsigned int)(this->link()->local_peer() >> 32),
270  (unsigned int) this->link()->local_peer(),
271  LogGuid(local_writer).c_str(),
272  (unsigned int)(this->remote_peer_ >> 32),
273  (unsigned int) this->remote_peer_,
274  LogGuid(remote_reader).c_str()),
275  2);
276 
// State changes happen under ack_lock_; remove_remote_i itself takes no lock.
277  {
278  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
279  this->acked_ = true;
280  remove_remote_i(local_writer, remote_reader);
281  }
282 
// Invoked outside the lock, without first checking whether the pair was
// actually pending.
283  this->link_->invoke_on_start_callbacks(local_writer, remote_reader, true);
284 }
285 
// Builds the MULTICAST_SYNACK payload for one (local_reader, remote_writer)
// pair -- this session's remote peer id followed by the two GUIDs as raw
// octets -- and then calls send_naks().
286 void
288  const GUID_t& remote_writer)
289 {
// Exactly enough room for the peer id plus two GUIDs.
290  size_t len = sizeof(this->remote_peer_) + 2 * sizeof(GUID_t);
291 
292  Message_Block_Ptr data(new ACE_Message_Block(len));
293 
294  Serializer serializer(data.get(), encoding_kind);
295 
// Field order must match what synack_received reads: peer id, reader, writer.
296  serializer << this->remote_peer_;
297  serializer.write_octet_array(reinterpret_cast<const ACE_CDR::Octet*>(&local_reader), sizeof(local_reader));
298  serializer.write_octet_array(reinterpret_cast<const ACE_CDR::Octet*>(&remote_writer), sizeof(remote_writer));
299 
300  VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastSession[%C]::send_synack "
301  "local %#08x%08x %C remote %#08x%08x %C active %d\n",
302  config_name.c_str(),
303  (unsigned int)(this->link()->local_peer() >> 32),
304  (unsigned int) this->link()->local_peer(),
305  LogGuid(local_reader).c_str(),
306  (unsigned int)(this->remote_peer_ >> 32),
307  (unsigned int) this->remote_peer_,
308  LogGuid(remote_writer).c_str(),
309  this->active_ ? 1 : 0), 2);
310 
311  // Send the control sample to the remote peer:
313 
314  // Send NAKs right away to reduce the wait for resends from the remote.
315  // NOTE(review): this comment used to say "before sending synack", yet the call sits after the send step above -- confirm the intended order.
316  send_naks();
317 }
318 
// Cancels the SYN watchdog; nothing else is done here.
319 void
321 {
322  this->syn_watchdog_->cancel();
323 }
324 
// Forwards a received sample to this session's reassembly_ helper, keyed by
// the transport header's sequence number and first-fragment flag, and
// returns that helper's result unchanged.
325 bool
327  const TransportHeader& header)
328 {
329  return this->reassembly_.reassemble(header.sequence_,
330  header.first_fragment_,
331  data);
332 }
333 
// Starts the SYN handshake when `local` is a writer GUID; does nothing for
// any other kind of GUID.
334 void
336 {
337  const GuidConverter conv(local);
338  if (conv.isWriter()) {
339  // Active peers schedule a watchdog timer to initiate a 2-way
340  // handshake to verify that passive endpoints can send/receive
341  // data reliably. This process must be executed using the
342  // transport reactor thread to prevent blocking.
343  // Only publishers send SYNs, so the watchdog is started only for writers.
344  this->start_syn();
345  }
346 }
347 
// Registers `remote` as pending for `local` and, when `local` is a writer
// GUID, (re)starts the SYN handshake.
348 void
350  const GUID_t& remote)
351 {
352  const GuidConverter conv(local);
353 
// Only the map update is done under ack_lock_; start_syn runs after the
// lock scope ends.
354  {
355  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
356  pending_remote_map_[local].insert(remote);
357  }
358 
359  if (conv.isWriter()) {
360  // Active peers schedule a watchdog timer to initiate a 2-way
361  // handshake to verify that passive endpoints can send/receive
362  // data reliably. This process must be executed using the
363  // transport reactor thread to prevent blocking.
364  // Only publishers send SYNs, so the watchdog is started only for writers.
365  this->start_syn();
366  }
367 }
368 
// Locking wrapper: acquires ack_lock_ and delegates to remove_remote_i.
369 void
371  const GUID_t& remote)
372 {
373  ACE_GUARD(ACE_SYNCH_MUTEX, guard, this->ack_lock_);
374  remove_remote_i(local, remote);
375 }
376 
// Removes `remote` from the pending set of `local`, drops the map entry once
// its set is empty, and cancels the SYN watchdog when this call emptied the
// whole map for a writer GUID.
// Takes no lock itself; both callers visible in this file invoke it while
// holding ack_lock_.
377 void
379  const GUID_t& remote)
380 {
381  const GuidConverter conv(local);
382 
383  const bool empty_before = pending_remote_map_.empty();
// NOTE(review): if PendingRemoteMap behaves like std::map, operator[]
// inserts an empty set when `local` is absent; the erase below then removes
// it again, so the net effect is unchanged but costs extra lookups.
384  pending_remote_map_[local].erase(remote);
385  if (pending_remote_map_[local].empty()) {
386  pending_remote_map_.erase(local);
387  }
// True only when this call made the map transition from non-empty to empty.
388  const bool empty = pending_remote_map_.empty() && !empty_before;
389 
// syn_watchdog_ is null-checked here, unlike the other cancel() call sites
// in this file.
390  if (conv.isWriter() && empty && this->syn_watchdog_) {
391  this->syn_watchdog_->cancel();
392  }
393 }
394 
395 } // namespace DCPS
396 } // namespace OpenDDS
397 
virtual bool control_received(char submessage_id, const Message_Block_Ptr &control)
ACE_INT64 MulticastPeer
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
#define ACE_SYNCH_MUTEX
void synack_received(const Message_Block_Ptr &control)
MulticastTransport_rch transport()
bool swap_bytes() const
Determine if the serializer should swap bytes.
ACE_Message_Block * create_control(char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
Definition: DataLink.cpp:628
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
const char * c_str() const
void syn_received(const Message_Block_Ptr &control)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
Conversion processing and value testing utilities for RTPS GUID_t types.
Definition: GuidConverter.h:62
bool reassemble(ReceivedDataSample &data, const TransportHeader &header)
LM_DEBUG
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
void send_control(char submessage_id, Message_Block_Ptr data)
void remove_remote(const GUID_t &local, const GUID_t &remote)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
Holds a data sample received by the transport.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
Defines class that represents a transport packet header.
void add_remote(const GUID_t &local)
const char *const name
Definition: debug.cpp:60
ACE_TEXT("TCP_Factory")
MulticastSession(RcHandle< ReactorInterceptor > interceptor, MulticastDataLink *link, MulticastPeer remote_peer)
void send_synack(const GUID_t &local_reader, const GUID_t &remote_writer)
bool reassemble(const SequenceNumber &transportSeq, bool firstFrag, ReceivedDataSample &data, ACE_UINT32 total_frags=0)
#define VDBG_LVL(DBG_ARGS, LEVEL)
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr data)
Definition: DataLink.cpp:668
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void send_syn(const GUID_t &local_writer, const GUID_t &remote_reader)
void send_all_syn(const MonotonicTimePoint &now)
void remove_remote_i(const GUID_t &local, const GUID_t &remote)
#define TheServiceParticipant
bool isWriter() const
Returns true if the GUID represents a writer entity.
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual void syn_hook(const SequenceNumber &)
RcHandle< Sporadic > syn_watchdog_
const TimeDuration initial_syn_delay_
MulticastReceiveStrategy * receive_strategy()
void invoke_on_start_callbacks(bool success)
Definition: DataLink.cpp:194