OpenDDS  Snapshot(2023/04/28-20:55)
RtpsUdpTransport.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include "RtpsUdpTransport.h"
7 
8 #include "RtpsUdpDataLink.h"
9 #include "RtpsUdpInst.h"
10 #include "RtpsUdpInst_rch.h"
11 #include "RtpsUdpSendStrategy.h"
12 #include "RtpsUdpReceiveStrategy.h"
13 
14 #include <dds/OpenddsDcpsExtTypeSupportImpl.h>
15 
18 #include <dds/DCPS/LogAddr.h>
23 #include <dds/DCPS/RTPS/RtpsCoreTypeSupportImpl.h>
24 
25 #include <ace/CDR_Base.h>
26 #include <ace/Log_Msg.h>
27 #include <ace/Sock_Connect.h>
28 
30 
31 namespace OpenDDS {
32 namespace DCPS {
33 
35  : TransportImpl(inst)
36 #if defined(OPENDDS_SECURITY)
37  , local_crypto_handle_(DDS::HANDLE_NIL)
38 #endif
39 #ifdef OPENDDS_SECURITY
40  , ice_endpoint_(make_rch<IceEndpoint>(ref(*this)))
41  , relay_stun_task_falloff_(TimeDuration::zero_value)
42  , ice_agent_(ICE::Agent::instance())
43 #endif
44  , transport_statistics_(inst->name())
45 {
47  if (!(configure_i(inst) && open())) {
49  }
50 }
51 
54 {
56 }
57 
58 #ifdef OPENDDS_SECURITY
61 {
62  return ice_agent_;
63 }
64 #endif
65 
68 {
69 #ifdef OPENDDS_SECURITY
70  RtpsUdpInst_rch cfg = config();
71  return (cfg && cfg->use_ice()) ? static_rchandle_cast<ICE::Endpoint>(ice_endpoint_) : DCPS::WeakRcHandle<ICE::Endpoint>();
72 #else
74 #endif
75 }
76 
77 void
79 {
80  ACE_UNUSED_ARG(flag);
81 
82 #ifdef OPENDDS_SECURITY
83  RtpsUdpInst_rch cfg = config();
84  if (flag) {
85  {
88  }
90  } else {
91  if (!cfg || cfg->use_rtps_relay()) {
93  }
94  }
95 #endif
96 }
97 
98 void
100 {
101  ACE_UNUSED_ARG(flag);
102 
103 #ifdef OPENDDS_SECURITY
104  RtpsUdpInst_rch cfg = config();
105  if (flag) {
106  {
109  }
111  } else {
112  if (!cfg || !cfg->rtps_relay_only()) {
114  }
115  }
116 #endif
117 }
118 
119 void
121 {
122  ACE_UNUSED_ARG(after);
123 
124 #ifdef OPENDDS_SECURITY
125  RtpsUdpInst_rch cfg = config();
126  const bool before = cfg && cfg->use_ice();
127  if (cfg) {
128  cfg->use_ice(after);
129  }
130 
131  if (before && !after) {
132  stop_ice();
133  } else if (!before && after) {
134  start_ice();
135  }
136 #endif
137 }
138 
141 {
143 
144  RtpsUdpInst_rch cfg = config();
145  if (!cfg) {
146  return RtpsUdpDataLink_rch();
147  }
148 
150  assign(local_prefix_, local_prefix);
151 #ifdef OPENDDS_SECURITY
152  {
155  }
157 #endif
158  }
159 
160 #if defined(OPENDDS_SECURITY)
161  if (cfg->use_ice()) {
163  ri->execute_or_enqueue(make_rch<RemoveHandler>(unicast_socket_.get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
164 #ifdef ACE_HAS_IPV6
165  ri->execute_or_enqueue(make_rch<RemoveHandler>(ipv6_unicast_socket_.get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
166 #endif
167  }
168 #endif
169 
170  RtpsUdpDataLink_rch link = make_rch<RtpsUdpDataLink>(rchandle_from(this), local_prefix, config(), reactor_task(), ref(transport_statistics_), ref(transport_statistics_mutex_));
171 
172 #if defined(OPENDDS_SECURITY)
174 #endif
175 
176  if (!link->open(unicast_socket_
177 #ifdef ACE_HAS_IPV6
178  , ipv6_unicast_socket_
179 #endif
180  )) {
181 #ifdef ACE_HAS_IPV6
182  const ACE_HANDLE v6handle = ipv6_unicast_socket_.get_handle();
183 #else
184  const ACE_HANDLE v6handle = ACE_INVALID_HANDLE;
185 #endif
187  ACE_TEXT("(%P|%t) ERROR: ")
188  ACE_TEXT("RtpsUdpTransport::make_datalink: ")
189  ACE_TEXT("failed to open DataLink for sockets %d %d\n"),
190  unicast_socket_.get_handle(), v6handle
191  ));
192  return RtpsUdpDataLink_rch();
193  }
194 
195  // RtpsUdpDataLink now owns the socket
196  unicast_socket_.set_handle(ACE_INVALID_HANDLE);
197 #ifdef ACE_HAS_IPV6
198  ipv6_unicast_socket_.set_handle(ACE_INVALID_HANDLE);
199 #endif
200 
201  return link;
202 }
203 
206  const ConnectionAttribs& attribs,
207  const TransportClient_rch& client)
208 {
209  bit_sub_ = client->get_builtin_subscriber_proxy();
210 
211  GuardThreadType guard_links(links_lock_);
212 
213  if (is_shut_down()) {
214  return AcceptConnectResult();
215  }
216 
217  if (!link_) {
219  if (!link_) {
220  return AcceptConnectResult();
221  }
222  }
223 
224  RtpsUdpDataLink_rch link = link_;
225 
226  if (use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_, remote.discovery_blob_, remote.participant_discovered_at_,
227  remote.context_,
228  attribs.local_reliable_, remote.reliable_,
229  attribs.local_durable_, remote.durable_, attribs.max_sn_, client)) {
230  return AcceptConnectResult(link);
231  }
232 
234  add_pending_connection(client, link);
235  VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::connect_datalink pending.\n"), 2);
237 }
238 
241  const ConnectionAttribs& attribs,
242  const TransportClient_rch& client)
243 {
244  bit_sub_ = client->get_builtin_subscriber_proxy();
245 
246  GuardThreadType guard_links(links_lock_);
247 
248  if (is_shut_down()) {
249  return AcceptConnectResult();
250  }
251 
252  if (!link_) {
254  if (!link_) {
255  return AcceptConnectResult();
256  }
257  }
258  RtpsUdpDataLink_rch link = link_;
259 
260  if (use_datalink(attribs.local_id_, remote.repo_id_, remote.blob_, remote.discovery_blob_, remote.participant_discovered_at_,
261  remote.context_,
262  attribs.local_reliable_, remote.reliable_,
263  attribs.local_durable_, remote.durable_, attribs.max_sn_, client)) {
264  return AcceptConnectResult(link);
265  }
266 
268  add_pending_connection(client, link);
269  VDBG_LVL((LM_DEBUG, "(%P|%t) RtpsUdpTransport::accept_datalink pending.\n"), 2);
271 }
272 
273 
274 void
276  const GUID_t& remote_id,
277  bool disassociate,
278  bool association_failed)
279 {
280  if (disassociate || association_failed) {
281  GuardThreadType guard_links(links_lock_);
282  if (link_) {
283  TransportClient_rch c = client.lock();
284  if (c) {
285  link_->disassociated(c->get_guid(), remote_id);
286  }
287  }
288  }
289 
290  {
292  typedef PendConnMap::iterator iter_t;
293  const std::pair<iter_t, iter_t> range =
294  pending_connections_.equal_range(client);
295  for (iter_t iter = range.first; iter != range.second; ++iter) {
296  iter->second->remove_on_start_callback(client, remote_id);
297  }
298  pending_connections_.erase(range.first, range.second);
299  }
300 }
301 
302 bool
304  const GUID_t& remote_id,
305  const TransportBLOB& remote_data,
306  const TransportBLOB& discovery_locator,
307  const MonotonicTime_t& participant_discovered_at,
308  ACE_CDR::ULong participant_flags,
309  bool local_reliable, bool remote_reliable,
310  bool local_durable, bool remote_durable,
311  SequenceNumber max_sn,
312  const TransportClient_rch& client)
313 {
314  AddrSet uc_addrs, mc_addrs;
315  bool requires_inline_qos;
316  unsigned int blob_bytes_read;
317  get_connection_addrs(remote_data, &uc_addrs, &mc_addrs, &requires_inline_qos, &blob_bytes_read);
318 
319  NetworkAddress disco_addr_hint;
320  if (discovery_locator.length()) {
321  AddrSet disco_uc_addrs, disco_mc_addrs;
322  bool disco_requires_inline_qos;
323  unsigned int disco_blob_bytes_read;
324  get_connection_addrs(discovery_locator, &disco_uc_addrs, &disco_mc_addrs, &disco_requires_inline_qos, &disco_blob_bytes_read);
325 
326  for (AddrSet::const_iterator it = disco_uc_addrs.begin(), limit = disco_uc_addrs.end(); it != limit; ++it) {
327  for (AddrSet::const_iterator it2 = uc_addrs.begin(), limit2 = uc_addrs.end(); it2 != limit2; ++it2) {
328  if (it->addr_bytes_equal(*it2) && DCPS::is_more_local(disco_addr_hint, *it2)) {
329  disco_addr_hint = *it2;
330  }
331  }
332  }
333  }
334 
335  if (link_) {
336  return link_->associated(local_id, remote_id, local_reliable, remote_reliable,
337  local_durable, remote_durable,
338  participant_discovered_at, participant_flags, max_sn, client,
339  uc_addrs, mc_addrs, disco_addr_hint, requires_inline_qos);
340  }
341 
342  return true;
343 }
344 
345 #if defined(OPENDDS_SECURITY)
346 void
348 {
349  RtpsUdpDataLink_rch link;
350  {
352  local_crypto_handle_ = pch;
353  link = link_;
354  }
355  if (link) {
356  link->local_crypto_handle(pch);
357  }
358 }
359 #endif
360 
361 void
363  AddrSet* uc_addrs,
364  AddrSet* mc_addrs,
365  bool* requires_inline_qos,
366  unsigned int* blob_bytes_read) const
367 {
368  using namespace OpenDDS::RTPS;
369  LocatorSeq locators;
370  DDS::ReturnCode_t result =
371  blob_to_locators(remote, locators, requires_inline_qos, blob_bytes_read);
372  if (result != DDS::RETCODE_OK) {
373  return;
374  }
375 
376  for (CORBA::ULong i = 0; i < locators.length(); ++i) {
377  ACE_INET_Addr addr;
378  // If conversion was successful
379  if (locator_to_address(addr, locators[i], false) == 0) {
380  if (addr.is_multicast()) {
381  RtpsUdpInst_rch cfg = config();
382  if (cfg && cfg->use_multicast_ && mc_addrs) {
383  mc_addrs->insert(NetworkAddress(addr));
384  }
385  } else if (uc_addrs) {
386  uc_addrs->insert(NetworkAddress(addr));
387  }
388  }
389  }
390 }
391 
392 bool
394 {
395  RtpsUdpInst_rch cfg = config();
396  if (cfg) {
397  cfg->populate_locator(info, flags);
398  return true;
399  }
400  return false;
401 }
402 
403 void
405  const GUID_t& writerid,
406  const GUID_t& readerid,
407  const TransportLocatorSeq& locators,
409 {
410  if (is_shut_down()) {
411  return;
412  }
413 
414  RtpsUdpInst_rch cfg = config();
415  if (!cfg) {
416  return;
417  }
418 
419  const TransportBLOB* blob = cfg->get_blob(locators);
420  if (!blob) {
421  return;
422  }
423 
424  GuardThreadType guard_links(links_lock_);
425 
426  if (!link_) {
427  link_ = make_datalink(participant.guidPrefix);
428  }
429 
430  AddrSet uc_addrs;
431  get_connection_addrs(*blob, &uc_addrs);
432  link_->register_for_reader(writerid, readerid, uc_addrs, listener);
433 }
434 
435 void
437  const GUID_t& writerid,
438  const GUID_t& readerid)
439 {
440  GuardThreadType guard_links(links_lock_);
441 
442  if (link_) {
443  link_->unregister_for_reader(writerid, readerid);
444  }
445 }
446 
447 void
449  const GUID_t& readerid,
450  const GUID_t& writerid,
451  const TransportLocatorSeq& locators,
452  DiscoveryListener* listener)
453 {
454  if (is_shut_down()) {
455  return;
456  }
457 
458  RtpsUdpInst_rch cfg = config();
459  if (!cfg) {
460  return;
461  }
462 
463  const TransportBLOB* blob = cfg->get_blob(locators);
464  if (!blob) {
465  return;
466  }
467 
468  GuardThreadType guard_links(links_lock_);
469 
470  if (!link_) {
471  link_ = make_datalink(participant.guidPrefix);
472  }
473 
474  AddrSet uc_addrs;
475  get_connection_addrs(*blob, &uc_addrs);
476  link_->register_for_writer(readerid, writerid, uc_addrs, listener);
477 }
478 
479 void
481  const GUID_t& readerid,
482  const GUID_t& writerid)
483 {
484  GuardThreadType guard_links(links_lock_);
485 
486  if (link_) {
487  link_->unregister_for_writer(readerid, writerid);
488  }
489 }
490 
491 void
493  const TransportLocatorSeq& locators)
494 {
495  if (is_shut_down()) {
496  return;
497  }
498 
499  RtpsUdpInst_rch cfg = config();
500  if (!cfg) {
501  return;
502  }
503 
504  const TransportBLOB* blob = cfg->get_blob(locators);
505  if (!blob) {
506  return;
507  }
508 
509  GuardThreadType guard_links(links_lock_);
510 
511  if (link_) {
512  AddrSet uc_addrs, mc_addrs;
513  bool requires_inline_qos;
514  unsigned int blob_bytes_read;
515  get_connection_addrs(*blob, &uc_addrs, &mc_addrs, &requires_inline_qos, &blob_bytes_read);
516  link_->update_locators(remote, uc_addrs, mc_addrs, requires_inline_qos, false);
517  }
518 }
519 
520 void
522  TransportLocator& tl)
523 {
524  if (is_shut_down()) {
525  return;
526  }
527 
528  GuardThreadType guard_links(links_lock_);
529 
530  bool expects_inline_qos = false;
531  NetworkAddress addr;
532  if (link_) {
533  addr = link_->get_last_recv_address(remote);
534  if (addr == NetworkAddress()) {
535  return;
536  }
537  GUIDSeq_var guids(new GUIDSeq);
538  GUIDSeq& ref = static_cast<GUIDSeq&>(guids);
539  ref.length(1);
540  ref[0] = remote;
541  expects_inline_qos = link_->requires_inline_qos(guids);
542  }
543 
544  LocatorSeq locators;
545  locators.length(1);
546  address_to_locator(locators[0], addr.to_addr());
547 
549  size_t size = serialized_size(encoding, locators);
550  primitive_serialized_size_boolean(encoding, size);
551 
552  ACE_Message_Block mb_locator(size);
553  Serializer ser_loc(&mb_locator, encoding);
554  ser_loc << locators;
555  ser_loc << ACE_OutputCDR::from_boolean(expects_inline_qos);
556 
557  tl.transport_type = "rtps_udp";
558  RTPS::message_block_to_sequence(mb_locator, tl.data);
559 }
560 
561 void
563 {
564 #ifdef OPENDDS_SECURITY
565  relay_stun_task_->cancel();
566  RtpsUdpInst_rch cfg = config();
567  {
570  }
572 #endif
573 }
574 
575 void
577 {
581 }
582 
583 bool
585 {
586  if (!config) {
587  return false;
588  }
589 
590  // Override with DCPSDefaultAddress.
591  if (config->local_address() == NetworkAddress() &&
592  TheServiceParticipant->default_address() != NetworkAddress()) {
593  config->local_address(TheServiceParticipant->default_address());
594  }
595  if (config->multicast_interface_.empty() &&
596  TheServiceParticipant->default_address().to_addr() != ACE_INET_Addr()) {
597  config->multicast_interface_ = DCPS::LogAddr::ip(TheServiceParticipant->default_address().to_addr());
598  }
599 
600  // Open the socket here so that any addresses/ports left
601  // unspecified in the RtpsUdpInst are known by the time we get to
602  // connection_info_i(). Opening the sockets here also allows us to
603  // detect and report errors during DataReader/Writer setup instead
604  // of during association.
605 
606  ACE_INET_Addr address = config->local_address().to_addr();
607 
608  if (unicast_socket_.open(address, PF_INET) != 0) {
610  ACE_TEXT("(%P|%t) ERROR: ")
611  ACE_TEXT("RtpsUdpTransport::configure_i: open:")
612  ACE_TEXT("%m\n")),
613  false);
614  }
615 
616 #ifdef ACE_WIN32
617  // By default Winsock will cause reads to fail with "connection reset"
618  // when UDP sends result in ICMP "port unreachable" messages.
619  // The transport framework is not set up for this since returning <= 0
620  // from our receive_bytes causes the framework to close down the datalink
621  // which in this case is used to receive from multiple peers.
622  {
623  BOOL recv_udp_connreset = FALSE;
624  unicast_socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
625  }
626 #endif
627 
628  if (unicast_socket_.get_local_addr(address) != 0) {
630  ACE_TEXT("(%P|%t) ERROR: ")
631  ACE_TEXT("RtpsUdpTransport::configure_i: get_local_addr:")
632  ACE_TEXT("%m\n")),
633  false);
634  }
635 
636  config->local_address(NetworkAddress(address));
637 
638 #ifdef ACE_RECVPKTINFO
639  int sockopt = 1;
640  if (unicast_socket_.set_option(IPPROTO_IP, ACE_RECVPKTINFO, &sockopt, sizeof sockopt) == -1) {
641  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RtpsUdpTransport::configure_i: set_option: %m\n")), false);
642  }
643 #endif
644 
645 #ifdef ACE_HAS_IPV6
646  address = config->ipv6_local_address().to_addr();
647 
648  if (ipv6_unicast_socket_.open(address, PF_INET6) != 0) {
650  ACE_TEXT("(%P|%t) ERROR: ")
651  ACE_TEXT("RtpsUdpTransport::configure_i: open:")
652  ACE_TEXT("%m\n")),
653  false);
654  }
655 
656 #ifdef ACE_WIN32
657  // By default Winsock will cause reads to fail with "connection reset"
658  // when UDP sends result in ICMP "port unreachable" messages.
659  // The transport framework is not set up for this since returning <= 0
660  // from our receive_bytes causes the framework to close down the datalink
661  // which in this case is used to receive from multiple peers.
662  {
663  BOOL recv_udp_connreset = FALSE;
664  ipv6_unicast_socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
665  }
666 #endif
667 
668  if (ipv6_unicast_socket_.get_local_addr(address) != 0) {
670  ACE_TEXT("(%P|%t) ERROR: ")
671  ACE_TEXT("RtpsUdpTransport::configure_i: get_local_addr:")
672  ACE_TEXT("%m\n")),
673  false);
674  }
675 
676  NetworkAddress temp(address);
677  if (address.is_ipv4_mapped_ipv6() && temp.is_any()) {
678  temp = NetworkAddress(address.get_port_number(), "::");
679  }
680  config->ipv6_local_address(temp);
681 
682 #ifdef ACE_RECVPKTINFO6
683  if (ipv6_unicast_socket_.set_option(IPPROTO_IPV6, ACE_RECVPKTINFO6, &sockopt, sizeof sockopt) == -1) {
684  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RtpsUdpTransport::configure_i: set_option: %m\n")), false);
685  }
686 #endif
687 #endif
688 
689  create_reactor_task(false, "RtpsUdpTransport" + config->name());
690 
692  job_queue_ = DCPS::make_rch<DCPS::JobQueue>(reactor);
693 
694 #ifdef OPENDDS_SECURITY
695  if (config->use_ice()) {
696  start_ice();
697  }
698 
700 #endif
701 
702  if (config->opendds_discovery_default_listener_) {
703  GuardThreadType guard_links(links_lock_);
704  link_ = make_datalink(config->opendds_discovery_guid_.guidPrefix);
705  link_->default_listener(*config->opendds_discovery_default_listener_);
706  }
707 
708 #ifdef OPENDDS_SECURITY
709  {
711  relay_stun_task_falloff_.set(config->heartbeat_period_);
712  }
714 #endif
715 
716  return true;
717 }
718 
720 {
721  GuardThreadType guard_links(links_lock_);
722  const RtpsUdpDataLink_rch link = link_;
723  guard_links.release();
724  if (link) {
725  link->client_stop(localId);
726  }
727 }
728 
729 void
731 {
732 #ifdef OPENDDS_SECURITY
733  RtpsUdpInst_rch cfg = config();
734 
735  if (cfg && cfg->use_ice()) {
736  stop_ice();
737  }
738 
739  relay_stun_task_->cancel();
740 #endif
741 
742  job_queue_.reset();
743 
744  GuardThreadType guard_links(links_lock_);
745  if (link_) {
747  }
748  link_.reset();
749 }
750 
751 void
753 {
754  // No-op for rtps_udp: keep the link_ around until the transport is shut down.
755 }
756 
757 
758 
759 #ifdef OPENDDS_SECURITY
760 
761 const ACE_SOCK_Dgram&
763 {
764 #ifdef ACE_HAS_IPV6
765  if (fd == transport.ipv6_unicast_socket_.get_handle()) {
766  return transport.ipv6_unicast_socket_;
767  }
768 #endif
769  ACE_UNUSED_ARG(fd);
770  return transport.unicast_socket_;
771 }
772 
773 int
775 {
776  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
777 
778  struct iovec iov[1];
779  char buffer[0x10000];
780  iov[0].iov_base = buffer;
781  iov[0].iov_len = sizeof buffer;
782  ACE_INET_Addr remote_address;
783 
784  bool stop;
785  RtpsUdpReceiveStrategy::receive_bytes_helper(iov, 1, choose_recv_socket(fd), remote_address,
786 #ifdef OPENDDS_SECURITY
787  transport.get_ice_agent(), transport.get_ice_endpoint(),
788 #endif
789  transport, stop);
790 
791  return 0;
792 }
793 
794 namespace {
795  bool shouldWarn(int code) {
796  return code == EPERM || code == EACCES || code == EINTR || code == ENOBUFS || code == ENOMEM
797  || code == EADDRNOTAVAIL || code == ENETUNREACH;
798  }
799 
800  ssize_t
801  send_single_i(RtpsUdpInst_rch config, ACE_SOCK_Dgram& socket, const iovec iov[], int n, const ACE_INET_Addr& addr, bool& network_is_unreachable)
802  {
803  OPENDDS_ASSERT(addr != ACE_INET_Addr());
804 
805  if (!config) {
806  return -1;
807  }
808 
809 #ifdef OPENDDS_TESTING_FEATURES
810  ssize_t total_length;
811  if (config->should_drop(iov, n, total_length)) {
812  return total_length;
813  }
814 #endif
815 
816 #ifdef ACE_LACKS_SENDMSG
817  char buffer[UDP_MAX_MESSAGE_SIZE];
818  char *iter = buffer;
819  for (int i = 0; i < n; ++i) {
820  if (size_t(iter - buffer + iov[i].iov_len) > UDP_MAX_MESSAGE_SIZE) {
821  ACE_ERROR((LM_ERROR, "(%P|%t) RtpsUdpTransport.cpp send_single_i() - "
822  "message too large at index %d size %d\n", i, iov[i].iov_len));
823  return -1;
824  }
825  std::memcpy(iter, iov[i].iov_base, iov[i].iov_len);
826  iter += iov[i].iov_len;
827  }
828  const ssize_t result = socket.send(buffer, iter - buffer, addr);
829 #else
830  const ssize_t result = socket.send(iov, n, addr);
831 #endif
832  if (result < 0) {
833  const int err = errno;
834  if (err != ENETUNREACH || !network_is_unreachable) {
835  errno = err;
836  const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR;
837  ACE_ERROR((prio, "(%P|%t) RtpsUdpTransport.cpp send_single_i() - "
838  "destination %C failed %p\n", DCPS::LogAddr(addr).c_str(), ACE_TEXT("send")));
839  }
840  if (err == ENETUNREACH) {
841  network_is_unreachable = true;
842  }
843  // Reset errno since the rest of framework expects it.
844  errno = err;
845  } else {
846  network_is_unreachable = false;
847  }
848  return result;
849  }
850 }
851 
852 ICE::AddressListType
854  ICE::AddressListType addresses;
855 
856  RtpsUdpInst_rch cfg = transport.config();
857 
858  if (!cfg) {
859  return addresses;
860  }
861 
862  ACE_INET_Addr addr = cfg->local_address().to_addr();
863  if (addr != ACE_INET_Addr()) {
864  if (addr.is_any()) {
865  ICE::AddressListType addrs;
867  for (ICE::AddressListType::iterator pos = addrs.begin(), limit = addrs.end(); pos != limit; ++pos) {
868  if (pos->get_type() == AF_INET) {
869  pos->set_port_number(addr.get_port_number());
870  addresses.push_back(*pos);
871  }
872  }
873  } else {
874  addresses.push_back(addr);
875  }
876  }
877 
878 #ifdef ACE_HAS_IPV6
879  addr = cfg->ipv6_local_address().to_addr();
880  if (addr != ACE_INET_Addr()) {
881  if (addr.is_any()) {
882  ICE::AddressListType addrs;
884  for (ICE::AddressListType::iterator pos = addrs.begin(), limit = addrs.end(); pos != limit; ++pos) {
885  if (pos->get_type() == AF_INET6) {
886  pos->set_port_number(addr.get_port_number());
887  addresses.push_back(*pos);
888  }
889  }
890  } else {
891  addresses.push_back(addr);
892  }
893  }
894 #endif
895 
896  return addresses;
897 }
898 
901 {
902 #ifdef ACE_HAS_IPV6
903  if (destination.get_type() == AF_INET6) {
904  return transport.link_ ? transport.link_->ipv6_unicast_socket() : transport.ipv6_unicast_socket_;
905  }
906 #endif
907 
908  ACE_UNUSED_ARG(destination);
909  return transport.link_ ? transport.link_->unicast_socket() : transport.unicast_socket_;
910 }
911 
912 void
914 {
915  ACE_SOCK_Dgram& socket = choose_send_socket(destination);
916 
917  ACE_Message_Block block(20 + message.length());
918  DCPS::Serializer serializer(&block, STUN::encoding);
919  const_cast<STUN::Message&>(message).block = &block;
920  serializer << message;
921 
922  iovec iov[MAX_SEND_BLOCKS];
923  const int num_blocks = RtpsUdpSendStrategy::mb_to_iov(block, iov);
924  const ssize_t result = send_single_i(transport.config(), socket, iov, num_blocks, destination, network_is_unreachable_);
925 
926  RtpsUdpInst_rch cfg = transport.config();
927  if (result < 0) {
928  if (cfg && cfg->count_messages()) {
929  ssize_t bytes = 0;
930  for (int i = 0; i < num_blocks; ++i) {
931  bytes += iov[i].iov_len;
932  }
933  const NetworkAddress da(destination);
934  const InternalMessageCountKey key(da, MCK_STUN, da == cfg->rtps_relay_address());
935  ACE_GUARD(ACE_Thread_Mutex, g, transport.transport_statistics_mutex_);
936  transport.transport_statistics_.message_count[key].send_fail(bytes);
937  }
938  if (!network_is_unreachable_) {
939  const ACE_Log_Priority prio = shouldWarn(errno) ? LM_WARNING : LM_ERROR;
940  ACE_ERROR((prio, "(%P|%t) RtpsUdpTransport::send() - "
941  "failed to send STUN message\n"));
942  }
943  } else if (cfg && cfg->count_messages()) {
944  const NetworkAddress da(destination);
945  const InternalMessageCountKey key(da, MCK_STUN, da == cfg->rtps_relay_address());
946  ACE_GUARD(ACE_Thread_Mutex, g, transport.transport_statistics_mutex_);
947  transport.transport_statistics_.message_count[key].send(result);
948  }
949 }
950 
953  RtpsUdpInst_rch cfg = transport.config();
954  return cfg ? cfg->stun_server_address().to_addr() : ACE_INET_Addr();
955 }
956 
957 void
959 {
960  if (DCPS::DCPS_debug_level > 3) {
961  ACE_DEBUG((LM_INFO, "(%P|%t) RtpsUdpTransport::start_ice\n"));
962  }
963 
964  ice_agent_->add_endpoint(static_rchandle_cast<ICE::Endpoint>(ice_endpoint_));
965 
966  GuardThreadType guard_links(links_lock_);
967 
968  if (!link_) {
970  ri->execute_or_enqueue(make_rch<RegisterHandler>(unicast_socket_.get_handle(), ice_endpoint_.get(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
971 #ifdef ACE_HAS_IPV6
972  ri->execute_or_enqueue(make_rch<RegisterHandler>(ipv6_unicast_socket_.get_handle(), ice_endpoint_.get(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
973 #endif
974  }
975 }
976 
977 void
979 {
980  if (DCPS::DCPS_debug_level > 3) {
981  ACE_DEBUG((LM_INFO, "(%P|%t) RtpsUdpTransport::stop_ice\n"));
982  }
983 
984  GuardThreadType guard_links(links_lock_);
985 
986  if (!link_) {
988  ri->execute_or_enqueue(make_rch<RemoveHandler>(unicast_socket_.get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
989 #ifdef ACE_HAS_IPV6
990  ri->execute_or_enqueue(make_rch<RemoveHandler>(ipv6_unicast_socket_.get_handle(), static_cast<ACE_Reactor_Mask>(ACE_Event_Handler::READ_MASK)));
991 #endif
992  }
993 
994  ice_agent_->remove_endpoint(static_rchandle_cast<ICE::Endpoint>(ice_endpoint_));
995 }
996 
997 void
999 {
1000  GuardThreadType guard_links(links_lock_);
1001 
1002  RtpsUdpInst_rch cfg = config();
1003  if (!cfg) {
1004  return;
1005  }
1006 
1007  const ACE_INET_Addr relay_address = cfg->rtps_relay_address().to_addr();
1008 
1009  if ((cfg->use_rtps_relay() || cfg->rtps_relay_only()) &&
1010  relay_address != ACE_INET_Addr() &&
1013  ice_endpoint_->send(relay_address, relay_srsm_.message());
1014  {
1016  relay_stun_task_falloff_.advance(ICE::Configuration::instance()->server_reflexive_address_period());
1017  relay_stun_task_->schedule(relay_stun_task_falloff_.get());
1018  }
1019  }
1020 }
1021 
1022 void
1024 {
1025 #ifndef DDS_HAS_MINIMUM_BIT
1026  DCPS::ConnectionRecord connection_record;
1027  std::memset(connection_record.guid, 0, sizeof(connection_record.guid));
1028  connection_record.protocol = RTPS_RELAY_STUN_PROTOCOL;
1029  connection_record.latency = TimeDuration::zero_value.to_dds_duration();
1030 
1031  switch (sc) {
1034  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
1035  connection_record.latency = relay_srsm_.latency().to_dds_duration();
1037  deferred_connection_records_.push_back(std::make_pair(true, connection_record));
1038  }
1039  break;
1042  // Lengthen to normal period.
1043  {
1045  relay_stun_task_falloff_.set(ICE::Configuration::instance()->server_reflexive_address_period());
1046  }
1047  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
1048  connection_record.latency = relay_srsm_.latency().to_dds_duration();
1050  deferred_connection_records_.push_back(std::make_pair(true, connection_record));
1051  break;
1053  {
1054  connection_record.address = DCPS::LogAddr(relay_srsm_.unset_stun_server_address()).c_str();
1055  deferred_connection_records_.push_back(std::make_pair(false, connection_record));
1056  break;
1057  }
1058  }
1059 
1060  if (!bit_sub_) {
1061  return;
1062  }
1063 
1064  if (!deferred_connection_records_.empty()) {
1065  job_queue_->enqueue(DCPS::make_rch<WriteConnectionRecords>(bit_sub_, deferred_connection_records_));
1067  }
1068 
1069 #else
1070  ACE_UNUSED_ARG(sc);
1071 #endif
1072 }
1073 
1074 void
1076 {
1077 #ifndef DDS_HAS_MINIMUM_BIT
1078  relay_stun_task_->cancel();
1079 
1080  DCPS::ConnectionRecord connection_record;
1081  std::memset(connection_record.guid, 0, sizeof(connection_record.guid));
1082  connection_record.protocol = RTPS_RELAY_STUN_PROTOCOL;
1083  connection_record.latency = TimeDuration::zero_value.to_dds_duration();
1084 
1086  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
1087  deferred_connection_records_.push_back(std::make_pair(false, connection_record));
1088  }
1089 
1090  if (!bit_sub_) {
1091  return;
1092  }
1093 
1094  if (!deferred_connection_records_.empty()) {
1095  job_queue_->enqueue(DCPS::make_rch<WriteConnectionRecords>(bit_sub_, deferred_connection_records_));
1097  }
1098 
1100 #endif
1101 }
1102 
1103 #endif
1104 
1105 } // namespace DCPS
1106 } // namespace OpenDDS
1107 
virtual DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
virtual bool connection_info_i(TransportLocator &info, ConnectionInfoFlags flags) const
#define ACE_DEBUG(X)
virtual void use_rtps_relay_now(bool flag)
void local_crypto_handle(DDS::Security::ParticipantCryptoHandle pch)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
ACE_SOCK_Dgram & choose_send_socket(const ACE_INET_Addr &address) const
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
bool requires_inline_qos(const GUIDSeq_var &peers)
const InstanceHandle_t HANDLE_NIL
DDS::Security::ParticipantCryptoHandle local_crypto_handle() const
virtual void get_last_recv_locator(const GUID_t &, TransportLocator &)
BUILT_IN_TOPIC_KEY string protocol
bool configure_i(const RtpsUdpInst_rch &config)
ACE_INET_Addr to_addr() const
void unregister_for_reader(const GUID_t &writerid, const GUID_t &readerid)
void register_for_writer(const GUID_t &readerid, const GUID_t &writerid, const AddrSet &addresses, DiscoveryListener *listener)
if(!(yy_init))
unsigned long ACE_Reactor_Mask
LM_INFO
void server_reflexive_indication_count(size_t x)
Definition: RTPS/ICE/Ice.h:127
static int mb_to_iov(const ACE_Message_Block &msg, iovec *iov)
sequence< octet > key
RtpsUdpTransport(const RtpsUdpInst_rch &inst)
RcHandle< BitSubscriber > bit_sub_
int locator_to_address(ACE_INET_Addr &dest, const DCPS::Locator_t &locator, bool map)
void create_reactor_task(bool useAsyncSend=false, const OPENDDS_STRING &name="")
ACE_Thread_Mutex transport_statistics_mutex_
virtual void send(const ACE_INET_Addr &address, const STUN::Message &message)
virtual void stop_accepting_or_connecting(const TransportClient_wrch &client, const GUID_t &remote_id, bool disassociate, bool association_failed)
bool is_more_local(const NetworkAddress &current, const NetworkAddress &incoming)
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
CommandPtr execute_or_enqueue(CommandPtr command)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
sequence< Locator_t > LocatorSeq
virtual AcceptConnectResult connect_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
bool use_datalink(const GUID_t &local_id, const GUID_t &remote_id, const TransportBLOB &remote_data, const TransportBLOB &discovery_locator, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, SequenceNumber max_sn, const TransportClient_rch &client)
RcHandle< Sporadic > relay_stun_task_
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused as reported by Andy Elvey and Dan Kosecki *resynced with Christopher Diggins s branch as it exists in tree building code is back Christopher Diggins *resynced codebase with Chris s branch *removed tree building code
Definition: CHANGELOG.txt:8
ICE::ServerReflexiveStateMachine relay_srsm_
key GuidPrefix_t guidPrefix
Definition: DdsDcpsGuid.idl:58
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
sequence< TransportLocator > TransportLocatorSeq
int ssize_t
virtual void unregister_for_writer(const GUID_t &, const GUID_t &, const GUID_t &)
int release(void)
int set_option(int level, int option, void *optval, int optlen) const
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
const MessageCountKind MCK_STUN
void enqueue(JobPtr job)
Definition: JobQueue.h:61
int get_type(void) const
static const time_t DEFAULT_HEARTBEAT_PERIOD_SEC
Definition: RtpsUdpInst.h:35
void register_for_reader(const GUID_t &writerid, const GUID_t &readerid, const AddrSet &addresses, DiscoveryListener *listener)
virtual void rtps_relay_only_now(bool flag)
#define EPERM
OpenDDS_Dcps_Export void address_to_locator(Locator_t &locator, const ACE_INET_Addr &addr)
ACE_HANDLE socket(int protocol_family, int type, int proto)
bool is_any(void) const
BUILT_IN_TOPIC_KEY string address
void disassociated(const GUID_t &local, const GUID_t &remote)
static ssize_t receive_bytes_helper(iovec iov[], int n, const ACE_SOCK_Dgram &socket, ACE_INET_Addr &remote_address, DCPS::RcHandle< ICE::Agent > agent, DCPS::WeakRcHandle< ICE::Endpoint > endpoint, RtpsUdpTransport &tport, bool &stop)
DDS::Security::ParticipantCryptoHandle local_crypto_handle_
LM_DEBUG
ThreadLockType relay_stun_task_falloff_mutex_
StateChange send(const ACE_INET_Addr &address, size_t indication_count_limit, const DCPS::GuidPrefix_t &guid_prefix)
Definition: Ice.cpp:128
ACE_CDR::ULong ULong
NetworkAddress get_last_recv_address(const GUID_t &remote_id)
virtual void release_datalink(DataLink *link)
void client_stop(const GUID_t &localId)
const ACE_INET_Addr & stun_server_address() const
Definition: RTPS/ICE/Ice.h:239
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
FibonacciSequence< TimeDuration > relay_stun_task_falloff_
void client_stop(const GUID_t &localId)
virtual void update_locators(const GUID_t &, const TransportLocatorSeq &)
sequence< TransportStatistics > TransportStatisticsSequence
ConnectionRecords deferred_connection_records_
virtual void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
RtpsUdpDataLink_rch make_datalink(const GuidPrefix_t &local_prefix)
void default_listener(const TransportReceiveListener_wrch &trl)
Definition: DataLink.inl:352
void set_handle(ACE_HANDLE handle)
int control(int cmd, void *) const
DDS::Duration_t to_dds_duration() const
void message_block_to_sequence(const ACE_Message_Block &mb_locator, T &out)
Definition: MessageUtils.h:101
void get_interface_addrs(OPENDDS_VECTOR(ACE_INET_Addr)&addrs)
RcHandle< ICE::Agent > ice_agent_
int get_local_addr(ACE_Addr &) const
#define AF_INET
#define PF_INET
void relay_stun_task(const MonotonicTimePoint &now)
ACE_Log_Priority
void add_pending_connection(const TransportClient_rch &client, DataLink_rch link)
ACE_HANDLE get_handle(void) const
LM_WARNING
ACE_UINT32 ULong
void append(TransportStatisticsSequence &seq, const InternalTransportStatistics &istats)
virtual void register_for_writer(const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
The End User API.
const STUN::Message & message() const
Definition: RTPS/ICE/Ice.h:237
bool associated(const GUID_t &local, const GUID_t &remote, bool local_reliable, bool remote_reliable, bool local_durable, bool remote_durable, const MonotonicTime_t &participant_discovered_at, ACE_CDR::ULong participant_flags, SequenceNumber max_sn, const TransportClient_rch &client, AddrSet &unicast_addresses, AddrSet &multicast_addresses, const NetworkAddress &last_addr_hint, bool requires_inline_qos)
const char *const name
Definition: debug.cpp:60
LockType pending_connections_lock_
Lock to protect the pending_connections_ data member.
virtual AcceptConnectResult accept_datalink(const RemoteTransport &remote, const ConnectionAttribs &attribs, const TransportClient_rch &client)
#define IPPROTO_IP
void update_locators(const GUID_t &remote_id, AddrSet &unicast_addresses, AddrSet &multicast_addresses, bool requires_inline_qos, bool add_ref)
ACE_TEXT("TCP_Factory")
static const TimeDuration zero_value
Definition: TimeDuration.h:31
ACE_UINT16 length() const
Definition: Stun.h:213
void unregister_for_writer(const GUID_t &readerid, const GUID_t &writerid)
bool open(const ACE_SOCK_Dgram &unicast_socket)
ReactorTask_rch reactor_task()
sequence< GUID_t > GUIDSeq
Definition: DdsDcpsGuid.idl:62
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
u_short get_port_number(void) const
bool is_multicast(void) const
RcHandle< RtpsUdpDataLink > RtpsUdpDataLink_rch
#define EADDRNOTAVAIL
Sequence number abstraction. Only allows positive 64 bit values.
DCPS::TimeDuration latency() const
Definition: RTPS/ICE/Ice.h:246
ReactorInterceptor_rch interceptor() const
Definition: ReactorTask.inl:65
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
static Configuration * instance()
Definition: Ice.cpp:109
const DCPS::Encoding & get_locators_encoding()
virtual ICE::AddressListType host_addresses() const
#define VDBG_LVL(DBG_ARGS, LEVEL)
const ACE_SOCK_Dgram & choose_recv_socket(ACE_HANDLE fd) const
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void append_transport_statistics(TransportStatisticsSequence &seq)
InternalTransportStatistics transport_statistics_
const ACE_INET_Addr & unset_stun_server_address() const
Definition: RTPS/ICE/Ice.h:238
int open(const ACE_Addr &local, int protocol_family=ACE_PROTOCOL_FAMILY_INET, int protocol=0, int reuse_addr=0, int ipv6_only=0)
#define ENETUNREACH
string transport_type
The transport type (e.g. tcp or udp)
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14
RcHandle< IceEndpoint > ice_endpoint_
const ReturnCode_t RETCODE_OK
const GuidPrefix_t GUIDPREFIX_UNKNOWN
Nil value for the GUID prefix (participant identifier).
Definition: GuidUtils.h:32
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
ACE_Reactor * reactor() const
#define ACE_ERROR_RETURN(X, Y)
const string RTPS_RELAY_STUN_PROTOCOL
DDS::ReturnCode_t blob_to_locators(const DCPS::TransportBLOB &blob, DCPS::LocatorSeq &locators, bool *requires_inline_qos, unsigned int *pBytesRead)
RcHandle< T > lock() const
Definition: RcObject.h:188
static const String ip(const ACE_INET_Addr &addr)
Definition: LogAddr.cpp:15
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
#define ENOBUFS
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
#define TheServiceParticipant
ssize_t send(const void *buf, size_t n, const ACE_Addr &addr, int flags=0) const
DDS::OctetSeq TransportBLOB
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
virtual void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
virtual ACE_INET_Addr stun_server_address() const
void process_relay_sra(ICE::ServerReflexiveStateMachine::StateChange)
RtpsUdpInst_rch config() const
DCPS::RcHandle< ICE::Agent > get_ice_agent() const
OpenDDS_Dcps_Export void primitive_serialized_size_boolean(const Encoding &encoding, size_t &size, size_t count=1)
void get_connection_addrs(const TransportBLOB &data, AddrSet *uc_addrs, AddrSet *mc_addrs=0, bool *requires_inline_qos=0, unsigned int *blob_bytes_read=0) const
BUILT_IN_TOPIC_KEY DDS::OctetArray16 guid
virtual void use_ice_now(bool flag)
size_t ConnectionInfoFlags
TransportInst_rch config() const