OpenDDS  Snapshot(2023/04/28-20:55)
MulticastDataLink.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "MulticastDataLink.h"
9 #include "MulticastSession.h"
11 #include "MulticastTransport.h"
12 #include "MulticastSendStrategy.h"
14 
17 #include <dds/DCPS/GuidConverter.h>
19 
20 #include <tao/ORB_Core.h>
21 
22 #include <ace/Default_Constants.h>
23 #include <ace/Global_Macros.h>
24 #include <ace/Log_Msg.h>
25 #include <ace/Truncate.h>
26 #include <ace/OS_NS_sys_socket.h>
27 
28 #ifndef __ACE_INLINE__
29 # include "MulticastDataLink.inl"
30 #endif /* __ACE_INLINE__ */
31 
33 
34 namespace OpenDDS {
35 namespace DCPS {
36 
37 namespace {
38  const Encoding::Kind encoding_kind = Encoding::KIND_UNALIGNED_CDR;
39 }
40 
42  const MulticastSessionFactory_rch& session_factory,
43  MulticastPeer local_peer,
44  const MulticastInst_rch& config,
45  const ReactorTask_rch &reactor_task,
46  bool is_active)
47 : DataLink(transport, 0 /*priority*/, false /*loopback*/, is_active),
48  session_factory_(session_factory),
49  local_peer_(local_peer),
50  reactor_task_(reactor_task),
51  send_strategy_(make_rch<MulticastSendStrategy>(this)),
52  recv_strategy_(make_rch<MulticastReceiveStrategy>(this))
53 {
54  // A send buffer may be bound to the send strategy to ensure a
55  // configured number of most-recent datagrams are retained:
57  const size_t nak_depth = config ? config->nak_depth_ : MulticastInst::DEFAULT_NAK_DEPTH;
58  const size_t default_max_samples = DEFAULT_CONFIG_MAX_SAMPLES_PER_PACKET;
59  const size_t max_samples_per_packet = config ? config->max_samples_per_packet_ : default_max_samples;
60  send_buffer_.reset(new SingleSendBuffer(nak_depth, max_samples_per_packet));
62  }
63 }
64 
66 {
67  if (send_buffer_) {
69  }
70 }
71 
72 
73 bool
75 {
76  MulticastInst_rch cfg = config();
77  if (!cfg) {
78  return false;
79  }
80 
81  const std::string& net_if = cfg->local_address_;
82 #ifdef ACE_HAS_MAC_OSX
85 #endif
86  if (this->socket_.join(group_address, 1,
87  net_if.empty() ? 0 :
88  ACE_TEXT_CHAR_TO_TCHAR(net_if.c_str())) != 0) {
90  ACE_TEXT("(%P|%t) ERROR: MulticastDataLink::join: ")
91  ACE_TEXT("ACE_SOCK_Dgram_Mcast::join failed %m.\n")),
92  false);
93  }
94  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) MulticastDataLink::join OK\n")), 6);
95 
96  ACE_HANDLE handle = this->socket_.get_handle();
97 
98  if (!OpenDDS::DCPS::set_socket_multicast_ttl(this->socket_, cfg->ttl_)) {
100  ACE_TEXT("(%P|%t) ERROR: ")
101  ACE_TEXT("MulticastDataLink::join: ")
102  ACE_TEXT("OpenDDS::DCPS::set_socket_multicast_ttl failed.\n")),
103  false);
104  }
105 
106  int rcv_buffer_size = ACE_Utils::truncate_cast<int>(cfg->rcv_buffer_size_);
107  if (rcv_buffer_size != 0
108  && ACE_OS::setsockopt(handle, SOL_SOCKET,
109  SO_RCVBUF,
110  (char *) &rcv_buffer_size,
111  sizeof (int)) < 0) {
113  ACE_TEXT("(%P|%t) ERROR: ")
114  ACE_TEXT("MulticastDataLink::join: ")
115  ACE_TEXT("ACE_OS::setsockopt RCVBUF failed.\n")),
116  false);
117  }
118 
119 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
120  int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
121 
122  if (ACE_OS::setsockopt(handle, SOL_SOCKET,
123  SO_SNDBUF,
124  (char *) &snd_size,
125  sizeof(snd_size)) < 0
126  && errno != ENOTSUP) {
128  ACE_TEXT("(%P|%t) ERROR: ")
129  ACE_TEXT("MulticastDataLink::join: ")
130  ACE_TEXT("ACE_OS::setsockopt SNDBUF failed to set the send buffer size to %d errno %m\n"),
131  snd_size),
132  false);
133  }
134 #endif /* ACE_DEFAULT_MAX_SOCKET_BUFSIZ */
135 
136  if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
137  static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
138  != 0) {
139  this->socket_.close();
141  ACE_TEXT("(%P|%t) ERROR: ")
142  ACE_TEXT("MulticastDataLink::join: ")
143  ACE_TEXT("DataLink::start failed!\n")),
144  false);
145  }
146 
147  return true;
148 }
149 
152 {
154  guard,
155  this->session_lock_,
157 
158  MulticastSessionMap::iterator it(this->sessions_.find(remote_peer));
159  if (it != this->sessions_.end()) {
160  return it->second;
161  }
162  else return MulticastSession_rch();
163 }
164 
167 {
169  guard,
170  this->session_lock_,
172 
173  MulticastSessionMap::iterator it(this->sessions_.find(remote_peer));
174  if (it != this->sessions_.end()) {
175  return it->second;
176  }
177 
178  MulticastSession_rch session;
180  if (mt) {
181  session = session_factory_->create(mt->reactor_task()->interceptor(), this, remote_peer);
182  if (session.is_nil()) {
184  ACE_TEXT("(%P|%t) ERROR: ")
185  ACE_TEXT("MulticastDataLink::find_or_create_session: ")
186  ACE_TEXT("failed to create session for remote peer: %#08x%08x!\n"),
187  (unsigned int) (remote_peer >> 32),
188  (unsigned int) remote_peer),
190  }
191 
192  std::pair<MulticastSessionMap::iterator, bool> pair = this->sessions_.insert(
193  MulticastSessionMap::value_type(remote_peer, session));
194  if (pair.first == this->sessions_.end()) {
196  ACE_TEXT("(%P|%t) ERROR: ")
197  ACE_TEXT("MulticastDataLink::find_or_create_session: ")
198  ACE_TEXT("failed to insert session for remote peer: %#08x%08x!\n"),
199  (unsigned int) (remote_peer >> 32),
200  (unsigned int) remote_peer),
202  }
203  }
204  return session;
205 }
206 
207 bool
209 {
211  guard,
212  this->session_lock_,
213  false);
214 
215  MulticastSessionMap::iterator it(this->sessions_.find(header.source_));
216  if (it == this->sessions_.end() && is_active()) {
217  return false;
218  }
219  if (it != this->sessions_.end() && it->second->acked()) {
220  return it->second->check_header(header);
221  }
222 
223  return true;
224 }
225 
226 bool
228 {
229  if (header.message_id_ == TRANSPORT_CONTROL) return true;
230 
232  guard,
233  this->session_lock_,
234  false);
235 
236  // Skip data sample unless there is a session for it.
237  return (this->sessions_.count(receive_strategy()->received_header().source_) > 0);
238 }
239 
240 bool
242  const TransportHeader& header)
243 {
245  guard,
246  this->session_lock_,
247  false);
248 
249  MulticastSessionMap::iterator it(this->sessions_.find(header.source_));
250  if (it == this->sessions_.end()) return false;
251  if (it->second->acked()) {
252  return it->second->reassemble(data, header);
253  }
254  return false;
255 }
256 
257 int
259  const GUID_t& lsi,
261  bool reliable)
262 {
263  int result = DataLink::make_reservation(rpi, lsi, trl, reliable);
264  if (reliable) {
265  const MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(rpi).federationId() << 32
267  MulticastSession_rch session = find_session(remote_peer);
268  if (session) {
269  session->add_remote(lsi, rpi);
270  }
271  } else {
272  const MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(rpi).federationId() << 32
274  MulticastSession_rch session = find_session(remote_peer);
275  if (session) {
276  session->add_remote(lsi);
277  }
278  }
279  return result;
280 }
281 
282 void
284  const GUID_t& local_id)
285 {
286  const MulticastPeer remote_peer = (ACE_INT64)RepoIdConverter(remote_id).federationId() << 32
287  | RepoIdConverter(remote_id).participantId();
288  MulticastSession_rch session = find_session(remote_peer);
289  if (session) {
290  session->remove_remote(local_id, remote_id);
291  }
292 }
293 
294 void
296 {
297  switch (sample.header_.message_id_) {
298  case TRANSPORT_CONTROL: {
299  // Transport control samples are delivered to all sessions
300  // regardless of association status:
301  {
302  Message_Block_Ptr payload(sample.data());
303  char* const ptr = payload ? payload->rd_ptr() : 0;
304 
306  guard,
307  this->session_lock_);
308 
309  const TransportHeader& theader = receive_strategy()->received_header();
310 
311  if (!is_active() && sample.header_.submessage_id_ == MULTICAST_SYN &&
312  sessions_.find(theader.source_) == sessions_.end()) {
313  // We have received a SYN but there is no session (yet) for this source.
314  // Depending on the data, we may need to send SYNACK.
315 
316  guard.release();
317  syn_received_no_session(theader.source_, payload,
318  theader.swap_bytes());
319 
320  guard.acquire();
321  MulticastSessionMap::iterator s_itr = sessions_.find(theader.source_);
322  if (s_itr != sessions_.end()) {
323  s_itr->second->record_header_received(theader);
324  }
325 
326  if (ptr) {
327  payload->rd_ptr(ptr);
328  }
329  return;
330  }
331 
332  MulticastSessionMap temp_sessions(sessions_);
333  guard.release();
334 
335  for (MulticastSessionMap::iterator it(temp_sessions.begin());
336  it != temp_sessions.end(); ++it) {
337  it->second->control_received(sample.header_.submessage_id_,
338  payload);
339  it->second->record_header_received(theader);
340 
341  // reset read pointer
342  if (ptr) {
343  payload->rd_ptr(ptr);
344  }
345  }
346  }
347  } break;
348 
349  default:
350 
351  if (ready_to_deliver(sample)) {
352  data_received(sample);
353  }
354  break;
355  }
356 }
357 
358 bool
360 {
362  guard,
363  this->session_lock_, false);
364 
365  const TransportHeader& theader = receive_strategy()->received_header();
366 
367  MulticastSessionMap::iterator session_it = sessions_.find(theader.source_);
368  if (session_it != sessions_.end()) {
369  MulticastSession_rch sess_rch(session_it->second);
370  guard.release();
371  return sess_rch->ready_to_deliver(theader, data);
372  }
373 
374  return true;
375 }
376 
377 void
379 {
381  MulticastPeer remote_source = (ACE_INT64)RepoIdConverter(remote).federationId() << 32
382  | RepoIdConverter(remote).participantId();
383  MulticastSessionMap::iterator session_it = sessions_.find(remote_source);
384  if (session_it != sessions_.end() && session_it->second->is_reliable()) {
385  session_it->second->release_remote(remote);
386  }
387 }
388 
389 void
391  const Message_Block_Ptr& data,
392  bool swap_bytes)
393 {
394  Serializer serializer_read(data.get(), encoding_kind, swap_bytes);
395 
397  serializer_read >> local_peer;
398 
399  if (local_peer != local_peer_) {
400  return;
401  }
402 
403  {
404  MulticastInst_rch cfg = config();
405  VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastDataLink[%C]::syn_received_no_session "
406  "send_synack local %#08x%08x remote %#08x%08x\n",
407  cfg ? cfg->name().c_str() : "",
408  (unsigned int) (local_peer >> 32),
409  (unsigned int) local_peer,
410  (unsigned int) (source >> 32),
411  (unsigned int) source), 2);
412  }
413 
414  Message_Block_Ptr synack_data(new ACE_Message_Block(sizeof(MulticastPeer)));
415 
416  Serializer serializer_write(synack_data.get(), encoding_kind);
417  serializer_write << source;
418 
420  Message_Block_Ptr control(
421  create_control(MULTICAST_SYNACK, header, move(synack_data)));
422 
423  if (control == 0) {
425  ACE_TEXT("(%P|%t) ERROR: ")
426  ACE_TEXT("MulticastDataLink::syn_received_no_session: ")
427  ACE_TEXT("create_control failed!\n")));
428  return;
429  }
430 
431  const int error = send_control(header, move(control));
432  if (error != SEND_CONTROL_OK) {
433  ACE_ERROR((LM_ERROR, "(%P|%t) MulticastDataLink::syn_received_no_session: "
434  "ERROR: send_control failed: %d!\n", error));
435  return;
436  }
437 
439  if (mt) {
440  mt->passive_connection(local_peer, source);
441  }
442 }
443 
444 void
446 {
448  guard,
449  this->session_lock_);
450 
451  for (MulticastSessionMap::iterator it(this->sessions_.begin());
452  it != this->sessions_.end(); ++it) {
453  it->second->stop();
454  }
455  this->sessions_.clear();
456 
457  this->socket_.close();
458 }
459 
460 void
462 {
463  if (send_buffer_) {
464  send_buffer_->retain_all(localId);
465  send_buffer_.reset();
466  }
467 }
468 
469 } // namespace DCPS
470 } // namespace OpenDDS
471 
DataSampleHeader header_
The demarshalled sample header.
ACE_INT64 MulticastPeer
#define ACE_ERROR(X)
int setsockopt(ACE_HANDLE handle, int level, int optname, const char *optval, int optlen)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual int requires_send_buffer() const =0
#define ENOTSUP
char message_id_
The enum MessageId.
MulticastTransport_rch transport()
int join(const ACE_INET_Addr &mcast_addr, int reuse_addr=1, const ACE_TCHAR *net_if=0)
Conversion processing and value testing utilities for DCPS Information Repository identifiers...
MulticastDataLink(const MulticastTransport_rch &transport, const MulticastSessionFactory_rch &session_factory, MulticastPeer local_peer, const MulticastInst_rch &config, const ReactorTask_rch &reactor_task, bool is_active)
virtual MulticastSession_rch create(RcHandle< ReactorInterceptor > interceptor, MulticastDataLink *link, MulticastPeer remote_peer)=0
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690
bool swap_bytes() const
Determine if the serializer should swap bytes.
ACE_Message_Block * create_control(char submessage_id, DataSampleHeader &header, Message_Block_Ptr data)
Definition: DataLink.cpp:628
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
ParticipantId participantId() const
Get the participant id from the GUID.
void opts(int opts)
RcHandle< MulticastSession > MulticastSession_rch
OpenDDS::Federator::RepoKey federationId() const
Get the federeation id from the GUID.
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
char * rd_ptr(void) const
void client_stop(const GUID_t &localId)
#define SOL_SOCKET
void release_reservations_i(const GUID_t &remote_id, const GUID_t &local_id)
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
#define SO_SNDBUF
LM_DEBUG
MulticastReceiveStrategy_rch recv_strategy_
TO truncate_cast(FROM val)
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
bool set_socket_multicast_ttl(const ACE_SOCK_Dgram &socket, const unsigned char &ttl)
MulticastSendStrategy_rch send_strategy_
#define ACE_SYNCH_RECURSIVE_MUTEX
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
static const size_t DEFAULT_NAK_DEPTH
Definition: MulticastInst.h:30
Holds a data sample received by the transport.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
MulticastSession_rch find_session(MulticastPeer remote_peer)
Defines class that represents a transport packet header.
int make_reservation(const GUID_t &remote_publication_id, const GUID_t &local_subscription_id, const TransportReceiveListener_wrch &receive_listener, bool reliable)
ACE_HANDLE get_handle(void) const
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
#define ACE_DEFAULT_MAX_SOCKET_BUFSIZ
char submessage_id_
Implementation-specific sub-message Ids.
ACE_TEXT("TCP_Factory")
unique_ptr< SingleSendBuffer > send_buffer_
bool join(const ACE_INET_Addr &group_address)
void release_remote_i(const GUID_t &remote)
bool check_header(const TransportHeader &header)
void syn_received_no_session(MulticastPeer source, const Message_Block_Ptr &data, bool swap_bytes)
virtual int make_reservation(const GUID_t &remote_subscription_id, const GUID_t &local_publication_id, const TransportSendListener_wrch &send_listener, bool reliable)
Definition: DataLink.cpp:398
#define VDBG_LVL(DBG_ARGS, LEVEL)
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr data)
Definition: DataLink.cpp:668
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void send_buffer(TransportSendBuffer *send_buffer)
Assigns an optional send buffer.
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
MulticastSessionFactory_rch session_factory_
bool ready_to_deliver(const ReceivedDataSample &data)
long long ACE_INT64
ACE_SYNCH_RECURSIVE_MUTEX session_lock_
#define SO_RCVBUF
#define ACE_ERROR_RETURN(X, Y)
bool reassemble(ReceivedDataSample &data, const TransportHeader &header)
MulticastSession_rch find_or_create_session(MulticastPeer remote_peer)
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
int close(void)
MulticastReceiveStrategy * receive_strategy()
void sample_received(ReceivedDataSample &sample)