OpenDDS  Snapshot(2023/04/28-20:55)
Spdp.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include "Spdp.h"
7 
8 #include "MessageUtils.h"
9 #include "MessageTypes.h"
10 #include "ParameterListConverter.h"
11 #include "RtpsDiscovery.h"
12 
14 #include <dds/DCPS/GuidConverter.h>
15 #include <dds/DCPS/GuidUtils.h>
16 #include <dds/DCPS/Ice.h>
17 #include <dds/DCPS/LogAddr.h>
18 #include <dds/DCPS/Logging.h>
19 #include <dds/DCPS/Qos_Helper.h>
22 #ifdef OPENDDS_SECURITY
24 #endif
25 
26 #include <dds/DdsDcpsGuidC.h>
27 
28 #include <ace/Reactor.h>
29 #include <ace/OS_NS_sys_socket.h> // For setsockopt()
30 #include <ace/OS_NS_strings.h>
31 
32 #include <cstring>
33 #include <stdexcept>
34 
36 
37 namespace OpenDDS {
38 
39 namespace RTPS {
40 using DCPS::GUID_t;
42 using DCPS::TimeDuration;
43 using DCPS::Serializer;
44 using DCPS::Encoding;
45 using DCPS::ENDIAN_BIG;
47 using DCPS::LogLevel;
48 using DCPS::log_level;
49 using DCPS::LogAddr;
50 
51 namespace {
52  const Encoding encoding_plain_big(Encoding::KIND_XCDR1, ENDIAN_BIG);
53  const Encoding encoding_plain_native(Encoding::KIND_XCDR1);
54 
55  bool disposed(const ParameterList& inlineQos)
56  {
57  for (CORBA::ULong i = 0; i < inlineQos.length(); ++i) {
58  if (inlineQos[i]._d() == PID_STATUS_INFO) {
59  return inlineQos[i].status_info().value[3] & 1;
60  }
61  }
62  return false;
63  }
64 
65 #ifndef DDS_HAS_MINIMUM_BIT
66  DCPS::ParticipantLocation compute_location_mask(const ACE_INET_Addr& address, bool from_relay)
67  {
68  if (address.get_type() == AF_INET6) {
69  return from_relay ? DCPS::LOCATION_RELAY6 : DCPS::LOCATION_LOCAL6;
70  }
71  return from_relay ? DCPS::LOCATION_RELAY : DCPS::LOCATION_LOCAL;
72  }
73 #endif
74 
75 #ifdef OPENDDS_SECURITY
76 
77 #ifndef DDS_HAS_MINIMUM_BIT
78  DCPS::ParticipantLocation compute_ice_location_mask(const ACE_INET_Addr& address)
79  {
80  if (address.get_type() == AF_INET6) {
81  return DCPS::LOCATION_ICE6;
82  }
83  return DCPS::LOCATION_ICE;
84  }
85 #endif
86 
88  return rhs.name == lhs.name && rhs.value == lhs.value && rhs.propagate == lhs.propagate;
89  }
90 
92  return rhs.name == lhs.name && rhs.value == lhs.value && rhs.propagate == lhs.propagate;
93  }
94 
96  bool result = (rhs.length() == lhs.length());
97  for (unsigned int i = 0; result && i < rhs.length(); ++i) {
98  result = (rhs[i] == lhs[i]);
99  }
100  return result;
101  }
102 
104  bool result = (rhs.length() == lhs.length());
105  for (unsigned int i = 0; result && i < rhs.length(); ++i) {
106  result = (rhs[i] == lhs[i]);
107  }
108  return result;
109  }
110 
111  bool operator==(const DDS::Security::DataHolder& rhs, const DDS::Security::DataHolder& lhs) {
112  return rhs.class_id == lhs.class_id && rhs.properties == lhs.properties && rhs.binary_properties == lhs.binary_properties;
113  }
114 
115  void init_participant_sec_attributes(DDS::Security::ParticipantSecurityAttributes& attr)
116  {
118  attr.is_access_protected = false;
119  attr.is_rtps_protected = false;
120  attr.is_discovery_protected = false;
121  attr.is_liveliness_protected = false;
123  attr.ac_endpoint_properties.length(0);
124  }
125 
126 #endif
127 
128  inline bool prop_to_bool(const DDS::Property_t& prop)
129  {
130  const char* const value = prop.value.in();
131  return std::strcmp(value, "0") && ACE_OS::strcasecmp(value, "false");
132  }
133 }
134 
135 void Spdp::init(DDS::DomainId_t /*domain*/,
136  DCPS::GUID_t& guid,
137  const DDS::DomainParticipantQos& qos,
139 {
140  type_lookup_service_ = tls;
141 
142  bool enable_endpoint_announcements = true;
143  bool enable_type_lookup_service = config_->use_xtypes();
144 
145  const DDS::PropertySeq& properties = qos.property.value;
146  for (unsigned int idx = 0; idx != properties.length(); ++idx) {
147  const DDS::Property_t& prop = properties[idx];
148  if (std::strcmp(RTPS_DISCOVERY_ENDPOINT_ANNOUNCEMENTS, prop.name.in()) == 0) {
149  enable_endpoint_announcements = prop_to_bool(prop);
150  } else if (std::strcmp(RTPS_DISCOVERY_TYPE_LOOKUP_SERVICE, prop.name.in()) == 0) {
151  enable_type_lookup_service = prop_to_bool(prop);
152  } else if (std::strcmp(RTPS_RELAY_APPLICATION_PARTICIPANT, prop.name.in()) == 0) {
153  is_application_participant_ = prop_to_bool(prop);
154  } else if (std::strcmp(RTPS_REFLECT_HEARTBEAT_COUNT, prop.name.in()) == 0) {
155  const CORBA::ULong old_flags = config_->participant_flags();
156  const CORBA::ULong new_flags = prop_to_bool(prop) ? (old_flags | PFLAGS_REFLECT_HEARTBEAT_COUNT) : (old_flags & ~PFLAGS_REFLECT_HEARTBEAT_COUNT);
157  config_->participant_flags(new_flags);
158  }
159  }
160 
167 
168  if (enable_endpoint_announcements) {
173  }
174 
175  if (enable_type_lookup_service) {
181  }
182 
183 #ifdef OPENDDS_SECURITY
184  if (is_security_enabled()) {
185  using namespace DDS::Security;
186 
197 
198  if (enable_endpoint_announcements) {
203  }
204 
205  if (enable_type_lookup_service) {
211  }
212  }
213 #endif
214 
215  guid = guid_; // may have changed in SpdpTransport constructor
216  sedp_->ignore(guid);
217 }
218 
220  GUID_t& guid,
221  const DDS::DomainParticipantQos& qos,
222  RtpsDiscovery* disco,
224  : qos_(qos)
225  , disco_(disco)
226  , config_(disco_->config())
227  , quick_resend_ratio_(disco_->config()->quick_resend_ratio())
228  , min_resend_delay_(disco_->config()->min_resend_delay())
229  , lease_duration_(disco_->config()->lease_duration())
230  , lease_extension_(disco_->config()->lease_extension())
231  , domain_(domain)
232  , guid_(guid)
233  , participant_discovered_at_(MonotonicTimePoint::now().to_monotonic_time())
235  , tport_(DCPS::make_rch<SpdpTransport>(rchandle_from(this)))
236  , initialized_flag_(false)
237  , eh_shutdown_(false)
239  , shutdown_flag_(false)
241  , sedp_(DCPS::make_rch<Sedp>(guid_, DCPS::ref(*this), DCPS::ref(lock_)))
242 #ifdef OPENDDS_SECURITY
244  , security_config_()
245  , security_enabled_(false)
249  , ice_agent_(ICE::Agent::instance())
251 #endif
252 {
254 
255  init(domain, guid, qos, tls);
256 
257 #ifdef OPENDDS_SECURITY
258  init_participant_sec_attributes(participant_sec_attr_);
259 #endif
260 
261 }
262 
263 #ifdef OPENDDS_SECURITY
265  const DCPS::GUID_t& guid,
266  const DDS::DomainParticipantQos& qos,
267  RtpsDiscovery* disco,
269  DDS::Security::IdentityHandle identity_handle,
272  : qos_(qos)
273  , disco_(disco)
274  , config_(disco_->config())
275  , quick_resend_ratio_(disco_->config()->quick_resend_ratio())
276  , min_resend_delay_(disco_->config()->min_resend_delay())
277  , lease_duration_(disco_->config()->lease_duration())
278  , lease_extension_(disco_->config()->lease_extension())
279  , domain_(domain)
280  , guid_(guid)
281  , participant_discovered_at_(MonotonicTimePoint::now().to_monotonic_time())
283  , tport_(DCPS::make_rch<SpdpTransport>(rchandle_from(this)))
284  , initialized_flag_(false)
285  , eh_shutdown_(false)
287  , shutdown_flag_(false)
289  , sedp_(DCPS::make_rch<Sedp>(guid_, DCPS::ref(*this), DCPS::ref(lock_)))
291  , security_config_(Security::SecurityRegistry::instance()->default_config())
292  , security_enabled_(security_config_->get_authentication() && security_config_->get_access_control() &&
293  security_config_->get_crypto_key_factory() && security_config_->get_crypto_key_exchange())
294  , identity_handle_(identity_handle)
295  , permissions_handle_(perm_handle)
296  , crypto_handle_(crypto_handle)
297  , ice_agent_(ICE::Agent::instance())
299 {
301 
302  init(domain, guid_, qos, tls);
303 
304  DDS::Security::Authentication_var auth = security_config_->get_authentication();
305  DDS::Security::AccessControl_var access = security_config_->get_access_control();
306 
307  DDS::Security::SecurityException se = {"", 0, 0};
308 
309  if (!auth->get_identity_token(identity_token_, identity_handle_, se)) {
310  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
311  ACE_TEXT("Spdp::Spdp() - ")
312  ACE_TEXT("unable to get identity token. Security Exception[%d.%d]: %C\n"),
313  se.code, se.minor_code, se.message.in()));
314  throw std::runtime_error("unable to get identity token");
315  }
316  if (!auth->get_identity_status_token(identity_status_token_, identity_handle_, se)) {
317  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
318  ACE_TEXT("Spdp::Spdp() - ")
319  ACE_TEXT("unable to get identity status token. Security Exception[%d.%d]: %C\n"),
320  se.code, se.minor_code, se.message.in()));
321  throw std::runtime_error("unable to get identity status token");
322  }
323  if (!access->get_permissions_token(permissions_token_, permissions_handle_, se)) {
324  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
325  ACE_TEXT("Spdp::Spdp() - ")
326  ACE_TEXT("unable to get permissions handle. Security Exception[%d.%d]: %C\n"),
327  se.code, se.minor_code, se.message.in()));
328  throw std::runtime_error("unable to get permissions token");
329  }
330  if (!access->get_permissions_credential_token(permissions_credential_token_, permissions_handle_, se)) {
331  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
332  ACE_TEXT("Spdp::Spdp() - ")
333  ACE_TEXT("unable to get permissions credential handle. Security Exception[%d.%d]: %C\n"),
334  se.code, se.minor_code, se.message.in()));
335  throw std::runtime_error("unable to get permissions credential token");
336  }
337 
338  if (!auth->set_permissions_credential_and_token(identity_handle_, permissions_credential_token_, permissions_token_, se)) {
339  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
340  ACE_TEXT("Spdp::Spdp() - ")
341  ACE_TEXT("unable to set permissions credential and token. Security Exception[%d.%d]: %C\n"),
342  se.code, se.minor_code, se.message.in()));
343  throw std::runtime_error("unable to set permissions credential and token");
344  }
345 
346  init_participant_sec_attributes(participant_sec_attr_);
347 
348  if (!access->get_participant_sec_attributes(permissions_handle_, participant_sec_attr_, se)) {
349  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
350  ACE_TEXT("Spdp::Spdp() - ")
351  ACE_TEXT("failed to retrieve participant security attributes. Security Exception[%d.%d]: %C\n"),
352  se.code, se.minor_code, se.message.in()));
353  throw std::runtime_error("unable to retrieve participant security attributes");
354  }
355 
356  sedp_->init_security(identity_handle, perm_handle, crypto_handle);
357 }
358 #endif
359 
360 void
362 {
363  {
365  shutdown_flag_ = true;
366  if (DCPS::DCPS_debug_level > 3) {
368  ACE_TEXT("(%P|%t) Spdp::~Spdp ")
369  ACE_TEXT("remove discovered participants\n")));
370  }
371 
372 #ifdef OPENDDS_SECURITY
373  try {
375  } catch (const CORBA::Exception& e) {
376  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::~Spdp() - from ")
377  ACE_TEXT("write_secure_disposes: %C\n"), e._info().c_str()));
378  }
379 #endif
380 
381  for (DiscoveredParticipantIter part = participants_.begin(); part != participants_.end(); ++part) {
382 #ifdef OPENDDS_SECURITY
383  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = sedp_->get_ice_endpoint();
384  if (sedp_endpoint) {
385  stop_ice(sedp_endpoint, part->first, part->second.pdata_.participantProxy.availableBuiltinEndpoints,
386  part->second.pdata_.participantProxy.availableExtendedBuiltinEndpoints);
387  }
388  DCPS::WeakRcHandle<ICE::Endpoint> spdp_endpoint = tport_->get_ice_endpoint();
389  if (spdp_endpoint) {
390  ice_agent_->stop_ice(spdp_endpoint, guid_, part->first);
391  }
393 #endif
395  }
396  }
397 
398 #ifdef OPENDDS_SECURITY
399  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = sedp_->get_ice_endpoint();
400  if (sedp_endpoint) {
402  ice_agent_->remove_local_agent_info_listener(sedp_endpoint, l);
403  }
404 #endif
405 
406  // ensure sedp's task queue is drained before data members are being
407  // deleted
408  sedp_->shutdown();
409 
410  // release lock for reset of event handler, which may delete transport
411  tport_->close(sedp_->reactor_task());
412  tport_.reset();
413  {
415  DCPS::ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
416  while (!eh_shutdown_) {
417  shutdown_cond_.wait(thread_status_manager);
418  }
419  }
420 }
421 
423 {
424 }
425 
426 #ifdef OPENDDS_SECURITY
427 void
429 {
431  return;
432  }
433 
436 
437  sedp_->write_dcps_participant_secure(pdata, GUID_UNKNOWN);
438 }
439 
440 void
442 {
443  sedp_->write_dcps_participant_dispose(guid_);
444 }
445 #endif
446 
447 namespace {
449  {
450 #ifdef OPENDDS_SECURITY
451  return pdata.ddsParticipantDataSecure.base.base;
452 #else
453  return pdata.ddsParticipantData;
454 #endif
455  }
456 }
457 
458 #ifndef DDS_HAS_MINIMUM_BIT
459 void
462  const ACE_INET_Addr& from,
463  const char* reason)
464 {
465  // We have the global lock.
466  iter->second.location_updates_.push_back(DiscoveredParticipant::LocationUpdate(mask, from, DCPS::SystemTimePoint::now()));
467 
468  if (DCPS::log_bits) {
469  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: Spdp::enqueue_location_update_i: %@ for %C size=%B reason=%C\n", this, LogGuid(iter->first).c_str(), iter->second.location_updates_.size(), reason));
470  }
471 
472 }
473 
474 void Spdp::process_location_updates_i(const DiscoveredParticipantIter& iter, const char* reason, bool force_publish)
475 {
476  // We have the global lock.
477 
478  if (iter == participants_.end()) {
479  if (DCPS::log_bits) {
480  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: Spdp::process_location_updates_i: %@ iterator invalid, returning\n", this));
481  }
482  return;
483  }
484 
485  if (iter->second.bit_ih_ == DDS::HANDLE_NIL) {
486  // Do not process updates until the participant exists in the built-in topics.
487  if (DCPS::log_bits) {
488  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: Spdp::process_location_updates_i: %@ %C does not exist in participant bit, returning\n", this, LogGuid(iter->first).c_str()));
489  }
490  return;
491  }
492 
493  DiscoveredParticipant::LocationUpdateList location_updates;
494  std::swap(iter->second.location_updates_, location_updates);
495 
496  if (DCPS::log_bits) {
497  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: Spdp::process_location_updates_i: %@ %C has %B location update(s) force_publish=%d reason=%C\n", this, LogGuid(iter->first).c_str(), location_updates.size(), force_publish, reason));
498  }
499 
500  bool published = false;
501  for (DiscoveredParticipant::LocationUpdateList::const_iterator pos = location_updates.begin(),
502  limit = location_updates.end(); iter != participants_.end() && pos != limit; ++pos) {
503  DCPS::ParticipantLocationBuiltinTopicData& location_data = iter->second.location_data_;
504 
505  OPENDDS_STRING addr = "";
506  const DCPS::ParticipantLocation old_mask = location_data.location;
507  if (pos->from_ != ACE_INET_Addr()) {
508  location_data.location |= pos->mask_;
509  addr = DCPS::LogAddr(pos->from_).str();
510  } else {
511  location_data.location &= ~(pos->mask_);
512  }
513 
514  location_data.change_mask = pos->mask_;
515 
516  bool address_change = false;
517  switch (pos->mask_) {
519  address_change = addr.compare(location_data.local_addr.in()) != 0;
520  location_data.local_addr = addr.c_str();
521  location_data.local_timestamp = pos->timestamp_.to_dds_time();
522  break;
523  case DCPS::LOCATION_ICE:
524  address_change = addr.compare(location_data.ice_addr.in()) != 0;
525  location_data.ice_addr = addr.c_str();
526  location_data.ice_timestamp = pos->timestamp_.to_dds_time();
527  break;
529  address_change = addr.compare(location_data.relay_addr.in()) != 0;
530  location_data.relay_addr = addr.c_str();
531  location_data.relay_timestamp = pos->timestamp_.to_dds_time();
532  break;
534  address_change = addr.compare(location_data.local6_addr.in()) != 0;
535  location_data.local6_addr = addr.c_str();
536  location_data.local6_timestamp = pos->timestamp_.to_dds_time();
537  break;
538  case DCPS::LOCATION_ICE6:
539  address_change = addr.compare(location_data.ice6_addr.in()) != 0;
540  location_data.ice6_addr = addr.c_str();
541  location_data.ice6_timestamp = pos->timestamp_.to_dds_time();
542  break;
544  address_change = addr.compare(location_data.relay6_addr.in()) != 0;
545  location_data.relay6_addr = addr.c_str();
546  location_data.relay6_timestamp = pos->timestamp_.to_dds_time();
547  break;
548  }
549 
550  const DDS::Time_t expr =
551  (
552  pos->timestamp_ - rtps_duration_to_time_duration(iter->second.pdata_.leaseDuration,
553  iter->second.pdata_.participantProxy.protocolVersion,
554  iter->second.pdata_.participantProxy.vendorId)
555  ).to_dds_time();
556  if ((location_data.location & DCPS::LOCATION_LOCAL) && DCPS::operator<(location_data.local_timestamp, expr)) {
557  location_data.location &= ~(DCPS::LOCATION_LOCAL);
558  location_data.change_mask |= DCPS::LOCATION_LOCAL;
559  location_data.local_timestamp = pos->timestamp_.to_dds_time();
560  }
561  if ((location_data.location & DCPS::LOCATION_RELAY) && DCPS::operator<(location_data.relay_timestamp, expr)) {
562  location_data.location &= ~(DCPS::LOCATION_RELAY);
563  location_data.change_mask |= DCPS::LOCATION_RELAY;
564  location_data.relay_timestamp = pos->timestamp_.to_dds_time();
565  }
566  if ((location_data.location & DCPS::LOCATION_LOCAL6) && DCPS::operator<(location_data.local6_timestamp, expr)) {
567  location_data.location &= ~(DCPS::LOCATION_LOCAL6);
568  location_data.change_mask |= DCPS::LOCATION_LOCAL6;
569  location_data.local6_timestamp = pos->timestamp_.to_dds_time();
570  }
571  if ((location_data.location & DCPS::LOCATION_RELAY6) && DCPS::operator<(location_data.relay6_timestamp, expr)) {
572  location_data.location &= ~(DCPS::LOCATION_RELAY6);
573  location_data.change_mask |= DCPS::LOCATION_RELAY6;
574  location_data.relay6_timestamp = pos->timestamp_.to_dds_time();
575  }
576 
577  if (old_mask != location_data.location || address_change) {
578  if (DCPS::log_bits) {
579  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: Spdp::process_location_updates_i: %@ publishing %C update\n", this, LogGuid(iter->first).c_str()));
580  }
582  published = true;
583  }
584  }
585 
586  if (force_publish && !published) {
587  if (DCPS::log_bits) {
588  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: Spdp::process_location_updates_i: %@ publishing %C forced\n", this, LogGuid(iter->first).c_str()));
589  }
591  published = true;
592  }
593 
594  if (!published && DCPS::log_bits) {
595  if (DCPS::log_bits) {
596  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: Spdp::process_location_updates_i: %@ not published\n", this, LogGuid(iter->first).c_str()));
597  }
598  }
599 }
600 
601 void
603 {
604  iter->second.location_ih_ = bit_subscriber_->add_participant_location(iter->second.location_data_, DDS::NEW_VIEW_STATE);
605  if (DCPS::log_bits) {
606  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: Spdp::publish_location_update_i: %@ participant %C has participant location handle %d\n", this, LogGuid(iter->first).c_str(), iter->second.location_ih_));
607  }
608 }
609 #endif
610 
613 {
614  return tport_->ice_endpoint_added_ ? tport_->get_ice_endpoint() : DCPS::WeakRcHandle<ICE::Endpoint>();
615 }
616 
617 bool cmp_ip4(const ACE_INET_Addr& a, const DCPS::Locator_t& locator)
618 {
619  struct sockaddr_in* sa = static_cast<struct sockaddr_in*>(a.get_addr());
620  if (sa->sin_family == AF_INET && locator.kind == LOCATOR_KIND_UDPv4) {
621  const unsigned char* ip = reinterpret_cast<const unsigned char*>(&sa->sin_addr);
622  const unsigned char* la = reinterpret_cast<const unsigned char*>(locator.address) + 12;
623  return ACE_OS::memcmp(ip, la, 4) == 0;
624  }
625  return false;
626 }
627 
628 #if defined (ACE_HAS_IPV6)
629 bool cmp_ip6(const ACE_INET_Addr& a, const DCPS::Locator_t& locator)
630 {
631  struct sockaddr_in6* in6 = static_cast<struct sockaddr_in6*>(a.get_addr());
632  if (in6->sin6_family == AF_INET6 && locator.kind == LOCATOR_KIND_UDPv6) {
633  const unsigned char* ip = reinterpret_cast<const unsigned char*>(&in6->sin6_addr);
634  const unsigned char* la = reinterpret_cast<const unsigned char*>(locator.address);
635  return ACE_OS::memcmp(ip, la, 16) == 0;
636  }
637  return false;
638 }
639 #endif // ACE_HAS_IPV6
640 
641 bool is_ip_equal(const ACE_INET_Addr& a, const DCPS::Locator_t& locator)
642 {
643 #ifdef ACE_HAS_IPV6
644  if (a.get_type() == AF_INET6) {
645  return cmp_ip6(a, locator);
646  }
647 #endif
648  return cmp_ip4(a, locator);
649 }
650 
652  const unsigned char* a = reinterpret_cast<const unsigned char*>(o.address);
653  ACE_INET_Addr addr;
654  bool b = locator_to_address(addr, o, false) == 0;
655  ACE_DEBUG((LM_DEBUG, ACE_TEXT("locator%d(kind:%d)[%d.%d.%d.%d.%d.%d.%d.%d.%d.%d.%d.%d.%d.%d.%d.%d] locator_to_address:%C\n"),
656  i, o.kind, a[0], a[1], a[2], a[3], a[4], a[5], a[6], a[7], a[8], a[9], a[10], a[11], a[12], a[13], a[14], a[15],
657  (b ? DCPS::LogAddr(addr).c_str() : "failed")));
658 }
659 
660 bool ip_in_locator_list(const ACE_INET_Addr& from, const DCPS::LocatorSeq& locators)
661 {
662  if (DCPS::DCPS_debug_level >= 8) {
663  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ip_in_locator_list - from (type %d): %C\n"), from.get_type(), DCPS::LogAddr(from).c_str()));
664  }
665  for (CORBA::ULong i = 0; i < locators.length(); ++i) {
666  if (DCPS::DCPS_debug_level >= 8) {
667  print_locator(i, locators[i]);
668  }
669  if (is_ip_equal(from, locators[i])) {
670  return true;
671  }
672  }
673  return false;
674 }
675 
676 #ifdef OPENDDS_SECURITY
677 bool ip_in_AgentInfo(const ACE_INET_Addr& from, const ParameterList& plist)
678 {
679  bool found = false;
680  ICE::AgentInfoMap ai_map;
681  if (!ParameterListConverter::from_param_list(plist, ai_map)) {
682  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ip_in_AgentInfo - failed to convert ParameterList to AgentInfoMap\n")));
683  return found;
684  }
685  ICE::AgentInfoMap::const_iterator sedp_i = ai_map.find(SEDP_AGENT_INFO_KEY);
686  if (sedp_i != ai_map.end()) {
687  const ICE::AgentInfo::CandidatesType& cs = sedp_i->second.candidates;
688  for (ICE::AgentInfo::const_iterator i = cs.begin(); i != cs.end(); ++i) {
689  if (from.is_ip_equal(i->address)) {
690  found = true;
691  break;
692  }
693  }
694  }
695  ICE::AgentInfoMap::const_iterator spdp_i = ai_map.find(SPDP_AGENT_INFO_KEY);
696  if (!found && spdp_i != ai_map.end()) {
697  const ICE::AgentInfo::CandidatesType& cs = spdp_i->second.candidates;
698  for (ICE::AgentInfo::const_iterator i = cs.begin(); i != cs.end(); ++i) {
699  if (from.is_ip_equal(i->address)) {
700  found = true;
701  break;
702  }
703  }
704  }
705  return found;
706 }
707 #endif
708 
709 void
711  const ParticipantData_t& cpdata,
712  const DCPS::MonotonicTimePoint& now,
713  const DCPS::SequenceNumber& seq,
714  const ACE_INET_Addr& from,
715  bool from_sedp)
716 {
717  // Make a (non-const) copy so we can tweak values below
718  ParticipantData_t pdata(cpdata);
719 
721 
723 
725  return;
726  }
727 
728  if (sedp_->ignoring(guid)) {
729  // Ignore, this is our domain participant or one that the user has
730  // asked us to ignore.
731  return;
732  }
733 
734  const bool relay_in_use = (config_->rtps_relay_only() || config_->use_rtps_relay());
735  const bool from_relay = relay_in_use && (from == config_->spdp_rtps_relay_address());
736 
737 #ifndef DDS_HAS_MINIMUM_BIT
738  const DCPS::ParticipantLocation location_mask = compute_location_mask(from, from_relay);
739 #endif
740 
741  // Don't trust SPDP for the RtpsRelay application participant.
742  // Otherwise, anyone can reset the application participant.
743 #ifdef OPENDDS_SECURITY
744  if (is_security_enabled() && !from_sedp) {
746  }
747 #endif
748 
749  // Find the participant - iterator valid only as long as we hold the lock
750  DiscoveredParticipantIter iter = participants_.find(guid);
751 
752  if (iter == participants_.end()) {
753  // Trying to delete something that doesn't exist is a NOOP
755  return;
756  }
757 
758 #ifdef OPENDDS_SECURITY
759  if (config_->max_participants_in_authentication() &&
760  n_participants_in_authentication_ >= config_->max_participants_in_authentication()) {
761  if (DCPS::security_debug.auth_debug) {
763  "(%P|%t) {auth_debug} DEBUG: Spdp::handle_participant_data - participants_in_authentication: %B >= max: %B\n",
764  n_participants_in_authentication_, config_->max_participants_in_authentication()));
765  }
766  return;
767  }
768 #endif
769 
770  partBitData(pdata).key = guid_to_bit_key(guid);
771 
772  TimeDuration effective_lease(pdata.leaseDuration.seconds);
773 
776  ACE_TEXT("(%P|%t) Spdp::handle_participant_data - %C discovered %C lease %C from %C (%B)\n"),
777  DCPS::LogGuid(guid_).c_str(), DCPS::LogGuid(guid).c_str(),
778  effective_lease.str(0).c_str(), DCPS::LogAddr(from).c_str(),
779  participants_.size()));
780  }
781 
782  if (!from_sedp) {
783 #ifdef OPENDDS_SECURITY
784  if (is_security_enabled()) {
785  effective_lease = config_->security_unsecure_lease_duration();
786  } else {
787 #endif
788  const TimeDuration maxLeaseDuration = config_->max_lease_duration();
789  if (maxLeaseDuration && effective_lease > maxLeaseDuration) {
790  if (DCPS::DCPS_debug_level >= 2) {
792  ACE_TEXT("(%P|%t) Spdp::handle_participant_data - overwriting %C lease %C from %C with %C\n"),
793  DCPS::LogGuid(guid).c_str(), effective_lease.str(0).c_str(),
794  DCPS::LogAddr(from).c_str(), maxLeaseDuration.str(0).c_str()));
795  }
796  effective_lease = maxLeaseDuration;
797  }
798 #ifdef OPENDDS_SECURITY
799  }
800 #endif
801  }
802 
803  pdata.leaseDuration.seconds = static_cast<ACE_CDR::Long>(effective_lease.value().sec());
804 
805  if (tport_->directed_send_task_) {
806  if (tport_->directed_guids_.empty()) {
807  tport_->directed_send_task_->schedule(TimeDuration::zero_value);
808  }
809  tport_->directed_guids_.push_back(guid);
810  }
811 
812  // add a new participant
813 
814 #ifdef OPENDDS_SECURITY
815  std::pair<DiscoveredParticipantIter, bool> p = participants_.insert(std::make_pair(guid, DiscoveredParticipant(pdata, seq, config_->auth_resend_period())));
817  if (DCPS::security_debug.auth_debug) {
819  "(%P|%t) {auth_debug} DEBUG: Spdp::handle_participant_data() %B participants in authentication\n",
821  }
822 #else
823  std::pair<DiscoveredParticipantIter, bool> p = participants_.insert(std::make_pair(guid, DiscoveredParticipant(pdata, seq, TimeDuration())));
824 #endif
825  iter = p.first;
826  iter->second.discovered_at_ = now;
827  update_lease_expiration_i(iter, now);
829 
830  if (!from_relay && from != ACE_INET_Addr()) {
831  iter->second.last_recv_address_ = from;
832  }
833 
835  log_progress("participant discovery", guid_, guid, iter->second.discovered_at_.to_monotonic_time());
836  }
837 
838 #ifndef DDS_HAS_MINIMUM_BIT
839  if (!from_sedp) {
840  enqueue_location_update_i(iter, location_mask, from, "new participant");
841  }
842 #endif
843 
844  sedp_->associate(iter->second
845 #ifdef OPENDDS_SECURITY
847 #endif
848  );
849 
850  // Since we've just seen a new participant, let's send out our
851  // own announcement, so they don't have to wait.
852  if (from != ACE_INET_Addr()) {
853  if (from_relay) {
854  tport_->write_i(guid, iter->second.last_recv_address_, SpdpTransport::SEND_RELAY);
855  } else {
856  tport_->shorten_local_sender_delay_i();
857  }
858  }
859 
860 #ifdef OPENDDS_SECURITY
861  if (is_security_enabled()) {
862  if (!iter->second.has_security_data()) {
864  if (DCPS::security_debug.auth_debug) {
865  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} Spdp::handle_participant_data - ")
866  ACE_TEXT("Incompatible security attributes in discovered participant: %C\n"),
867  DCPS::LogGuid(guid).c_str()));
868  }
869  // FUTURE: This is probably not a good idea since it will just get rediscovered.
871  participants_.erase(iter);
872  iter = participants_.end();
873  } else { // allow_unauthenticated_participants == true
875  match_unauthenticated(iter);
876  }
877  } else {
878  iter->second.identity_token_ = pdata.ddsParticipantDataSecure.base.identity_token;
879  iter->second.permissions_token_ = pdata.ddsParticipantDataSecure.base.permissions_token;
880  iter->second.property_qos_ = pdata.ddsParticipantDataSecure.base.property;
881  iter->second.security_info_ = pdata.ddsParticipantDataSecure.base.security_info;
882  iter->second.extended_builtin_endpoints_ = pdata.ddsParticipantDataSecure.base.extended_builtin_endpoints;
883 
884  // The remote needs to see our SPDP before attempting authentication.
885  tport_->write_i(guid, iter->second.last_recv_address_, from_relay ? SpdpTransport::SEND_RELAY : SpdpTransport::SEND_DIRECT);
886 
887  attempt_authentication(iter, true);
888 
889  if (iter->second.auth_state_ == AUTH_STATE_UNAUTHENTICATED) {
891  if (DCPS::security_debug.auth_debug) {
892  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} Spdp::handle_participant_data - ")
893  ACE_TEXT("Incompatible security attributes in discovered participant: %C\n"),
894  DCPS::LogGuid(guid).c_str()));
895  }
897  participants_.erase(iter);
898  iter = participants_.end();
899  } else { // allow_unauthenticated_participants == true
901  match_unauthenticated(iter);
902  }
903  } else if (iter->second.auth_state_ == AUTH_STATE_AUTHENTICATED) {
904  if (!match_authenticated(guid, iter)) {
906  participants_.erase(iter);
907  iter = participants_.end();
908  }
909  }
910  // otherwise just return, since we're waiting for input to finish authentication
911  }
912  } else {
914  match_unauthenticated(iter);
915  }
916 #else
917  match_unauthenticated(iter);
918 #endif
919 
920  } else { // Existing Participant
921  if (from_sedp && DCPS::transport_debug.log_progress) {
922  log_progress("secure participant discovery", guid_, guid, iter->second.discovered_at_.to_monotonic_time());
923  }
924 
925 #ifndef DDS_HAS_MINIMUM_BIT
926  if (!from_sedp) {
927  enqueue_location_update_i(iter, location_mask, from, "existing participant");
928  }
929 #endif
930 #ifdef OPENDDS_SECURITY
931  // Non-secure updates for authenticated participants are used for liveliness but
932  // are otherwise ignored. Non-secure dispose messages are ignored completely.
933  if (is_security_enabled() && iter->second.auth_state_ == AUTH_STATE_AUTHENTICATED && !from_sedp) {
934  update_lease_expiration_i(iter, now);
935  if (!from_relay && from != ACE_INET_Addr()) {
936  iter->second.last_recv_address_ = from;
937  }
938 #ifndef DDS_HAS_MINIMUM_BIT
939  process_location_updates_i(iter, "non-secure liveliness");
940 #endif
941  return;
942  }
943 #endif
944 
946 #ifdef OPENDDS_SECURITY
947  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = sedp_->get_ice_endpoint();
948  if (sedp_endpoint) {
949  stop_ice(sedp_endpoint, iter->first, iter->second.pdata_.participantProxy.availableBuiltinEndpoints,
950  iter->second.pdata_.participantProxy.availableExtendedBuiltinEndpoints);
951  }
952  DCPS::WeakRcHandle<ICE::Endpoint> spdp_endpoint = tport_->get_ice_endpoint();
953  if (spdp_endpoint) {
954  ice_agent_->stop_ice(spdp_endpoint, guid_, iter->first);
955  }
957 #endif
958 #ifndef DDS_HAS_MINIMUM_BIT
959  process_location_updates_i(iter, "dispose/unregister");
960 #endif
961  if (iter != participants_.end()) {
963  participants_.erase(iter);
964  }
965  return;
966  }
967 
968  // Check if sequence numbers are increasing
969  if (validateSequenceNumber(now, seq, iter)) {
970  // update an existing participant
971  DDS::ParticipantBuiltinTopicData& pdataBit = partBitData(pdata);
972  DDS::ParticipantBuiltinTopicData& discoveredBit = partBitData(iter->second.pdata_);
973  pdataBit.key = discoveredBit.key;
974 
975 #ifndef OPENDDS_SAFETY_PROFILE
976  using DCPS::operator!=;
977 #endif
978  if (discoveredBit.user_data != pdataBit.user_data ||
979  (from_sedp && iter->second.bit_ih_ == DDS::HANDLE_NIL)) {
980  discoveredBit.user_data = pdataBit.user_data;
981 
982  // If secure user data, this is the first time we should be
983  // seeing the real user data.
984  iter->second.bit_ih_ = bit_subscriber_->add_participant(pdataBit, secure_part_user_data() ? DDS::NEW_VIEW_STATE : DDS::NOT_NEW_VIEW_STATE);
985  }
986  if (locators_changed(iter->second.pdata_.participantProxy, pdata.participantProxy)) {
987  sedp_->update_locators(pdata);
988  }
989  const DCPS::MonotonicTime_t da = iter->second.pdata_.discoveredAt;
990  iter->second.pdata_ = pdata;
991  iter->second.pdata_.discoveredAt = da;
992  update_lease_expiration_i(iter, now);
994  if (!from_relay && from != ACE_INET_Addr()) {
995  iter->second.last_recv_address_ = from;
996  }
997 
998 #ifndef DDS_HAS_MINIMUM_BIT
999  /*
1000  * If secure user data, force update location bit because we just gave
1001  * the first data on the participant. Readers might have been ignoring
1002  * location samples on the participant until now.
1003  */
1004  process_location_updates_i(iter, "valid SPDP", secure_part_user_data());
1005 #endif
1006  // Else a reset has occurred and check if we should remove the participant
1007  } else if (iter->second.seq_reset_count_ >= config_->max_spdp_sequence_msg_reset_check()) {
1008 #ifdef OPENDDS_SECURITY
1010 #endif
1011 #ifndef DDS_HAS_MINIMUM_BIT
1012  process_location_updates_i(iter, "reset");
1013 #endif
1014  if (iter != participants_.end()) {
1016  participants_.erase(iter);
1017  }
1018  return;
1019  }
1020  }
1021 
1022 #ifndef DDS_HAS_MINIMUM_BIT
1023  if (iter != participants_.end()) {
1024  process_location_updates_i(iter, "catch all");
1025  }
1026 #endif
1027 }
1028 
1029 bool
1031 {
1032  if (seq.getValue() != 0 && iter->second.max_seq_ != DCPS::SequenceNumber::MAX_VALUE) {
1033  if (seq < iter->second.max_seq_) {
1034  const bool honeymoon_period = now < iter->second.discovered_at_ + min_resend_delay_;
1035  if (!honeymoon_period) {
1036  ++iter->second.seq_reset_count_;
1037  }
1038  return false;
1039  } else if (iter->second.seq_reset_count_ > 0) {
1040  --iter->second.seq_reset_count_;
1041  }
1042  }
1043  iter->second.max_seq_ = std::max(iter->second.max_seq_, seq);
1044  return true;
1045 }
1046 
1047 void
1049  const ParameterList& plist,
1050  const ACE_INET_Addr& from)
1051 {
1054  return;
1055  }
1056 
1057  const MonotonicTimePoint now = MonotonicTimePoint::now();
1058  ParticipantData_t pdata;
1059 
1061  pdata.discoveredAt = now.to_monotonic_time();
1062 #ifdef OPENDDS_SECURITY
1064 #endif
1065 
1066  if (!ParameterListConverter::from_param_list(plist, pdata)) {
1067  if (DCPS::DCPS_debug_level > 0) {
1068  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::data_received - ")
1069  ACE_TEXT("failed to convert from ParameterList to ")
1070  ACE_TEXT("SPDPdiscoveredParticipantData\n")));
1071  }
1072  return;
1073  }
1074 
1075  // Remote domain ID, if populated, has to match
1076  if (pdata.participantProxy.domainId != domain_) {
1077  return;
1078  }
1079 
1081  if (guid == guid_) {
1082  // About us, stop.
1083  return;
1084  }
1085 
1086  const DCPS::MessageId msg_id = (data.inlineQos.length() && disposed(data.inlineQos)) ? DCPS::DISPOSE_INSTANCE : DCPS::SAMPLE_DATA;
1087 
1088 #ifdef OPENDDS_SECURITY
1089  const bool relay_in_use = (config_->rtps_relay_only() || config_->use_rtps_relay());
1090  const bool from_relay = relay_in_use && (from == config_->spdp_rtps_relay_address());
1091 
1092  if (config_->check_source_ip() && msg_id == DCPS::SAMPLE_DATA && !from_relay && !ip_in_locator_list(from, pdata.participantProxy.metatrafficUnicastLocatorList) && !ip_in_AgentInfo(from, plist)) {
1093  if (DCPS::DCPS_debug_level >= 8) {
1094  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) Spdp::data_received - dropped IP: %C\n"), DCPS::LogAddr(from).c_str()));
1095  }
1096  return;
1097  }
1098 
1099  const bool security_enabled = is_security_enabled();
1100  guard.release();
1101 
1102  if (!security_enabled) {
1103  process_participant_ice(plist, pdata, guid);
1104  }
1105 #elif !defined OPENDDS_SAFETY_PROFILE
1106  const bool relay_in_use = (config_->rtps_relay_only() || config_->use_rtps_relay());
1107  const bool from_relay = relay_in_use && (from == config_->spdp_rtps_relay_address());
1108 
1109  const bool check_source_ip = config_->check_source_ip();
1110  guard.release();
1111 
1112  if (check_source_ip && msg_id == DCPS::SAMPLE_DATA && !from_relay && !ip_in_locator_list(from, pdata.participantProxy.metatrafficUnicastLocatorList)) {
1113  if (DCPS::DCPS_debug_level >= 8) {
1114  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) Spdp::data_received - IP not in locator list: %C\n"), DCPS::LogAddr(from).c_str()));
1115  }
1116  return;
1117  }
1118 #else
1119  guard.release();
1120 #endif
1121 
1122  handle_participant_data(msg_id, pdata, now, to_opendds_seqnum(data.writerSN), from, false);
1123 }
1124 
1125 void
1127 {
1128 #ifndef DDS_HAS_MINIMUM_BIT
1129  if (!secure_part_user_data()) { // else the user data is assumed to be blank
1130  dp_iter->second.bit_ih_ = bit_subscriber_->add_participant(partBitData(dp_iter->second.pdata_), DDS::NEW_VIEW_STATE);
1131  }
1132 
1133  process_location_updates_i(dp_iter, "match_unauthenticated");
1134 #else
1135  ACE_UNUSED_ARG(dp_iter);
1136 #endif /* DDS_HAS_MINIMUM_BIT */
1137 }
1138 
1139 #ifdef OPENDDS_SECURITY
1140 
1141 void
1143 {
1145 
1146  if (DCPS::security_debug.auth_debug) {
1147  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_auth_request() - ")
1148  ACE_TEXT("%C -> %C local %C\n"),
1149  DCPS::LogGuid(guid).c_str(),
1151  DCPS::LogGuid(guid_).c_str()));
1152  }
1153 
1154  // If this message wasn't intended for us, ignore handshake message
1155  if (msg.destination_participant_guid != guid_) {
1156  if (DCPS::security_debug.auth_debug) {
1157  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_auth_request() - ")
1158  ACE_TEXT("Dropped not recipient\n")));
1159  }
1160  return;
1161  }
1162 
1163  if (msg.message_data.length() == 0) {
1164  if (DCPS::security_debug.auth_debug) {
1165  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_auth_request() - ")
1166  ACE_TEXT("Dropped no data\n")));
1167  }
1168  return;
1169  }
1170 
1172 
1174  return;
1175  }
1176 
1177  if (sedp_->ignoring(guid)) {
1178  // Ignore, this is our domain participant or one that the user has
1179  // asked us to ignore.
1180  if (DCPS::security_debug.auth_debug) {
1181  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_auth_request() - ")
1182  ACE_TEXT("Explicitly ignoring\n")));
1183  }
1184  return;
1185  }
1186 
1187 
1188  DiscoveredParticipantMap::iterator iter = participants_.find(guid);
1189 
1190  if (iter != participants_.end()) {
1191  if (msg.message_identity.sequence_number <= iter->second.auth_req_sequence_number_) {
1192  if (DCPS::security_debug.auth_debug) {
1193  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_auth_request() - ")
1194  ACE_TEXT("Dropped due to old sequence number\n")));
1195  }
1196  return;
1197  }
1198 
1199  iter->second.remote_auth_request_token_ = msg.message_data[0];
1200  iter->second.auth_req_sequence_number_ = msg.message_identity.sequence_number;
1201 
1202  attempt_authentication(iter, false);
1203  }
1204 }
1205 
1206 namespace {
1207  void set_participant_guid(const GUID_t& guid, ParameterList& param_list)
1208  {
1209  Parameter gp_param;
1210  gp_param.guid(guid);
1211  gp_param._d(PID_PARTICIPANT_GUID);
1212  param_list.length(param_list.length() + 1);
1213  param_list[param_list.length() - 1] = gp_param;
1214  }
1215 }
1216 
1218 {
1220  {
1221  get_part_bit_data(false),
1224  qos_.property,
1225  {0, 0},
1226  0
1227  },
1229  };
1230 
1235  }
1241  }
1244  }
1245 
1246  ParameterList plist;
1247  set_participant_guid(guid_, plist);
1248 
1249  if (!ParameterListConverter::to_param_list(pbtds.base, plist)) {
1250  if (DCPS::DCPS_debug_level > 0) {
1251  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::local_participant_data_as_octets() - ")
1252  ACE_TEXT("Failed to convert from ParticipantBuiltinTopicData to ParameterList\n")));
1253  }
1254  return DDS::OctetSeq();
1255  }
1256 
1257  ACE_Message_Block temp_buff(DCPS::serialized_size(encoding_plain_big, plist));
1258  DCPS::Serializer ser(&temp_buff, encoding_plain_big);
1259  if (!(ser << plist)) {
1260  if (DCPS::DCPS_debug_level > 0) {
1261  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::local_participant_data_as_octets() - ")
1262  ACE_TEXT("Failed to serialize parameter list.\n")));
1263  }
1264  return DDS::OctetSeq();
1265  }
1266 
1267  DDS::OctetSeq seq(static_cast<unsigned int>(temp_buff.length()));
1268  seq.length(seq.maximum());
1269  std::memcpy(seq.get_buffer(), temp_buff.rd_ptr(), temp_buff.length());
1270  return seq;
1271 }
1272 
1273 void
1275 {
1277 
1278  Security::Authentication_var auth = security_config_->get_authentication();
1279  DDS::Security::SecurityException se = {"", 0, 0};
1280  if (dp.handshake_handle_ != DDS::HANDLE_NIL) {
1281  // Return the handle for reauth.
1282  if (!auth->return_handshake_handle(dp.handshake_handle_, se)) {
1283  if (DCPS::security_debug.auth_warn) {
1284  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
1285  ACE_TEXT("Spdp::send_handshake_request() - ")
1286  ACE_TEXT("Unable to return handshake handle. ")
1287  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
1288  se.code, se.minor_code, se.message.in()));
1289  }
1290  return;
1291  }
1293  }
1294 
1295  const DDS::OctetSeq local_participant = local_participant_data_as_octets();
1296  if (!local_participant.length()) {
1297  return; // already logged in local_participant_data_as_octets()
1298  }
1299 
1301  if (auth->begin_handshake_request(dp.handshake_handle_, hs_mt, identity_handle_, dp.identity_handle_,
1302  local_participant, se)
1304  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::send_handshake_request() - ")
1305  ACE_TEXT("Failed to begin handshake_request. Security Exception[%d.%d]: %C\n"),
1306  se.code, se.minor_code, se.message.in()));
1307  return;
1308  }
1309 
1311 
1321  msg.message_data.length(1);
1322  msg.message_data[0] = hs_mt;
1323 
1324  if (send_handshake_message(guid, dp, msg) != DDS::RETCODE_OK) {
1325  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::send_handshake_request() - ")
1326  ACE_TEXT("Unable to write stateless message (handshake).\n")));
1327  return;
1328  } else if (DCPS::security_debug.auth_debug) {
1329  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::send_handshake_request() - ")
1330  ACE_TEXT("Sent handshake request message for participant: %C\n"),
1331  DCPS::LogGuid(guid).c_str()));
1332  }
1333 }
1334 
1335 void
1337 {
1338  const DCPS::GUID_t& guid = iter->first;
1339  DiscoveredParticipant& dp = iter->second;
1340 
1341  if (DCPS::security_debug.auth_debug) {
1342  ACE_DEBUG((LM_DEBUG, "(%P|%t) {auth_debug} DEBUG: Spdp::attempt_authentication "
1343  "for %C from_discovery=%d have_remote_token=%d auth_state=%d handshake_state=%d\n",
1344  DCPS::LogGuid(guid).c_str(),
1345  from_discovery, !(dp.remote_auth_request_token_ == DDS::Security::Token()),
1346  dp.auth_state_, dp.handshake_state_));
1347  }
1348 
1349  dp.handshake_resend_falloff_.set(config_->auth_resend_period());
1350 
1351  if (!from_discovery && dp.handshake_state_ != HANDSHAKE_STATE_DONE) {
1352  // Ignore auth reqs when already in progress.
1354  return;
1355  }
1356 
1357  // Reset.
1359  dp.handshake_deadline_ = DCPS::MonotonicTimePoint::now() + config_->max_auth_time();
1360  handshake_deadlines_.insert(std::make_pair(dp.handshake_deadline_, guid));
1361  tport_->handshake_deadline_task_->schedule(config_->max_auth_time());
1362 
1363  DDS::Security::Authentication_var auth = security_config_->get_authentication();
1364  DDS::Security::SecurityException se = {"", 0, 0};
1365 
1366  const DDS::Security::ValidationResult_t vr = auth->validate_remote_identity(
1368  identity_handle_, dp.identity_token_, guid, se);
1369 
1371  if (dp.have_auth_req_msg_) {
1380  dp.auth_req_msg_.message_data.length(1);
1382  // Send the auth req immediately to reset the remote if they are
1383  // still authenticated with us.
1385  if (DCPS::security_debug.auth_debug) {
1386  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} Spdp::attempt_authentication() - ")
1387  ACE_TEXT("Unable to write auth req message.\n")));
1388  }
1389  } else {
1390  if (DCPS::security_debug.auth_debug) {
1391  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::attempt_authentication() - ")
1392  ACE_TEXT("Sent auth req message for participant: %C\n"),
1393  DCPS::LogGuid(guid).c_str()));
1394  }
1395  }
1397  }
1398 
1399  switch (vr) {
1404  return;
1405  }
1407  if (DCPS::security_debug.auth_debug) {
1408  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::attempt_authentication() - ")
1409  ACE_TEXT("Attempting authentication (expecting request) for participant: %C\n"),
1410  DCPS::LogGuid(guid).c_str()));
1411  }
1413  dp.is_requester_ = true;
1414  return; // We'll need to wait for an inbound handshake request from the remote participant
1415  }
1417  if (DCPS::security_debug.auth_debug) {
1418  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::attempt_authentication() - ")
1419  ACE_TEXT("Attempting authentication (sending request/expecting reply) for participant: %C\n"),
1420  DCPS::LogGuid(guid).c_str()));
1421  }
1423  send_handshake_request(guid, dp);
1424  return;
1425  }
1427  if (DCPS::security_debug.auth_debug) {
1428  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::attempt_authentication() - ")
1429  ACE_TEXT("Remote participant identity is invalid. Security Exception[%d.%d]: %C\n"),
1430  se.code, se.minor_code, se.message.in()));
1431  }
1435  return;
1436  }
1437  default: {
1438  if (DCPS::security_debug.auth_debug) {
1439  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::attempt_authentication() - ")
1440  ACE_TEXT("Unexpected return value while validating remote identity. Security Exception[%d.%d]: %C\n"),
1441  se.code, se.minor_code, se.message.in()));
1442  }
1446  return;
1447  }
1448  }
1449 }
1450 
1451 void
1453 {
1454  DDS::Security::SecurityException se = {"", 0, 0};
1455  Security::Authentication_var auth = security_config_->get_authentication();
1456 
1458 
1459  if (DCPS::security_debug.auth_debug) {
1460  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_handshake_message() - ")
1461  ACE_TEXT("%C -> %C local %C\n"),
1462  DCPS::LogGuid(src_participant).c_str(),
1464  DCPS::LogGuid(guid_).c_str()));
1465  }
1466 
1467  // If this message wasn't intended for us, ignore handshake message
1468  if (msg.destination_participant_guid != guid_) {
1469  if (DCPS::security_debug.auth_debug) {
1470  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_handshake_message() - ")
1471  ACE_TEXT("Dropped not recipient\n")));
1472  }
1473  return;
1474  }
1475 
1476  if (msg.message_data.length() == 0) {
1477  if (DCPS::security_debug.auth_debug) {
1478  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_handshake_message() - ")
1479  ACE_TEXT("Dropped no data\n")));
1480  }
1481  return;
1482  }
1483 
1485 
1486  if (!initialized_flag_ || shutdown_flag_ ) {
1487  return;
1488  }
1489 
1490  // If discovery hasn't initialized / validated this participant yet, ignore handshake messages
1491  DiscoveredParticipantIter iter = participants_.find(src_participant);
1492  if (iter == participants_.end()) {
1493  if (DCPS::security_debug.auth_warn) {
1495  ACE_TEXT("(%P|%t) {auth_warn} Spdp::handle_handshake_message() - ")
1496  ACE_TEXT("received handshake for undiscovered participant %C. Ignoring.\n"),
1497  DCPS::LogGuid(src_participant).c_str()));
1498  }
1499  return;
1500  }
1501 
1502  DiscoveredParticipant& dp = iter->second;
1503 
1504  if (DCPS::security_debug.auth_debug) {
1505  ACE_DEBUG((LM_DEBUG, "(%P|%t) {auth_debug} DEBUG: Spdp::handle_handshake_message() - "
1506  "for %C auth_state=%d handshake_state=%d\n",
1507  DCPS::LogGuid(src_participant).c_str(),
1508  dp.auth_state_, dp.handshake_state_));
1509  }
1510 
1511  // We have received a handshake message from the remote which means
1512  // we don't need to send the auth req.
1513  dp.have_auth_req_msg_ = false;
1514 
1515  dp.handshake_resend_falloff_.set(config_->auth_resend_period());
1516 
1518  // Remote is still sending a reply, so resend the final.
1520  if (sedp_->write_stateless_message(dp.handshake_msg_, reader) != DDS::RETCODE_OK) {
1521  if (DCPS::security_debug.auth_debug) {
1522  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} Spdp::handle_handshake_message() - ")
1523  ACE_TEXT("Unable to write handshake message.\n")));
1524  }
1525  } else {
1526  if (DCPS::security_debug.auth_debug) {
1527  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_handshake_message() - ")
1528  ACE_TEXT("Sent handshake message for participant: %C\n"),
1529  DCPS::LogGuid(iter->first).c_str()));
1530  }
1531  }
1532  return;
1533  }
1534 
1535  if (msg.message_identity.sequence_number <= iter->second.handshake_sequence_number_) {
1536  return;
1537  }
1538  iter->second.handshake_sequence_number_ = msg.message_identity.sequence_number;
1539 
1540  switch (dp.handshake_state_) {
1541  case HANDSHAKE_STATE_DONE:
1542  // Handled above.
1543  return;
1544 
1546  if (DCPS::security_debug.auth_warn) {
1548  ACE_TEXT("(%P|%t) {auth_warn} Spdp::handle_handshake_message() - ")
1549  ACE_TEXT("Invalid handshake state\n")));
1550  }
1551  return;
1552  }
1553 
1560  reply.destination_participant_guid = src_participant;
1563  reply.message_data.length(1);
1564  reply.message_data[0] = msg.message_data[0];
1565 
1566  if (dp.handshake_handle_ != DDS::HANDLE_NIL) {
1567  // Return the handle for reauth.
1568  if (!auth->return_handshake_handle(dp.handshake_handle_, se)) {
1569  if (DCPS::security_debug.auth_warn) {
1570  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
1571  ACE_TEXT("Spdp::handke_handshake_message() - ")
1572  ACE_TEXT("Unable to return handshake handle. ")
1573  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
1574  se.code, se.minor_code, se.message.in()));
1575  }
1576  return;
1577  }
1579  }
1580 
1581  const DDS::OctetSeq local_participant = local_participant_data_as_octets();
1582  if (!local_participant.length()) {
1583  return; // already logged in local_participant_data_as_octets()
1584  }
1586  auth->begin_handshake_reply(dp.handshake_handle_, reply.message_data[0], dp.identity_handle_,
1587  identity_handle_, local_participant, se);
1588 
1589  switch (vr) {
1591  // Theoretically, this shouldn't happen unless handshakes can involve fewer than 3 messages
1595  match_authenticated(src_participant, iter);
1596  return;
1597  }
1599  if (DCPS::security_debug.auth_warn) {
1600  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} Spdp::handle_handshake_message() - ")
1601  ACE_TEXT("Failed to reply to incoming handshake message. Security Exception[%d.%d]: %C\n"),
1602  se.code, se.minor_code, se.message.in()));
1603  }
1604  return;
1605  }
1607  if (DCPS::security_debug.auth_warn) {
1608  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} WARNING: Spdp::handle_handshake_message() - ")
1609  ACE_TEXT("Unexpected validation pending retry\n")));
1610  }
1611  return;
1612  }
1614  if (DCPS::security_debug.auth_warn) {
1615  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} WARNING: Spdp::handle_handshake_message() - ")
1616  ACE_TEXT("Unexpected validation pending handshake request\n")));
1617  }
1618  return;
1619  }
1621  if (send_handshake_message(src_participant, dp, reply) != DDS::RETCODE_OK) {
1622  if (DCPS::security_debug.auth_warn) {
1623  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} Spdp::handle_handshake_message() - ")
1624  ACE_TEXT("Unable to write stateless message for handshake reply.\n")));
1625  }
1626  return;
1627  } else {
1628  if (DCPS::security_debug.auth_debug) {
1629  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_handshake_message() - ")
1630  ACE_TEXT("Sent handshake reply for participant: %C\n"),
1631  DCPS::LogGuid(src_participant).c_str()));
1632  }
1633  }
1635  return;
1636  }
1638  // Theoretically, this shouldn't happen unless handshakes can involve fewer than 3 messages
1639  if (send_handshake_message(src_participant, dp, reply) != DDS::RETCODE_OK) {
1640  if (DCPS::security_debug.auth_warn) {
1641  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} Spdp::handle_handshake_message() - ")
1642  ACE_TEXT("Unable to write stateless message for final message.\n")));
1643  }
1644  return;
1645  } else {
1646  if (DCPS::security_debug.auth_debug) {
1647  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_handshake_message() - ")
1648  ACE_TEXT("Sent handshake final for participant: %C\n"),
1649  DCPS::LogGuid(src_participant).c_str()));
1650  }
1651  }
1655  match_authenticated(src_participant, iter);
1656  return;
1657  }
1658  }
1659  return;
1660  }
1661 
1668  reply.destination_participant_guid = src_participant;
1671  reply.message_data.length(1);
1672 
1673  DDS::Security::ValidationResult_t vr = auth->process_handshake(reply.message_data[0], msg.message_data[0],
1674  dp.handshake_handle_, se);
1675  switch (vr) {
1677  if (DCPS::security_debug.auth_warn) {
1678  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} WARNING: ")
1679  ACE_TEXT("Spdp::handle_handshake_message() - ")
1680  ACE_TEXT("Failed to process incoming handshake message when ")
1681  ACE_TEXT("expecting %C from %C. Security Exception[%d.%d]: %C\n"),
1682  dp.is_requester_ ? "final" : "reply",
1683  DCPS::LogGuid(src_participant).c_str(),
1684  se.code, se.minor_code, se.message.in()));
1685  }
1686  return;
1687  }
1689  if (DCPS::security_debug.auth_warn) {
1690  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} WARNING: Spdp::handle_handshake_message() - ")
1691  ACE_TEXT("Unexpected validation pending retry\n")));
1692  }
1693  return;
1694  }
1696  if (DCPS::security_debug.auth_warn) {
1697  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} WARNING: Spdp::handle_handshake_message() - ")
1698  ACE_TEXT("Unexpected validation pending handshake request\n")));
1699  }
1700  return;
1701  }
1703  // Theoretically, this shouldn't happen unless handshakes can involve more than 3 messages
1704  if (send_handshake_message(src_participant, dp, reply) != DDS::RETCODE_OK) {
1705  if (DCPS::security_debug.auth_warn) {
1706  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} WARNING: Spdp::handle_handshake_message() - ")
1707  ACE_TEXT("Unable to write stateless message for handshake reply.\n")));
1708  }
1709  return;
1710  } else {
1711  if (DCPS::security_debug.auth_debug) {
1712  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_handshake_message() - ")
1713  ACE_TEXT("Sent handshake unknown message for participant: %C\n"),
1714  DCPS::LogGuid(src_participant).c_str()));
1715  }
1716  }
1717  return;
1718  }
1722  // Install the shared secret before sending the final so that
1723  // we are prepared to receive the crypto tokens from the
1724  // replier.
1725 
1726  // Send the final first because match_authenticated takes forever.
1727  if (send_handshake_message(src_participant, iter->second, reply) != DDS::RETCODE_OK) {
1728  if (DCPS::security_debug.auth_warn) {
1729  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} WARNING: Spdp::handle_handshake_message() - ")
1730  ACE_TEXT("Unable to write stateless message for final message.\n")));
1731  }
1732  return;
1733  } else {
1734  if (DCPS::security_debug.auth_debug) {
1735  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::handle_handshake_message() - ")
1736  ACE_TEXT("Sent handshake final for participant: %C\n"),
1737  DCPS::LogGuid(src_participant).c_str()));
1738  }
1739  }
1740 
1742  match_authenticated(src_participant, iter);
1743  return;
1744  }
1749  match_authenticated(src_participant, iter);
1750  return;
1751  }
1752  }
1753  }
1754  }
1755 }
1756 
1757 void
1759 {
1761 
1763  return;
1764  }
1765 
1766  for (TimeQueue::iterator pos = handshake_deadlines_.begin(),
1767  limit = handshake_deadlines_.upper_bound(now); pos != limit;) {
1768 
1769  DiscoveredParticipantIter pit = participants_.find(pos->second);
1770  if (pit != participants_.end()) {
1771  if (DCPS::security_debug.auth_debug) {
1772  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} Spdp::process_handshake_deadlines() - ")
1773  ACE_TEXT("Removing discovered participant due to authentication timeout: %C\n"),
1774  DCPS::LogGuid(pos->second).c_str()));
1775  }
1776  const DCPS::MonotonicTimePoint ptime = pos->first;
1778  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = sedp_->get_ice_endpoint();
1779  if (sedp_endpoint) {
1780  stop_ice(sedp_endpoint, pit->first, pit->second.pdata_.participantProxy.availableBuiltinEndpoints,
1781  pit->second.pdata_.participantProxy.availableExtendedBuiltinEndpoints);
1782  }
1783  DCPS::WeakRcHandle<ICE::Endpoint> spdp_endpoint = tport_->get_ice_endpoint();
1784  if (spdp_endpoint) {
1785  ice_agent_->stop_ice(spdp_endpoint, guid_, pit->first);
1786  }
1787  handshake_deadlines_.erase(pos);
1789  participants_.erase(pit);
1790  } else {
1793  pit->second.handshake_state_ = HANDSHAKE_STATE_DONE;
1794  handshake_deadlines_.erase(pos);
1795  match_unauthenticated(pit);
1796  }
1797  pos = handshake_deadlines_.lower_bound(ptime);
1798  limit = handshake_deadlines_.upper_bound(now);
1799  } else {
1800  handshake_deadlines_.erase(pos++);
1801  }
1802  }
1803 
1804  if (!handshake_deadlines_.empty()) {
1805  tport_->handshake_deadline_task_->schedule(handshake_deadlines_.begin()->first - now);
1806  }
1807 }
1808 
1809 void
1811 {
1813 
1815  return;
1816  }
1817 
1818  bool processor_needs_cancel = false;
1819  for (TimeQueue::iterator pos = handshake_resends_.begin(), limit = handshake_resends_.end();
1820  pos != limit && pos->first <= now;) {
1821 
1822  DiscoveredParticipantIter pit = participants_.find(pos->second);
1823  if (pit != participants_.end() &&
1824  pit->second.stateless_msg_deadline_ <= now) {
1826  pit->second.stateless_msg_deadline_ = now + pit->second.handshake_resend_falloff_.get();
1827 
1828  // Send the auth req first to reset the remote if necessary.
1829  if (pit->second.have_auth_req_msg_) {
1830  // Send the SPDP announcement in case it got lost.
1831  tport_->write_i(pit->first, pit->second.last_recv_address_, SpdpTransport::SEND_RELAY | SpdpTransport::SEND_DIRECT);
1832  if (sedp_->transport_inst()->count_messages()) {
1833  ++tport_->transport_statistics_.writer_resend_count[make_id(guid_, ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER)];
1834  }
1835  if (sedp_->write_stateless_message(pit->second.auth_req_msg_, reader) != DDS::RETCODE_OK) {
1836  if (DCPS::security_debug.auth_debug) {
1837  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} Spdp::process_handshake_resends() - ")
1838  ACE_TEXT("Unable to write auth req message retry.\n")));
1839  }
1840  } else {
1841  if (DCPS::security_debug.auth_debug) {
1842  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::process_handshake_resends() - ")
1843  ACE_TEXT("Sent auth req message for participant: %C\n"),
1844  DCPS::LogGuid(pit->first).c_str()));
1845  }
1846  }
1847  }
1848  if (pit->second.have_handshake_msg_) {
1849  if (sedp_->transport_inst()->count_messages()) {
1850  ++tport_->transport_statistics_.writer_resend_count[make_id(guid_, ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER)];
1851  }
1852  if (sedp_->write_stateless_message(pit->second.handshake_msg_, reader) != DDS::RETCODE_OK) {
1853  if (DCPS::security_debug.auth_debug) {
1854  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} Spdp::process_handshake_resends() - ")
1855  ACE_TEXT("Unable to write handshake message retry.\n")));
1856  }
1857  } else {
1858  if (DCPS::security_debug.auth_debug) {
1859  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} DEBUG: Spdp::process_handshake_resends() - ")
1860  ACE_TEXT("Sent handshake message for participant: %C\n"),
1861  DCPS::LogGuid(pit->first).c_str()));
1862  }
1863  }
1864  }
1865  pit->second.handshake_resend_falloff_.advance(config_->max_auth_time());
1866 
1867  handshake_resends_.insert(std::make_pair(pit->second.stateless_msg_deadline_, pit->first));
1868  if (pit->second.stateless_msg_deadline_ < handshake_resends_.begin()->first) {
1869  processor_needs_cancel = true;
1870  }
1871  }
1872 
1873  handshake_resends_.erase(pos++);
1874  }
1875 
1876  if (!handshake_resends_.empty()) {
1877  if (processor_needs_cancel) {
1878  tport_->handshake_resend_task_->cancel();
1879  }
1880  tport_->handshake_resend_task_->schedule(handshake_resends_.begin()->first - now);
1881  }
1882 }
1883 
1884 bool
1886 {
1888 
1889  if (DCPS::security_debug.auth_debug) {
1891  ACE_TEXT("(%P|%t) Spdp::handle_participant_crypto_tokens() from %C\n"),
1892  DCPS::LogGuid(src_participant).c_str()));
1893  }
1894 
1895  DDS::Security::SecurityException se = {"", 0, 0};
1896  Security::CryptoKeyExchange_var key_exchange = security_config_->get_crypto_key_exchange();
1897 
1898  // If this message wasn't intended for us, ignore volatile message
1899  if (msg.destination_participant_guid != guid_ || !msg.message_data.length()) {
1900  return false;
1901  }
1902 
1904 
1906  // not configured for RTPS Protection, therefore doesn't support participant crypto tokens
1907  return false;
1908  }
1909 
1910  // If discovery hasn't initialized / validated this participant yet, ignore volatile message
1911  DiscoveredParticipantIter iter = participants_.find(src_participant);
1912  if (iter == participants_.end()) {
1913  if (DCPS::security_debug.auth_warn) {
1915  ACE_TEXT("(%P|%t) {auth_warn} Spdp::handle_participant_crypto_tokens() - ")
1916  ACE_TEXT("received tokens for undiscovered participant %C. Ignoring.\n"),
1917  DCPS::LogGuid(src_participant).c_str()));
1918  }
1919  return false;
1920  }
1921 
1923  log_progress("participant crypto token", guid_, src_participant, iter->second.discovered_at_.to_monotonic_time());
1924  }
1925 
1926  const DDS::Security::ParticipantCryptoTokenSeq& inboundTokens =
1927  reinterpret_cast<const DDS::Security::ParticipantCryptoTokenSeq&>(msg.message_data);
1928  const DDS::Security::ParticipantCryptoHandle dp_crypto_handle =
1929  sedp_->get_handle_registry()->get_remote_participant_crypto_handle(iter->first);
1930 
1931  if (!key_exchange->set_remote_participant_crypto_tokens(crypto_handle_, dp_crypto_handle, inboundTokens, se)) {
1932  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::handle_participant_crypto_tokens() - ")
1933  ACE_TEXT("Unable to set remote participant crypto tokens with crypto key exchange plugin. ")
1934  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
1935  se.code, se.minor_code, se.message.in()));
1936  return false;
1937  }
1938 
1939  sedp_->process_association_records_i(iter->second);
1940 
1941  return true;
1942 }
1943 
1948 {
1949  dp.handshake_msg_ = msg;
1951 
1952  const DCPS::GUID_t reader = make_id(guid, ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER);
1953  const DDS::ReturnCode_t retval = sedp_->write_stateless_message(dp.handshake_msg_, reader);
1954  dp.have_handshake_msg_ = true;
1956  return retval;
1957 }
1958 
1960 {
1961  const MonotonicTimePoint deadline = MonotonicTimePoint::now() + time;
1962  handshake_resends_.insert(std::make_pair(deadline, guid));
1963  if (deadline < handshake_resends_.begin()->first) {
1964  tport_->handshake_resend_task_->cancel();
1965  }
1966  tport_->handshake_resend_task_->schedule(time);
1967  return deadline;
1968 }
1969 
1970 bool
1972 {
1973  if (iter->second.handshake_handle_ == DDS::HANDLE_NIL) {
1974  return true;
1975  }
1976 
1977  DDS::Security::SecurityException se = {"", 0, 0};
1978 
1979  Security::Authentication_var auth = security_config_->get_authentication();
1980  Security::AccessControl_var access = security_config_->get_access_control();
1981  Security::CryptoKeyFactory_var key_factory = security_config_->get_crypto_key_factory();
1982  Security::CryptoKeyExchange_var key_exchange = security_config_->get_crypto_key_exchange();
1983  Security::HandleRegistry_rch handle_registry = security_config_->get_handle_registry(guid_);
1984 
1985  if (iter->second.shared_secret_handle_ != 0) {
1986  // Return the shared secret.
1987  if (!auth->return_sharedsecret_handle(iter->second.shared_secret_handle_, se)) {
1988  if (DCPS::security_debug.auth_warn) {
1989  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
1990  ACE_TEXT("Spdp::match_authenticated() - ")
1991  ACE_TEXT("Unable to return shared secret handle. Security Exception[%d.%d]: %C\n"),
1992  se.code, se.minor_code, se.message.in()));
1993  }
1994  return false;
1995  }
1996 
1997  // Get the new shared secret.
1998  iter->second.shared_secret_handle_ = auth->get_shared_secret(iter->second.handshake_handle_, se);
1999  if (iter->second.shared_secret_handle_ == 0) {
2000  if (DCPS::security_debug.auth_warn) {
2001  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
2002  ACE_TEXT("Spdp::match_authenticated() - ")
2003  ACE_TEXT("Unable to get shared secret handle. Security Exception[%d.%d]: %C\n"),
2004  se.code, se.minor_code, se.message.in()));
2005  }
2006  return false;
2007  }
2008 
2009  sedp_->disassociate_volatile(iter->second);
2010  sedp_->cleanup_volatile_crypto(iter->first);
2011  sedp_->associate_volatile(iter->second);
2012  sedp_->generate_remote_matched_crypto_handles(iter->second);
2013  sedp_->process_association_records_i(iter->second);
2014 
2015  if (!auth->return_handshake_handle(iter->second.handshake_handle_, se)) {
2016  if (DCPS::security_debug.auth_warn) {
2017  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
2018  ACE_TEXT("Spdp::send_handshake_request() - ")
2019  ACE_TEXT("Unable to return handshake handle. ")
2020  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
2021  se.code, se.minor_code, se.message.in()));
2022  }
2023  return false;
2024  }
2025 
2026  iter->second.handshake_handle_ = DDS::HANDLE_NIL;
2027  return true;
2028  }
2029 
2030  iter->second.shared_secret_handle_ = auth->get_shared_secret(iter->second.handshake_handle_, se);
2031  if (iter->second.shared_secret_handle_ == 0) {
2032  if (DCPS::security_debug.auth_warn) {
2033  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
2034  ACE_TEXT("Spdp::match_authenticated() - ")
2035  ACE_TEXT("Unable to get shared secret handle. Security Exception[%d.%d]: %C\n"),
2036  se.code, se.minor_code, se.message.in()));
2037  }
2038  return false;
2039  }
2040 
2041  if (!auth->get_authenticated_peer_credential_token(
2042  iter->second.authenticated_peer_credential_token_, iter->second.handshake_handle_, se)) {
2043  if (DCPS::security_debug.auth_warn) {
2044  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
2045  ACE_TEXT("Spdp::match_authenticated() - ")
2046  ACE_TEXT("Unable to get authenticated peer credential token. ")
2047  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
2048  se.code, se.minor_code, se.message.in()));
2049  }
2050  return false;
2051  }
2052 
2053  iter->second.permissions_handle_ = access->validate_remote_permissions(
2054  auth, identity_handle_, iter->second.identity_handle_,
2055  iter->second.permissions_token_, iter->second.authenticated_peer_credential_token_, se);
2056  handle_registry->insert_remote_participant_permissions_handle(guid, iter->second.permissions_handle_);
2057 
2059  iter->second.permissions_handle_ == DDS::HANDLE_NIL) {
2060  if (DCPS::security_debug.auth_warn) {
2061  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
2062  ACE_TEXT("Spdp::match_authenticated() - ")
2063  ACE_TEXT("Unable to validate remote participant with access control plugin. ")
2064  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
2065  se.code, se.minor_code, se.message.in()));
2066  }
2067  return false;
2068  }
2069 
2071  if (!access->check_remote_participant(iter->second.permissions_handle_, domain_,
2072  iter->second.pdata_.ddsParticipantDataSecure, se)) {
2073  if (DCPS::security_debug.auth_warn) {
2074  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
2075  ACE_TEXT("Spdp::match_authenticated() - ")
2076  ACE_TEXT("Remote participant check failed. Security Exception[%d.%d]: %C\n"),
2077  se.code, se.minor_code, se.message.in()));
2078  }
2079  return false;
2080  }
2081  }
2082 
2083  if (DCPS::security_debug.auth_debug) {
2084  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) {auth_debug} Spdp::match_authenticated - ")
2085  ACE_TEXT("auth and access control complete for peer %C\n"),
2086  DCPS::LogGuid(guid).c_str()));
2087  }
2088 
2090  log_progress("authentication", guid_, guid, iter->second.discovered_at_.to_monotonic_time());
2091  }
2092 
2093  DDS::Security::ParticipantCryptoHandle dp_crypto_handle =
2094  sedp_->get_handle_registry()->get_remote_participant_crypto_handle(iter->first);
2095 
2096  if (dp_crypto_handle == DDS::HANDLE_NIL) {
2097  dp_crypto_handle = key_factory->register_matched_remote_participant(
2098  crypto_handle_, iter->second.identity_handle_, iter->second.permissions_handle_,
2099  iter->second.shared_secret_handle_, se);
2100  sedp_->get_handle_registry()->insert_remote_participant_crypto_handle(iter->first, dp_crypto_handle);
2101  if (dp_crypto_handle == DDS::HANDLE_NIL) {
2102  if (DCPS::security_debug.auth_warn) {
2103  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
2104  ACE_TEXT("Spdp::match_authenticated() - Unable to register remote ")
2105  ACE_TEXT("participant with crypto key factory plugin. ")
2106  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
2107  se.code, se.minor_code, se.message.in()));
2108  }
2109  return false;
2110  }
2111  }
2112 
2114  if (!key_exchange->create_local_participant_crypto_tokens(
2115  iter->second.crypto_tokens_, crypto_handle_, dp_crypto_handle, se)) {
2116  if (DCPS::security_debug.auth_warn) {
2117  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_debug} ")
2118  ACE_TEXT("Spdp::match_authenticated() - ")
2119  ACE_TEXT("Unable to create local participant crypto ")
2120  ACE_TEXT("tokens with crypto key exchange plugin. ")
2121  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
2122  se.code, se.minor_code, se.message.in()));
2123  }
2124  return false;
2125  }
2126  }
2127 
2128  sedp_->generate_remote_matched_crypto_handles(iter->second);
2129 
2130  // Auth is now complete.
2131  sedp_->process_association_records_i(iter->second);
2132 
2133 #ifndef DDS_HAS_MINIMUM_BIT
2134  process_location_updates_i(iter, "match_authenticated");
2135 #endif
2136  return true;
2137 }
2138 
2139 void Spdp::update_agent_info(const DCPS::GUID_t&, const ICE::AgentInfo&)
2140 {
2141  if (is_security_enabled()) {
2143  }
2144 }
2145 
2146 void Spdp::remove_agent_info(const DCPS::GUID_t&)
2147 {
2148  if (is_security_enabled()) {
2150  }
2151 }
2152 #endif
2153 
2154 void
2156 {
2157  OPENDDS_ASSERT(bit_subscriber);
2158 
2159  bit_subscriber_ = bit_subscriber;
2160 
2161  // Defer initilization until we have the bit subscriber.
2163  tport_->open(sedp_->reactor_task(), sedp_->job_queue());
2164 
2165 #ifdef OPENDDS_SECURITY
2166  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = sedp_->get_ice_endpoint();
2167  if (sedp_endpoint) {
2169  ice_agent_->add_local_agent_info_listener(sedp_endpoint, l, DCPS::static_rchandle_cast<AgentInfoListener>(rchandle_from(this)));
2170  }
2171 #endif
2172 
2173  initialized_flag_ = true;
2174  tport_->enable_periodic_tasks();
2175 }
2176 
2177 void
2179 {
2180  bit_subscriber_->clear();
2181 }
2182 
2183 namespace {
2184  bool is_opendds(const ParticipantProxy_t& participant)
2185  {
2186  return 0 == std::memcmp(&participant.vendorId, DCPS::VENDORID_OCI, sizeof(VendorId_t));
2187  }
2188 }
2189 
2190 bool
2191 Spdp::is_expectant_opendds(const GUID_t& participant) const
2192 {
2193  const DiscoveredParticipantConstIter iter = participants_.find(participant);
2194  if (iter == participants_.end()) {
2195  return false;
2196  }
2197  return is_opendds(iter->second.pdata_.participantProxy) &&
2198  (iter->second.pdata_.participantProxy.opendds_participant_flags.bits & PFLAGS_NO_ASSOCIATED_WRITERS) == 0;
2199 }
2200 
2202 #ifdef OPENDDS_SECURITY
2203  bool always_in_the_clear,
2205 #endif
2206 )
2207 {
2208  // The RTPS spec has no constants for the builtinTopics{Writer,Reader}
2209 
2210  // This locator list should not be empty, but we won't actually be using it.
2211  // The OpenDDS publication/subscription data will have locators included.
2212  DCPS::LocatorSeq nonEmptyList(1);
2213  nonEmptyList.length(1);
2214  nonEmptyList[0].kind = LOCATOR_KIND_UDPv4;
2215  nonEmptyList[0].port = 12345;
2216  std::memset(nonEmptyList[0].address, 0, 12);
2217  nonEmptyList[0].address[12] = 127;
2218  nonEmptyList[0].address[13] = 0;
2219  nonEmptyList[0].address[14] = 0;
2220  nonEmptyList[0].address[15] = 1;
2221 
2222  const GuidPrefix_t& gp = guid_.guidPrefix;
2223 
2224  const DCPS::LocatorSeq unicast_locators = sedp_->unicast_locators();
2225  const DCPS::LocatorSeq multicast_locators = sedp_->multicast_locators();
2226 
2227  if (unicast_locators.length() == 0 && multicast_locators.length() == 0) {
2228  if (DCPS::DCPS_debug_level > 0) {
2230  ACE_TEXT("(%P|%t) ERROR: ")
2231  ACE_TEXT("Spdp::build_local_pdata: ")
2232  ACE_TEXT("no locators\n")));
2233  }
2234  }
2235 
2236 #ifdef OPENDDS_SECURITY
2238  kind,
2239  { // ParticipantBuiltinTopicDataSecure
2240  { // ParticipantBuiltinTopicData (security enhanced)
2241  get_part_bit_data(!always_in_the_clear),
2244  qos_.property,
2245  {
2248  },
2250  },
2252  },
2253 #else
2254  const SPDPdiscoveredParticipantData pdata = {
2255  get_part_bit_data(false),
2256 #endif
2257  { // ParticipantProxy_t
2258  domain_
2259  , ""
2260  , PROTOCOLVERSION
2261  , {gp[0], gp[1], gp[2], gp[3], gp[4], gp[5],
2262  gp[6], gp[7], gp[8], gp[9], gp[10], gp[11]}
2264  , false /*expectsIQoS*/
2266  , 0
2267  , unicast_locators
2268  , multicast_locators
2269  , nonEmptyList /*defaultMulticastLocatorList*/
2270  , nonEmptyList /*defaultUnicastLocatorList*/
2271  , {0 /*manualLivelinessCount*/} //FUTURE: implement manual liveliness
2272  , qos_.property
2273  , {config_->participant_flags()} // opendds_participant_flags
2275 #ifdef OPENDDS_SECURITY
2277 #endif
2278  },
2279  { // Duration_t (leaseDuration)
2280  static_cast<CORBA::Long>(lease_duration_.value().sec()),
2281  0 // we are not supporting fractional seconds in the lease duration
2282  },
2284  };
2285 
2286  return pdata;
2287 }
2288 
2289 
2291 {
2292 
2293 #ifdef OPENDDS_SECURITY
2294  if (is_security_enabled()) {
2296  }
2297 #endif
2298 
2299  return true;
2300 }
2301 
2302 #if !defined _MSC_VER || _MSC_VER >= 1900
2306 #endif
2307 
2309  : outer_(outer)
2310  , buff_(64 * 1024)
2311  , wbuff_(64 * 1024)
2312 #ifdef OPENDDS_SECURITY
2313  , relay_spdp_task_falloff_(outer->config()->sedp_heartbeat_period())
2314  , relay_stun_task_falloff_(outer->config()->sedp_heartbeat_period())
2315 #endif
2316  , network_is_unreachable_(false)
2317  , ice_endpoint_added_(false)
2318  , transport_statistics_(DCPS::TransportRegistry::DEFAULT_INST_PREFIX +
2319  OPENDDS_STRING("_SPDPTransportInst_") +
2320  DCPS::GuidConverter(outer->guid_).uniqueParticipantId() +
2321  DCPS::to_dds_string(outer->domain_))
2322 {
2323  hdr_.prefix[0] = 'R';
2324  hdr_.prefix[1] = 'T';
2325  hdr_.prefix[2] = 'P';
2326  hdr_.prefix[3] = 'S';
2329  DCPS::assign(hdr_.guidPrefix, outer->guid_.guidPrefix);
2332  data_.smHeader.submessageLength = 0; // last submessage in the Message
2333  data_.extraFlags = 0;
2337  data_.writerSN.high = 0;
2338  data_.writerSN.low = 0;
2339 
2340 #ifdef ACE_HAS_MAC_OSX
2343 #ifdef ACE_HAS_IPV6
2344  multicast_ipv6_socket_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
2346 #endif
2347 #endif
2348 
2349  multicast_interface_ = outer->disco_->multicast_interface();
2350 
2351  const u_short port_common = outer->config_->port_common(outer->domain_);
2352  multicast_address_ = outer->config_->multicast_address(port_common);
2353 
2354 #ifdef ACE_HAS_IPV6
2355  multicast_ipv6_address_ = outer->config_->ipv6_multicast_address(port_common);
2356 #endif
2357 
2358  send_addrs_.insert(multicast_address_);
2359 #ifdef ACE_HAS_IPV6
2360  send_addrs_.insert(multicast_ipv6_address_);
2361 #endif
2362 
2363  typedef RtpsDiscovery::AddrVec::const_iterator iter;
2364  const RtpsDiscovery::AddrVec addrs = outer->config_->spdp_send_addrs();
2365  for (iter it = addrs.begin(),
2366  end = addrs.end(); it != end; ++it) {
2367  send_addrs_.insert(ACE_INET_Addr(it->c_str()));
2368  }
2369 
2370  u_short participantId = 0;
2371 
2372 #ifdef OPENDDS_SAFETY_PROFILE
2373  const u_short startingParticipantId = participantId;
2374 #endif
2375 
2376  const u_short max_part_id = 119; // RTPS 2.5 9.6.2.3
2377  while (!open_unicast_socket(port_common, participantId)) {
2378  if (participantId == max_part_id && log_level >= LogLevel::Warning) {
2379  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: Spdp::SpdpTransport: "
2380  "participant id is going above max %u allowed by RTPS spec\n", max_part_id));
2381  // As long as it doesn't result in an invalid port, going past this
2382  // shouldn't cause a problem, but it could be a sign that OpenDDS has a
2383  // limited number of ports at its disposal. Also another implementation
2384  // could use this as a hard limit, but that's much less of a concern.
2385  }
2386  ++participantId;
2387  }
2388 #ifdef ACE_HAS_IPV6
2389  u_short port = uni_port_;
2390 
2391  while (!open_unicast_ipv6_socket(port)) {
2392  ++port;
2393  }
2394 #endif
2395 
2396 #ifdef OPENDDS_SAFETY_PROFILE
2397  if (participantId > startingParticipantId && ACE_OS::getpid() == -1) {
2398  // Since pids are not available, use the fact that we had to increment
2399  // participantId to modify the GUID's pid bytes. This avoids GUID conflicts
2400  // between processes on the same host which start at the same time
2401  // (resulting in the same seed value for the random number generator).
2402  hdr_.guidPrefix[8] = static_cast<CORBA::Octet>(participantId >> 8);
2403  hdr_.guidPrefix[9] = static_cast<CORBA::Octet>(participantId & 0xFF);
2404  outer->guid_.guidPrefix[8] = hdr_.guidPrefix[8];
2405  outer->guid_.guidPrefix[9] = hdr_.guidPrefix[9];
2406  }
2407 #endif
2408 }
2409 
2410 void
2413 {
2414  DCPS::RcHandle<Spdp> outer = outer_.lock();
2415  if (!outer) return;
2416 
2417 #ifdef OPENDDS_SECURITY
2418  // Add the endpoint before any sending and receiving occurs.
2420  if (endpoint) {
2421  outer->ice_agent_->add_endpoint(endpoint);
2422  ice_endpoint_added_ = true;
2423  outer->ice_agent_->add_local_agent_info_listener(endpoint, outer->guid_, DCPS::static_rchandle_cast<ICE::AgentInfoListener>(outer));
2424  }
2425 #endif
2426 
2427  reactor(reactor_task->get_reactor());
2428  reactor_task->interceptor()->execute_or_enqueue(DCPS::make_rch<RegisterHandlers>(rchandle_from(this), reactor_task));
2429 
2430 #ifdef OPENDDS_SECURITY
2431  // Now that the endpoint is added, SEDP can write the SPDP info.
2432  if (outer->is_security_enabled()) {
2433  outer->write_secure_updates();
2434  }
2435 #endif
2436 
2437  local_send_task_ = DCPS::make_rch<SpdpMulti>(reactor_task->interceptor(), outer->config_->resend_period(), rchandle_from(this), &SpdpTransport::send_local);
2438 
2439  if (outer->config_->periodic_directed_spdp()) {
2441  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2443  }
2444 
2446  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2448 
2449 #ifdef OPENDDS_SECURITY
2451  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2454  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2456 
2458  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2461  DCPS::make_rch<SpdpSporadic>(TheServiceParticipant->time_source(), reactor_task->interceptor(),
2463 #endif
2464 
2465 #ifndef DDS_HAS_MINIMUM_BIT
2466  // internal thread bit reporting
2467  if (TheServiceParticipant->get_thread_status_manager().update_thread_status()) {
2468  thread_status_task_ = DCPS::make_rch<SpdpPeriodic>(reactor_task->interceptor(), ref(*this), &SpdpTransport::thread_status_task);
2469  }
2470 #endif /* DDS_HAS_MINIMUM_BIT */
2471 
2472  this->job_queue(job_queue);
2473  network_interface_address_reader_ = DCPS::make_rch<DCPS::InternalDataReader<DCPS::NetworkInterfaceAddress> >(DCPS::DataReaderQosBuilder().reliability_reliable().durability_transient_local(), rchandle_from(this));
2474  TheServiceParticipant->network_interface_address_topic()->connect(network_interface_address_reader_);
2475 }
2476 
2478 {
2479  if (DCPS::DCPS_debug_level > 3) {
2480  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::~SpdpTransport\n")));
2481  }
2482 
2483  DCPS::RcHandle<Spdp> outer = outer_.lock();
2484 
2485  if (outer) {
2486  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
2487  try {
2489  } catch (const CORBA::BAD_PARAM&) {}
2490  outer->eh_shutdown_ = true;
2491  outer->shutdown_cond_.notify_all();
2492  }
2493 
2496 #ifdef ACE_HAS_IPV6
2497  unicast_ipv6_socket_.close();
2498  multicast_ipv6_socket_.close();
2499 #endif
2500 }
2501 
2503  ACE_Reactor* reactor, ACE_SOCK_Dgram& socket, const char* what)
2504 {
2505 #ifdef ACE_WIN32
2506  // By default Winsock will cause reads to fail with "connection reset"
2507  // when UDP sends result in ICMP "port unreachable" messages.
2508  // The transport framework is not set up for this since returning <= 0
2509  // from our receive_bytes causes the framework to close down the datalink
2510  // which in this case is used to receive from multiple peers.
2511  {
2512  BOOL recv_udp_connreset = FALSE;
2513  socket.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
2514  }
2515 #endif
2516 
2517  if (reactor->register_handler(socket.get_handle(),
2518  this, ACE_Event_Handler::READ_MASK) != 0) {
2519  throw std::runtime_error(
2520  (DCPS::String("failed to register ") + what + " unicast input handler").c_str());
2521  }
2522 }
2523 
2525 {
2526  DCPS::RcHandle<Spdp> outer = outer_.lock();
2527  if (!outer) {
2528  return;
2529  }
2530  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
2531 
2532  if (outer->shutdown_flag_) {
2533  return;
2534  }
2535 
2536  ACE_Reactor* const reactor = reactor_task->get_reactor();
2537  register_unicast_socket(reactor, unicast_socket_, "IPV4");
2538 #ifdef ACE_HAS_IPV6
2539  register_unicast_socket(reactor, unicast_ipv6_socket_, "IPV6");
2540 #endif
2541 }
2542 
2543 void
2545 {
2546  if (local_send_task_) {
2548  }
2549 
2550 #ifdef OPENDDS_SECURITY
2551  DCPS::RcHandle<Spdp> outer = outer_.lock();
2552  if (!outer) return;
2553 
2554  relay_spdp_task_falloff_.set(outer->config_->sedp_heartbeat_period());
2556 
2557  relay_stun_task_falloff_.set(outer->config_->sedp_heartbeat_period());
2559 #endif
2560 
2561 #ifndef DDS_HAS_MINIMUM_BIT
2562  const DCPS::ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
2563  if (thread_status_manager.update_thread_status()) {
2564  thread_status_task_->enable(false, thread_status_manager.thread_status_interval());
2565  }
2566 #endif /* DDS_HAS_MINIMUM_BIT */
2567 }
2568 
2569 void
2571 {
2572  DCPS::RcHandle<Spdp> outer = outer_.lock();
2573  if (!outer) return;
2574 
2575  // Send the dispose/unregister SPDP sample
2578  data_.inlineQos.length(1);
2579  static const StatusInfo_t dispose_unregister = { {0, 0, 0, 3} };
2580  data_.inlineQos[0].status_info(dispose_unregister);
2581 
2582  ParameterList plist(1);
2583  plist.length(1);
2584  plist[0].guid(outer->guid_);
2585  plist[0]._d(PID_PARTICIPANT_GUID);
2586 
2587  wbuff_.reset();
2588  DCPS::Serializer ser(&wbuff_, encoding_plain_native);
2590  if (!(ser << hdr_) || !(ser << data_) || !(ser << encap) || !(ser << plist)) {
2591  if (DCPS::DCPS_debug_level > 0) {
2593  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::dispose_unregister() - ")
2594  ACE_TEXT("failed to serialize headers for dispose/unregister\n")));
2595  }
2596  return;
2597  }
2598 
2600 }
2601 
2602 void
2604 {
2605  if (DCPS::DCPS_debug_level > 3) {
2606  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) SpdpTransport::close\n")));
2607  }
2608 
2609  DCPS::RcHandle<Spdp> outer = outer_.lock();
2610  if (!outer) return;
2611 
2612  TheServiceParticipant->network_interface_address_topic()->disconnect(network_interface_address_reader_);
2613 
2614 #ifdef OPENDDS_SECURITY
2616  if (endpoint) {
2617  outer->ice_agent_->remove_endpoint(endpoint);
2618  ice_endpoint_added_ = false;
2619  }
2620 
2622  handshake_deadline_task_->cancel();
2623  }
2624  if (handshake_resend_task_) {
2625  handshake_resend_task_->cancel();
2626  }
2627  if (relay_spdp_task_) {
2628  relay_spdp_task_->cancel();
2629  }
2630  if (relay_stun_task_) {
2631  relay_stun_task_->cancel();
2632  }
2633 #endif
2634  if (local_send_task_) {
2635  local_send_task_->disable();
2636  }
2637  if (directed_send_task_) {
2638  directed_send_task_->cancel();
2639  }
2640  if (lease_expiration_task_) {
2641  lease_expiration_task_->cancel();
2642  }
2643  if (thread_status_task_) {
2644  thread_status_task_->disable();
2645  }
2646 
2647  ACE_Reactor* reactor = reactor_task->get_reactor();
2648  const ACE_Reactor_Mask mask =
2650  reactor->remove_handler(multicast_socket_.get_handle(), mask);
2651  reactor->remove_handler(unicast_socket_.get_handle(), mask);
2652 #ifdef ACE_HAS_IPV6
2653  reactor->remove_handler(multicast_ipv6_socket_.get_handle(), mask);
2654  reactor->remove_handler(unicast_ipv6_socket_.get_handle(), mask);
2655 #endif
2656 }
2657 
2658 void
2660 {
2661  DCPS::RcHandle<Spdp> outer = outer_.lock();
2662  if (!outer) return;
2663 
2664  if (local_send_task_) {
2665  const TimeDuration quick_resend = outer->config_->resend_period() * outer->quick_resend_ratio_;
2666  local_send_task_->enable(std::max(quick_resend, outer->min_resend_delay_));
2667  }
2668 }
2669 
2670 void
2672 {
2673  DCPS::RcHandle<Spdp> outer = outer_.lock();
2674  if (!outer) return;
2675 
2676  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
2677  write_i(flags);
2678 }
2679 
2680 void
2682 {
2683  DCPS::RcHandle<Spdp> outer = outer_.lock();
2684  if (!outer) return;
2685 
2686  if (!outer->config_->undirected_spdp()) {
2687  return;
2688  }
2689 
2690  const ParticipantData_t pdata = outer->build_local_pdata(
2691 #ifdef OPENDDS_SECURITY
2692  true, outer->is_security_enabled() ? Security::DPDK_ENHANCED : Security::DPDK_ORIGINAL
2693 #endif
2694  );
2695 
2697  ++seq_;
2698 
2699  ParameterList plist;
2700  if (!ParameterListConverter::to_param_list(pdata, plist)) {
2701  if (DCPS::DCPS_debug_level > 0) {
2702  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
2703  ACE_TEXT("Spdp::SpdpTransport::write() - ")
2704  ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
2705  ACE_TEXT("to ParameterList\n")));
2706  }
2707  return;
2708  }
2709 
2710 #ifdef OPENDDS_SECURITY
2711  if (!outer->is_security_enabled()) {
2712  ICE::AgentInfoMap ai_map;
2713  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = outer->sedp_->get_ice_endpoint();
2714  if (sedp_endpoint) {
2715  ai_map[SEDP_AGENT_INFO_KEY] = outer->ice_agent_->get_local_agent_info(sedp_endpoint);
2716  }
2718  if (spdp_endpoint) {
2719  ai_map[SPDP_AGENT_INFO_KEY] = outer->ice_agent_->get_local_agent_info(spdp_endpoint);
2720  }
2721 
2722  if (!ParameterListConverter::to_param_list(ai_map, plist)) {
2723  if (DCPS::DCPS_debug_level > 0) {
2724  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
2725  ACE_TEXT("Spdp::SpdpTransport::write() - ")
2726  ACE_TEXT("failed to convert from ICE::AgentInfo ")
2727  ACE_TEXT("to ParameterList\n")));
2728  }
2729  return;
2730  }
2731  }
2732 #endif
2733 
2734  wbuff_.reset();
2735  DCPS::Serializer ser(&wbuff_, encoding_plain_native);
2737  if (!(ser << hdr_) || !(ser << data_) || !(ser << encap) || !(ser << plist)) {
2738  if (DCPS::DCPS_debug_level > 0) {
2740  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write() - ")
2741  ACE_TEXT("failed to serialize headers for SPDP\n")));
2742  }
2743  return;
2744  }
2745 
2746  send(flags);
2747 }
2748 
2749 void
2751 {
2752  if (!iter->second.pdata_.participantProxy.opendds_rtps_relay_application_participant) {
2753  return;
2754  }
2755 
2756  if (new_participant) {
2757 #ifdef OPENDDS_SECURITY
2758  tport_->relay_spdp_task_->cancel();
2759  tport_->relay_spdp_task_falloff_.set(config_->sedp_heartbeat_period());
2760  tport_->relay_spdp_task_->schedule(TimeDuration::zero_value);
2761 #endif
2762  }
2763 
2764  if (DCPS::DCPS_debug_level) {
2766  ACE_TEXT("(%P|%t) Spdp::update_rtps_relay_application_participant - %C is an RtpsRelay application participant\n"),
2767  DCPS::LogGuid(iter->first).c_str()));
2768  }
2769 
2770  for (DiscoveredParticipantIter pos = participants_.begin(), limit = participants_.end(); pos != limit;) {
2771  if (pos != iter && pos->second.pdata_.participantProxy.opendds_rtps_relay_application_participant) {
2772  if (DCPS::DCPS_debug_level) {
2774  ACE_TEXT("(%P|%t) Spdp::update_rtps_relay_application_participant - removing previous RtpsRelay application participant %C\n"),
2775  DCPS::LogGuid(pos->first).c_str()));
2776  }
2778  participants_.erase(pos++);
2779  } else {
2780  ++pos;
2781  }
2782  }
2783 }
2784 
2785 void
2787 {
2788 #ifdef OPENDDS_SECURITY
2790 
2791  tport_->relay_spdp_task_->cancel();
2792  tport_->relay_spdp_task_falloff_.set(config_->sedp_heartbeat_period());
2793  tport_->relay_spdp_task_->schedule(TimeDuration::zero_value);
2794 
2795  tport_->relay_stun_task_->cancel();
2796  tport_->relay_stun_task_falloff_.set(config_->sedp_heartbeat_period());
2797  tport_->relay_stun_task_->schedule(TimeDuration::zero_value);
2798 #endif
2799 }
2800 
2801 void
2803 {
2804  {
2806  append(seq, tport_->transport_statistics_);
2807  tport_->transport_statistics_.clear();
2808  }
2809  sedp_->append_transport_statistics(seq);
2810 }
2811 
2812 void
2813 Spdp::SpdpTransport::write_i(const DCPS::GUID_t& guid, const ACE_INET_Addr& local_address, WriteFlags flags)
2814 {
2815  DCPS::RcHandle<Spdp> outer = outer_.lock();
2816  if (!outer) return;
2817 
2818  const ParticipantData_t pdata = outer->build_local_pdata(
2819 #ifdef OPENDDS_SECURITY
2820  true, outer->is_security_enabled() ? Security::DPDK_ENHANCED : Security::DPDK_ORIGINAL
2821 #endif
2822  );
2823 
2825  ++seq_;
2826 
2827  ParameterList plist;
2828  if (!ParameterListConverter::to_param_list(pdata, plist)) {
2829  if (DCPS::DCPS_debug_level > 0) {
2830  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
2831  ACE_TEXT("Spdp::SpdpTransport::write_i() - ")
2832  ACE_TEXT("failed to convert from SPDPdiscoveredParticipantData ")
2833  ACE_TEXT("to ParameterList\n")));
2834  }
2835  return;
2836  }
2837 
2838 #ifdef OPENDDS_SECURITY
2839  if (!outer->is_security_enabled()) {
2840  ICE::AgentInfoMap ai_map;
2841  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = outer->sedp_->get_ice_endpoint();
2842  if (sedp_endpoint) {
2843  ai_map[SEDP_AGENT_INFO_KEY] = outer->ice_agent_->get_local_agent_info(sedp_endpoint);
2844  }
2846  if (spdp_endpoint) {
2847  ai_map[SPDP_AGENT_INFO_KEY] = outer->ice_agent_->get_local_agent_info(spdp_endpoint);
2848  }
2849 
2850  if (!ParameterListConverter::to_param_list(ai_map, plist)) {
2851  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
2852  ACE_TEXT("Spdp::SpdpTransport::write_i() - ")
2853  ACE_TEXT("failed to convert from ICE::AgentInfo ")
2854  ACE_TEXT("to ParameterList\n")));
2855  return;
2856  }
2857  }
2858 #endif
2859 
2860  InfoDestinationSubmessage info_dst;
2861  info_dst.smHeader.submessageId = INFO_DST;
2862  info_dst.smHeader.flags = FLAG_E;
2863  info_dst.smHeader.submessageLength = sizeof(guid.guidPrefix);
2864  DCPS::assign(info_dst.guidPrefix, guid.guidPrefix);
2865 
2866  wbuff_.reset();
2867  DCPS::Serializer ser(&wbuff_, encoding_plain_native);
2869  if (!(ser << hdr_) || !(ser << info_dst) || !(ser << data_) || !(ser << encap)
2870  || !(ser << plist)) {
2871  if (DCPS::DCPS_debug_level > 0) {
2873  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::write_i() - ")
2874  ACE_TEXT("failed to serialize headers for SPDP\n")));
2875  }
2876  return;
2877  }
2878 
2879  send(flags, local_address);
2880 }
2881 
2882 void
2884 {
2885  DCPS::RcHandle<Spdp> outer = outer_.lock();
2886  if (!outer) return;
2887 
2888  if ((flags & SEND_MULTICAST) && !outer->config_->rtps_relay_only()) {
2889  typedef OPENDDS_SET(ACE_INET_Addr)::const_iterator iter_t;
2890  for (iter_t iter = send_addrs_.begin(); iter != send_addrs_.end(); ++iter) {
2891  send(*iter, false);
2892  }
2893  }
2894 
2895  if (((flags & SEND_DIRECT) && !outer->config_->rtps_relay_only()) &&
2896  local_address != ACE_INET_Addr()) {
2897  send(local_address, false);
2898  }
2899 
2900  if ((flags & SEND_RELAY) || outer->config_->rtps_relay_only()) {
2901  const ACE_INET_Addr relay_address = outer->config_->spdp_rtps_relay_address();
2902  if (relay_address != ACE_INET_Addr()) {
2903  send(relay_address, true);
2904  }
2905  }
2906 }
2907 
2908 const ACE_SOCK_Dgram&
2910 {
2911 #ifdef ACE_HAS_IPV6
2912  if (addr.get_type() == AF_INET6) {
2913  return unicast_ipv6_socket_;
2914  }
2915 #endif
2916  ACE_UNUSED_ARG(addr);
2917  return unicast_socket_;
2918 }
2919 
2920 ssize_t
2922 {
2923  DCPS::RcHandle<Spdp> outer = outer_.lock();
2924  if (!outer) return -1;
2925 
2926 #ifdef OPENDDS_TESTING_FEATURES
2927  if (outer->sedp_->transport_inst()->should_drop(wbuff_.length())) {
2928  return wbuff_.length();
2929  }
2930 #endif
2931 
2932  const ACE_SOCK_Dgram& socket = choose_send_socket(addr);
2933  const ssize_t res = socket.send(wbuff_.rd_ptr(), wbuff_.length(), addr);
2934  if (outer->sedp_->transport_inst()->count_messages()) {
2936  }
2937  if (res < 0) {
2938  if (outer->sedp_->transport_inst()->count_messages()) {
2941  }
2942  const int err = errno;
2943  if (err != ENETUNREACH || !network_is_unreachable_) {
2944  errno = err;
2945  if (DCPS::DCPS_debug_level > 0) {
2947  ACE_TEXT("(%P|%t) WARNING: Spdp::SpdpTransport::send() - ")
2948  ACE_TEXT("destination %C failed send: %m\n"), DCPS::LogAddr(addr).c_str()));
2949  }
2950  }
2951  if (err == ENETUNREACH) {
2952  network_is_unreachable_ = true;
2953  }
2954  } else {
2955  if (outer->sedp_->transport_inst()->count_messages()) {
2958  }
2959  network_is_unreachable_ = false;
2960  }
2961 
2962  return res;
2963 }
2964 
2965 const ACE_SOCK_Dgram&
2967 {
2968 #ifdef ACE_HAS_IPV6
2969  if (h == unicast_ipv6_socket_.get_handle()) {
2970  return unicast_ipv6_socket_;
2971  }
2972  if (h == multicast_ipv6_socket_.get_handle()) {
2973  return multicast_ipv6_socket_;
2974  }
2975 #endif
2976  if (h == multicast_socket_.get_handle()) {
2977  return multicast_socket_;
2978  }
2979 
2980  return unicast_socket_;
2981 }
2982 
2984 {
2985  return a.get_size() <= a.get_addr_size();
2986 }
2987 
2988 int
2990 {
2991  DCPS::ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2992 
2994  ACE_INET_Addr remote;
2995  buff_.reset();
2996 
2997 #ifdef ACE_LACKS_SENDMSG
2998  const ssize_t bytes = socket.recv(buff_.wr_ptr(), buff_.space(), remote);
2999 #else
3000  ACE_INET_Addr local;
3001 
3002  iovec iov[1];
3003  iov[0].iov_base = buff_.wr_ptr();
3004 #ifdef _MSC_VER
3005 #pragma warning(push)
3006  // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
3007  // since on other platforms iov_len is 64-bit
3008 #pragma warning(disable : 4267)
3009 #endif
3010  iov[0].iov_len = buff_.space();
3011 #ifdef _MSC_VER
3012 #pragma warning(pop)
3013 #endif
3014  const ssize_t bytes = socket.recv(iov, 1, remote, 0
3015 #if defined(ACE_RECVPKTINFO) || defined(ACE_RECVPKTINFO6)
3016  , &local
3017 #endif
3018  );
3019 #endif
3020 
3021  if (!valid_size(remote)) {
3022  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - invalid address size\n")));
3023  return 0;
3024  }
3025 
3026  if (bytes > 0) {
3027  buff_.wr_ptr(bytes);
3028  } else if (bytes == 0) {
3029  return 0;
3030  } else {
3031  ACE_DEBUG((
3032  LM_WARNING,
3033  ACE_TEXT("(%P|%t) WARNING: Spdp::SpdpTransport::handle_input() - ")
3034  ACE_TEXT("error reading from %C socket %p\n")
3035  , (h == unicast_socket_.get_handle()) ? "unicast" : "multicast",
3036  ACE_TEXT("ACE_SOCK_Dgram::recv")));
3037  return 0;
3038  }
3039 
3040  DCPS::RcHandle<Spdp> outer = outer_.lock();
3041 
3042  if (!outer) {
3043  return 0;
3044  }
3045 
3046  const bool relay_in_use = (outer->config_->rtps_relay_only() || outer->config_->use_rtps_relay());
3047  const bool remote_matches_relay_addr = (remote == outer->config_->spdp_rtps_relay_address());
3048  const bool from_relay = relay_in_use && remote_matches_relay_addr;
3049 
3050  // Ignore messages from the relay when not using it.
3051  if (!relay_in_use && remote_matches_relay_addr) {
3052  return 0;
3053  }
3054 
3055  if ((buff_.size() >= 4) && ACE_OS::memcmp(buff_.rd_ptr(), "RTPS", 4) == 0) {
3056  RTPS::Message message;
3057 
3058  DCPS::Serializer ser(&buff_, encoding_plain_native);
3059  Header header;
3060  if (!(ser >> header)) {
3061  if (DCPS::DCPS_debug_level > 0) {
3063  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3064  ACE_TEXT("failed to deserialize RTPS header for SPDP\n")));
3065  }
3066  return 0;
3067  }
3068 
3069  if (outer->sedp_->transport_inst()->count_messages()) {
3071  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, outer->lock_, -1);
3072  transport_statistics_.message_count[key].recv(bytes);
3073  }
3074 
3075  if (DCPS::transport_debug.log_messages) {
3076  message.hdr = header;
3077  }
3078 
3079  while (buff_.length() > 3) {
3080  const char subm = buff_.rd_ptr()[0], flags = buff_.rd_ptr()[1];
3081  ser.swap_bytes((flags & FLAG_E) != ACE_CDR_BYTE_ORDER);
3082  const size_t start = buff_.length();
3083  CORBA::UShort submessageLength = 0;
3084  switch (subm) {
3085  case DATA: {
3086  DataSubmessage data;
3087  if (!(ser >> data)) {
3088  if (DCPS::DCPS_debug_level > 0) {
3090  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3091  ACE_TEXT("failed to deserialize DATA header for SPDP\n")));
3092  }
3093  return 0;
3094  }
3095  submessageLength = data.smHeader.submessageLength;
3096 
3097  if (DCPS::transport_debug.log_messages) {
3098  append_submessage(message, data);
3099  }
3100 
3102  // Not our message: this could be the same multicast group used
3103  // for SEDP and other traffic.
3104  break;
3105  }
3106 
3107  ParameterList plist;
3108  if (data.smHeader.flags & (FLAG_D | FLAG_K_IN_DATA)) {
3110  DCPS::Encoding enc;
3111  if (!(ser >> encap) || !encap.to_encoding(enc, DCPS::MUTABLE) || enc.kind() != Encoding::KIND_XCDR1) {
3112  if (DCPS::DCPS_debug_level > 0) {
3114  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3115  ACE_TEXT("failed to deserialize encapsulation header for SPDP\n")));
3116  }
3117  return 0;
3118  }
3119  ser.encoding(enc);
3120  if (!(ser >> plist)) {
3121 
3122  if (DCPS::DCPS_debug_level > 0) {
3124  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3125  ACE_TEXT("failed to deserialize data payload for SPDP\n")));
3126  }
3127  return 0;
3128  }
3129  } else {
3130  plist.length(1);
3132  plist[0].guid(guid);
3133  plist[0]._d(PID_PARTICIPANT_GUID);
3134  }
3135 
3136  DCPS::RcHandle<Spdp> outer = outer_.lock();
3137  if (outer) {
3138  outer->data_received(data, plist, remote);
3139  }
3140  break;
3141  }
3142  case INFO_DST: {
3143  if (DCPS::transport_debug.log_messages) {
3145  if (!(ser >> sm)) {
3146  if (DCPS::DCPS_debug_level > 0) {
3148  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3149  ACE_TEXT("failed to deserialize INFO_DST header for SPDP\n")));
3150  }
3151  return 0;
3152  }
3153  submessageLength = sm.smHeader.submessageLength;
3154  append_submessage(message, sm);
3155  break;
3156  }
3157  }
3158  // fallthrough
3159  default:
3160  SubmessageHeader smHeader;
3161  if (!(ser >> smHeader)) {
3162  if (DCPS::DCPS_debug_level > 0) {
3164  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3165  ACE_TEXT("failed to deserialize SubmessageHeader for SPDP\n")));
3166  }
3167  return 0;
3168  }
3169  submessageLength = smHeader.submessageLength;
3170  break;
3171  }
3172  if (submessageLength && buff_.length()) {
3173  const size_t read = start - buff_.length();
3174  if (read < static_cast<size_t>(submessageLength + SMHDR_SZ)) {
3175  if (!ser.skip(static_cast<CORBA::UShort>(submessageLength + SMHDR_SZ
3176  - read))) {
3177  if (DCPS::DCPS_debug_level > 0) {
3179  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() - ")
3180  ACE_TEXT("failed to skip sub message length\n")));
3181  }
3182  return 0;
3183  }
3184  }
3185  } else if (!submessageLength) {
3186  break; // submessageLength of 0 indicates the last submessage
3187  }
3188  }
3189 
3190  } else if ((buff_.size() >= 4) && (ACE_OS::memcmp(buff_.rd_ptr(), "RTPX", 4) == 0)) {
3191  // Handle some RTI protocol multicast to the same address
3192  return 0; // Ignore
3193  }
3194 
3195 #ifdef OPENDDS_SECURITY
3196  // Assume STUN
3197  if (!outer->initialized() || outer->shutting_down()) {
3198  return 0;
3199  }
3200 
3201 #ifndef ACE_RECVPKTINFO
3202  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::handle_input() ")
3203  ACE_TEXT("potential STUN message received but this version of the ACE ")
3204  ACE_TEXT("library doesn't support the local_address extension in ")
3205  ACE_TEXT("ACE_SOCK_Dgram::recv\n")));
3206  ACE_NOTSUP_RETURN(0);
3207 #else
3208 
3209  DCPS::Serializer serializer(&buff_, STUN::encoding);
3210  STUN::Message message;
3211  message.block = &buff_;
3212  if (serializer >> message) {
3213  if (outer->sedp_->transport_inst()->count_messages()) {
3215  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, outer->lock_, -1);
3216  transport_statistics_.message_count[key].recv(bytes);
3217  }
3218 
3219  if (relay_srsm_.is_response(message)) {
3221  } else {
3223  if (endpoint) {
3224  outer->ice_agent_->receive(endpoint, local, remote, message);
3225  }
3226  }
3227  }
3228 #endif
3229 #endif
3230 
3231  return 0;
3232 }
3233 
3236 {
3237 #ifdef OPENDDS_SECURITY
3238  DCPS::RcHandle<Spdp> outer = outer_.lock();
3239  return outer && outer->config_->use_ice() ? DCPS::static_rchandle_cast<ICE::Endpoint>(rchandle_from(this)) : DCPS::WeakRcHandle<ICE::Endpoint>();
3240 #else
3242 #endif
3243 }
3244 
3245 #ifdef OPENDDS_SECURITY
3246 ICE::AddressListType
3248 {
3249  ICE::AddressListType addresses;
3250  ACE_INET_Addr addr;
3251 
3253  if (addr != ACE_INET_Addr()) {
3254  if (addr.is_any()) {
3255  ICE::AddressListType addrs;
3257  for (ICE::AddressListType::iterator pos = addrs.begin(), limit = addrs.end(); pos != limit; ++pos) {
3258  if (pos->get_type() == AF_INET) {
3259  pos->set_port_number(addr.get_port_number());
3260  addresses.push_back(*pos);
3261  }
3262  }
3263  } else {
3264  addresses.push_back(addr);
3265  }
3266  }
3267 
3268 #ifdef ACE_HAS_IPV6
3269  unicast_ipv6_socket_.get_local_addr(addr);
3270  if (addr != ACE_INET_Addr()) {
3271  if (addr.is_any()) {
3272  ICE::AddressListType addrs;
3274  for (ICE::AddressListType::iterator pos = addrs.begin(), limit = addrs.end(); pos != limit; ++pos) {
3275  if (pos->get_type() == AF_INET6) {
3276  pos->set_port_number(addr.get_port_number());
3277  addresses.push_back(*pos);
3278  }
3279  }
3280  } else {
3281  addresses.push_back(addr);
3282  }
3283  }
3284 #endif
3285 
3286  return addresses;
3287 }
3288 
3289 void
3291 {
3292  DCPS::RcHandle<Spdp> outer = outer_.lock();
3293  if (!outer) return;
3294 
3295  DCPS::RcHandle<DCPS::JobQueue> job_queue = outer->sedp_->job_queue();
3296  if (job_queue) {
3297  job_queue->enqueue(DCPS::make_rch<SendStun>(rchandle_from(this), address, message));
3298  }
3299 }
3300 
3301 void
3303 {
3304  DCPS::RcHandle<SpdpTransport> tport = tport_.lock();
3305  if (!tport) return;
3306 
3307  DCPS::RcHandle<Spdp> outer = tport->outer_.lock();
3308  if (!outer) return;
3309 
3310  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
3311  tport->wbuff_.reset();
3312  Serializer serializer(&tport->wbuff_, STUN::encoding);
3313  const_cast<STUN::Message&>(message_).block = &tport->wbuff_;
3314  serializer << message_;
3315 
3316 #ifdef OPENDDS_TESTING_FEATURES
3317  if (outer->sedp_->transport_inst()->should_drop(tport->wbuff_.length())) {
3318  return;
3319  }
3320 #endif
3321 
3322  const ACE_SOCK_Dgram& socket = tport->choose_send_socket(address_);
3323  const ssize_t res = socket.send(tport->wbuff_.rd_ptr(), tport->wbuff_.length(), address_);
3324  if (res < 0) {
3325  if (outer->sedp_->transport_inst()->count_messages()) {
3326  // Have the lock.
3327  const DCPS::InternalMessageCountKey key(DCPS::NetworkAddress(address_), DCPS::MCK_STUN, address_ == outer->config_->spdp_stun_server_address());
3328  tport->transport_statistics_.message_count[key].send_fail(tport->wbuff_.length());
3329  }
3330  const int err = errno;
3331  if (err != ENETUNREACH || !tport->network_is_unreachable_) {
3332  errno = err;
3333  if (DCPS::DCPS_debug_level > 0) {
3335  ACE_TEXT("(%P|%t) WARNING: Spdp::SendStun::execute() - ")
3336  ACE_TEXT("destination %C failed send: %m\n"), DCPS::LogAddr(address_).c_str()));
3337  }
3338  }
3339  if (err == ENETUNREACH) {
3340  tport->network_is_unreachable_ = true;
3341  }
3342  } else {
3343  if (outer->sedp_->transport_inst()->count_messages()) {
3344  // Have the lock.
3345  const DCPS::InternalMessageCountKey key(DCPS::NetworkAddress(address_), DCPS::MCK_STUN, address_ == outer->config_->spdp_stun_server_address());
3346  tport->transport_statistics_.message_count[key].send(tport->wbuff_.length());
3347  }
3348  tport->network_is_unreachable_ = false;
3349  }
3350 }
3351 
3354 {
3355  DCPS::RcHandle<Spdp> outer = outer_.lock();
3356  return outer ? outer->config_->spdp_stun_server_address() : ACE_INET_Addr();
3357 }
3358 
3359 #ifndef DDS_HAS_MINIMUM_BIT
3360 void
3362 {
3363  DCPS::RcHandle<Spdp> outer = outer_.lock();
3364  if (!outer) return;
3365 
3366  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<IceConnect>(outer, guids, addr, true));
3367 }
3368 
3369 void
3371 {
3372  ACE_GUARD(ACE_Thread_Mutex, g, spdp_->lock_);
3373  for (ICE::GuidSetType::const_iterator pos = guids_.begin(), limit = guids_.end(); pos != limit; ++pos) {
3374  DiscoveredParticipantIter iter = spdp_->participants_.find(pos->remote);
3375  if (iter != spdp_->participants_.end()) {
3376  spdp_->enqueue_location_update_i(iter, compute_ice_location_mask(addr_), connect_ ? addr_ : ACE_INET_Addr(), "ICE connect");
3377  spdp_->process_location_updates_i(iter, "ICE connect");
3378  }
3379  }
3380 }
3381 
3382 void
3384 {
3385  DCPS::RcHandle<Spdp> outer = outer_.lock();
3386  if (!outer) return;
3387 
3388  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<IceConnect>(outer, guids, addr, false));
3389 }
3390 #endif /* DDS_HAS_MINIMUM_BIT */
3391 #endif /* OPENDDS_SECURITY */
3392 
3393 void
3395 {
3396  sedp_->signal_liveliness(kind);
3397 }
3398 
3399 bool
3401  u_short participant_id)
3402 {
3403  DCPS::RcHandle<Spdp> outer = outer_.lock();
3404  if (!outer) {
3405  throw std::runtime_error("couldn't get Spdp");
3406  }
3407 
3408  ACE_INET_Addr local_addr = outer->config_->spdp_local_address();
3409  const bool fixed_port = local_addr.get_port_number();
3410 
3411  if (fixed_port) {
3412  uni_port_ = local_addr.get_port_number();
3413  } else if (!outer->config_->spdp_request_random_port()) {
3414  const ACE_UINT32 port = static_cast<ACE_UINT32>(port_common) + outer->config_->d1() +
3415  outer->config_->pg() * participant_id;
3416  if (port > 65535) {
3417  if (log_level >= LogLevel::Error) {
3418  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket: "
3419  "port %u is too high\n", port));
3420  }
3421  throw std::runtime_error("failed to open unicast port for SPDP (port too high)");
3422  }
3423  uni_port_ = static_cast<unsigned short>(port);
3424  local_addr.set_port_number(uni_port_);
3425  }
3426 
3427  if (unicast_socket_.open(local_addr, PF_INET) != 0) {
3428  if (fixed_port) {
3429  if (log_level >= LogLevel::Error) {
3430  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket: "
3431  "failed to open %C %p.\n",
3432  LogAddr(local_addr).c_str(), ACE_TEXT("ACE_SOCK_Dgram::open")));
3433  }
3434  throw std::runtime_error("failed to open unicast port for SPDP");
3435  }
3436  if (DCPS::DCPS_debug_level > 3) {
3438  ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
3439  ACE_TEXT("failed to open %C %p. ")
3440  ACE_TEXT("Trying next participantId...\n"),
3441  DCPS::LogAddr(local_addr).c_str(), ACE_TEXT("ACE_SOCK_Dgram::open")));
3442  }
3443  return false;
3444  }
3445 
3446  if (!fixed_port && outer->config_->spdp_request_random_port()) {
3447  ACE_INET_Addr addr;
3448  if (unicast_socket_.get_local_addr(addr) == 0) {
3449  uni_port_ = addr.get_port_number();
3450  }
3451  }
3452 
3453  if (DCPS::DCPS_debug_level > 3) {
3454  ACE_DEBUG((LM_INFO,
3455  ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_socket() - ")
3456  ACE_TEXT("opened unicast socket on port %d\n"),
3457  uni_port_));
3458  }
3459 
3460  if (!DCPS::set_socket_multicast_ttl(unicast_socket_, outer->config_->ttl())) {
3461  if (DCPS::DCPS_debug_level > 0) {
3463  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - ")
3464  ACE_TEXT("failed to set TTL value to %d ")
3465  ACE_TEXT("for port:%hu %p\n"),
3466  outer->config_->ttl(), uni_port_, ACE_TEXT("DCPS::set_socket_multicast_ttl:")));
3467  }
3468  throw std::runtime_error("failed to set TTL");
3469  }
3470 
3471  const int send_buffer_size = outer->config()->send_buffer_size();
3472  if (send_buffer_size > 0) {
3474  SO_SNDBUF,
3475  (void *) &send_buffer_size,
3476  sizeof(send_buffer_size)) < 0
3477  && errno != ENOTSUP) {
3478  if (DCPS::DCPS_debug_level > 0) {
3479  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - failed to set the send buffer size to %d errno %m\n"), send_buffer_size));
3480  }
3481  throw std::runtime_error("failed to set send buffer size");
3482  }
3483  }
3484 
3485  const int recv_buffer_size = outer->config()->recv_buffer_size();
3486  if (recv_buffer_size > 0) {
3488  SO_RCVBUF,
3489  (void *) &recv_buffer_size,
3490  sizeof(recv_buffer_size)) < 0
3491  && errno != ENOTSUP) {
3492  if (DCPS::DCPS_debug_level > 0) {
3493  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket() - failed to set the recv buffer size to %d errno %m\n"), recv_buffer_size));
3494  }
3495  throw std::runtime_error("failed to set recv buffer size");
3496  }
3497  }
3498 
3499 #ifdef ACE_RECVPKTINFO
3500  int sockopt = 1;
3501  if (unicast_socket_.set_option(IPPROTO_IP, ACE_RECVPKTINFO, &sockopt, sizeof sockopt) == -1) {
3502  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_socket: set_option: %m\n")), false);
3503  }
3504 #endif
3505 
3506  return true;
3507 }
3508 
3509 #ifdef ACE_HAS_IPV6
3510 bool
3511 Spdp::SpdpTransport::open_unicast_ipv6_socket(u_short port)
3512 {
3513  DCPS::RcHandle<Spdp> outer = outer_.lock();
3514  if (!outer) return false;
3515 
3516  ACE_INET_Addr local_addr = outer->config_->ipv6_spdp_local_address();
3517  const bool fixed_port = local_addr.get_port_number();
3518 
3519  if (fixed_port) {
3520  ipv6_uni_port_ = local_addr.get_port_number();
3521  } else {
3522  ipv6_uni_port_ = port;
3523  local_addr.set_port_number(ipv6_uni_port_);
3524  }
3525 
3526  if (unicast_ipv6_socket_.open(local_addr, PF_INET6) != 0) {
3527  if (fixed_port) {
3528  if (DCPS::DCPS_debug_level > 0) {
3530  ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_ipv6_socket() - ")
3531  ACE_TEXT("failed to open %C %p.\n"),
3532  DCPS::LogAddr(local_addr).c_str(), ACE_TEXT("ACE_SOCK_Dgram::open")));
3533  }
3534  throw std::runtime_error("failed to open ipv6 unicast port for SPDP");
3535  }
3536  if (DCPS::DCPS_debug_level > 3) {
3538  ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_ipv6_socket() - ")
3539  ACE_TEXT("failed to open %C %p. ")
3540  ACE_TEXT("Trying next port...\n"),
3541  DCPS::LogAddr(local_addr).c_str(), ACE_TEXT("ACE_SOCK_Dgram::open")));
3542  }
3543  return false;
3544  }
3545 
3546  if (DCPS::DCPS_debug_level > 3) {
3547  ACE_DEBUG((LM_INFO,
3548  ACE_TEXT("(%P|%t) Spdp::SpdpTransport::open_unicast_ipv6_socket() - ")
3549  ACE_TEXT("opened unicast ipv6 socket on port %d\n"),
3550  ipv6_uni_port_));
3551  }
3552 
3553  if (!DCPS::set_socket_multicast_ttl(unicast_ipv6_socket_, outer->config_->ttl())) {
3554  if (DCPS::DCPS_debug_level > 0) {
3556  ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_ipv6_socket() - ")
3557  ACE_TEXT("failed to set TTL value to %d ")
3558  ACE_TEXT("for port:%hu %p\n"),
3559  outer->config_->ttl(), ipv6_uni_port_, ACE_TEXT("DCPS::set_socket_multicast_ttl:")));
3560  }
3561  throw std::runtime_error("failed to set TTL");
3562  }
3563 
3564  const int send_buffer_size = outer->config()->send_buffer_size();
3565  if (send_buffer_size > 0) {
3566  if (unicast_ipv6_socket_.set_option(SOL_SOCKET,
3567  SO_SNDBUF,
3568  (void *) &send_buffer_size,
3569  sizeof(send_buffer_size)) < 0
3570  && errno != ENOTSUP) {
3571  if (DCPS::DCPS_debug_level > 0) {
3572  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_ipv6_socket() - failed to set the send buffer size to %d errno %m\n"), send_buffer_size));
3573  }
3574  throw std::runtime_error("failed to set send buffer size");
3575  }
3576  }
3577 
3578  const int recv_buffer_size = outer->config()->recv_buffer_size();
3579  if (recv_buffer_size > 0) {
3580  if (unicast_ipv6_socket_.set_option(SOL_SOCKET,
3581  SO_RCVBUF,
3582  (void *) &recv_buffer_size,
3583  sizeof(recv_buffer_size)) < 0
3584  && errno != ENOTSUP) {
3585  if (DCPS::DCPS_debug_level > 0) {
3586  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_ipv6_socket() - failed to set the recv buffer size to %d errno %m\n"), recv_buffer_size));
3587  }
3588  throw std::runtime_error("failed to set recv buffer size");
3589  }
3590  }
3591 
3592 #ifdef ACE_RECVPKTINFO6
3593  int sockopt = 1;
3594  if (unicast_ipv6_socket_.set_option(IPPROTO_IPV6, ACE_RECVPKTINFO6, &sockopt, sizeof sockopt) == -1) {
3595  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::SpdpTransport::open_unicast_ipv6_socket: set_option: %m\n")), false);
3596  }
3597 #endif
3598 
3599  return true;
3600 }
3601 #endif /* ACE_HAS_IPV6 */
3602 
3604 {
3605  DCPS::RcHandle<Spdp> outer = outer_.lock();
3606  if (!outer) return;
3607 
3608  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
3609  if (outer->shutting_down()) {
3610  return;
3611  }
3612 
3613  if (outer->shutdown_flag_) {
3614  return;
3615  }
3616 
3618  DCPS::InternalSampleInfoSequence infos;
3619 
3621 
3622  if (multicast_manager_.process(samples,
3623  infos,
3625  reactor(),
3626  this,
3629 #ifdef ACE_HAS_IPV6
3630  , DCPS::NetworkAddress(multicast_ipv6_address_),
3631  multicast_ipv6_socket_
3632 #endif
3633  )) {
3635  }
3636 }
3637 
3638 bool
3640  bool& inlineQos)
3641 {
3642  DiscoveredParticipantIter part_iter = participants_.find(part_id);
3643  if (part_iter == participants_.end()) {
3644  return false;
3645  } else {
3646  inlineQos = part_iter->second.pdata_.participantProxy.expectsInlineQos;
3647  DCPS::LocatorSeq& mc_source =
3648  part_iter->second.pdata_.participantProxy.defaultMulticastLocatorList;
3649  DCPS::LocatorSeq& uc_source =
3650  part_iter->second.pdata_.participantProxy.defaultUnicastLocatorList;
3651  CORBA::ULong mc_source_len = mc_source.length();
3652  CORBA::ULong uc_source_len = uc_source.length();
3653  CORBA::ULong target_len = target.length();
3654  target.length(mc_source_len + uc_source_len + target_len);
3655  // Copy multicast
3656  for (CORBA::ULong mci = 0; mci < mc_source.length(); ++mci) {
3657  target[target_len + mci] = mc_source[mci];
3658  }
3659  // Copy unicast
3660  for (CORBA::ULong uci = 0; uci < uc_source.length(); ++uci) {
3661  target[target_len + mc_source_len + uci] = uc_source[uci];
3662  }
3663  }
3664  return true;
3665 }
3666 
3667 bool
3669  bool& inlineQos)
3670 {
3671  DiscoveredParticipantIter pos = participants_.find(part_id);
3672  if (pos != participants_.end() && pos->second.last_recv_address_ != ACE_INET_Addr()) {
3673  inlineQos = pos->second.pdata_.participantProxy.expectsInlineQos;
3674  target.length(1);
3675  DCPS::address_to_locator(target[0], pos->second.last_recv_address_);
3676  return true;
3677  }
3678  return false;
3679 }
3680 
3681 bool
3683 {
3684  return !participants_.empty();
3685 }
3686 
3687 bool
3688 Spdp::has_discovered_participant(const DCPS::GUID_t& guid) const
3689 {
3690  return participants_.find(guid) != participants_.end();
3691 }
3692 
3694 {
3695  const DiscoveredParticipantMap::const_iterator iter = participants_.find(guid);
3696  if (iter == participants_.end()) {
3697  return PFLAGS_EMPTY;
3698  }
3699  return is_opendds(iter->second.pdata_.participantProxy)
3700  ? iter->second.pdata_.participantProxy.opendds_participant_flags.bits : PFLAGS_EMPTY;
3701 }
3702 
3703 void
3705 {
3706  for (std::pair<TimeQueue::iterator, TimeQueue::iterator> x = lease_expirations_.equal_range(iter->second.lease_expiration_);
3707  x.first != x.second; ++x.first) {
3708  if (x.first->second == iter->first) {
3709  lease_expirations_.erase(x.first);
3710  break;
3711  }
3712  }
3713 }
3714 
3715 void
3717  const DCPS::MonotonicTimePoint& now)
3718 {
3720 
3721  // Compute new expiration.
3722  const DCPS::TimeDuration d =
3723  rtps_duration_to_time_duration(iter->second.pdata_.leaseDuration,
3724  iter->second.pdata_.participantProxy.protocolVersion,
3725  iter->second.pdata_.participantProxy.vendorId);
3726 
3727  iter->second.lease_expiration_ = now + d + lease_extension_;
3728 
3729  // Insert.
3730  const bool cancel = !lease_expirations_.empty() && iter->second.lease_expiration_ < lease_expirations_.begin()->first;
3731  const bool schedule = lease_expirations_.empty() || iter->second.lease_expiration_ < lease_expirations_.begin()->first;
3732 
3733  lease_expirations_.insert(std::make_pair(iter->second.lease_expiration_, iter->first));
3734 
3735  if (cancel) {
3736  tport_->lease_expiration_task_->cancel();
3737  }
3738  if (schedule) {
3739  tport_->lease_expiration_task_->schedule(d);
3740  }
3741 }
3742 
3743 void
3745 {
3747 
3748  for (TimeQueue::iterator pos = lease_expirations_.begin(), limit = lease_expirations_.end();
3749  pos != limit && pos->first <= now;) {
3750  DiscoveredParticipantIter part = participants_.find(pos->second);
3751  // Pre-emptively erase so purge_discovered_participant will not modify lease_expirations_.
3752  lease_expirations_.erase(pos++);
3753 
3754  if (part == participants_.end()) {
3755  continue;
3756  }
3757 
3758  if (DCPS::DCPS_debug_level) {
3760  ACE_TEXT("(%P|%t) Spdp::process_lease_expirations() - ")
3761  ACE_TEXT("participant %C exceeded lease duration, removing\n"),
3762  DCPS::LogGuid(part->first).c_str()));
3763  }
3764 
3765 #ifdef OPENDDS_SECURITY
3766  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = sedp_->get_ice_endpoint();
3767  if (sedp_endpoint) {
3768  stop_ice(sedp_endpoint, part->first, part->second.pdata_.participantProxy.availableBuiltinEndpoints,
3769  part->second.pdata_.participantProxy.availableExtendedBuiltinEndpoints);
3770  }
3771  DCPS::WeakRcHandle<ICE::Endpoint> spdp_endpoint = tport_->get_ice_endpoint();
3772  if (spdp_endpoint) {
3773  ice_agent_->stop_ice(spdp_endpoint, guid_, part->first);
3774  }
3776 #endif
3778  participants_.erase(part);
3779  }
3780 
3781  if (!lease_expirations_.empty()) {
3782  tport_->lease_expiration_task_->schedule(lease_expirations_.begin()->first - now);
3783  }
3784 }
3785 
3786 #ifdef OPENDDS_SECURITY
3788 Spdp::lookup_participant_crypto_info(const DCPS::GUID_t& id) const
3789 {
3790  ParticipantCryptoInfoPair result = ParticipantCryptoInfoPair(DDS::HANDLE_NIL, DDS::Security::SharedSecretHandle_var());
3791 
3793  if (pi != participants_.end()) {
3794  result.first = sedp_->get_handle_registry()->get_remote_participant_crypto_handle(id);
3795  result.second = pi->second.shared_secret_handle_;
3796  }
3797  return result;
3798 }
3799 
3800 void
3802 {
3803  const DCPS::GUID_t peer = make_id(id, ENTITYID_PARTICIPANT);
3804  const DiscoveredParticipantIter iter = participants_.find(peer);
3805  if (iter == participants_.end()) {
3806  if (DCPS::DCPS_debug_level > 0) {
3807  const DCPS::LogGuid logger(peer);
3808  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::send_participant_crypto_tokens() - ")
3809  ACE_TEXT("Discovered participant %C not found.\n"),
3810  logger.c_str()));
3811  }
3812  return;
3813  }
3814 
3815  const DDS::Security::ParticipantCryptoTokenSeq& pcts = iter->second.crypto_tokens_;
3816 
3817  if (pcts.length() != 0) {
3819 
3820  const DCPS::GUID_t reader = make_id(peer, ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER);
3821 
3823  msg.message_identity.source_guid = writer;
3826  msg.destination_participant_guid = peer;
3827  msg.destination_endpoint_guid = GUID_UNKNOWN; // unknown = whole participant
3829  msg.message_data = reinterpret_cast<const DDS::Security::DataHolderSeq&>(pcts);
3830 
3831  if (sedp_->write_volatile_message(msg, reader) != DDS::RETCODE_OK) {
3832  if (DCPS::DCPS_debug_level > 0) {
3833  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::send_participant_crypto_tokens() - ")
3834  ACE_TEXT("Unable to write volatile message.\n")));
3835  }
3836  }
3837  }
3838 
3839  iter->second.participant_tokens_sent_ = true;
3840 }
3841 
3843 Spdp::lookup_participant_permissions(const DCPS::GUID_t& id) const
3844 {
3846 
3849  if (pi != participants_.end()) {
3850  result = pi->second.permissions_handle_;
3851  }
3852  return result;
3853 }
3854 
3856 {
3859  if (pi != participants_.end()) {
3860  return pi->second.auth_state_;
3861  }
3862  return AUTH_STATE_HANDSHAKE;
3863 }
3864 #endif
3865 
3866 #ifdef OPENDDS_SECURITY
3869  const ICE::AgentInfo& agent_info)
3870 {
3871  GUID_t l = guid_;
3872 
3873  // See RTPS v2.1 section 8.5.5.1
3877  ice_agent_->start_ice(endpoint, l, r, agent_info);
3878  }
3882  ice_agent_->start_ice(endpoint, l, r, agent_info);
3883  }
3887  ice_agent_->start_ice(endpoint, l, r, agent_info);
3888  }
3892  ice_agent_->start_ice(endpoint, l, r, agent_info);
3893  }
3897  ice_agent_->start_ice(endpoint, l, r, agent_info);
3898  }
3902  ice_agent_->start_ice(endpoint, l, r, agent_info);
3903  }
3907  ice_agent_->start_ice(endpoint, l, r, agent_info);
3908  }
3912  ice_agent_->start_ice(endpoint, l, r, agent_info);
3913  }
3917  ice_agent_->start_ice(endpoint, l, r, agent_info);
3918  }
3922  ice_agent_->start_ice(endpoint, l, r, agent_info);
3923  }
3924 
3925  using namespace DDS::Security;
3926  // See DDS-Security v1.1 section 7.3.7.1
3930  ice_agent_->start_ice(endpoint, l, r, agent_info);
3931  }
3935  ice_agent_->start_ice(endpoint, l, r, agent_info);
3936  }
3940  ice_agent_->start_ice(endpoint, l, r, agent_info);
3941  }
3945  ice_agent_->start_ice(endpoint, l, r, agent_info);
3946  }
3950  ice_agent_->start_ice(endpoint, l, r, agent_info);
3951  }
3955  ice_agent_->start_ice(endpoint, l, r, agent_info);
3956  }
3960  ice_agent_->start_ice(endpoint, l, r, agent_info);
3961  }
3965  ice_agent_->start_ice(endpoint, l, r, agent_info);
3966  }
3970  ice_agent_->start_ice(endpoint, l, r, agent_info);
3971  }
3975  ice_agent_->start_ice(endpoint, l, r, agent_info);
3976  }
3980  ice_agent_->start_ice(endpoint, l, r, agent_info);
3981  }
3985  ice_agent_->start_ice(endpoint, l, r, agent_info);
3986  }
3987  if (extended_avail & TYPE_LOOKUP_SERVICE_REQUEST_WRITER_SECURE) {
3990  ice_agent_->start_ice(endpoint, l, r, agent_info);
3991  }
3992  if (extended_avail & TYPE_LOOKUP_SERVICE_REQUEST_READER_SECURE) {
3995  ice_agent_->start_ice(endpoint, l, r, agent_info);
3996  }
3997  if (extended_avail & TYPE_LOOKUP_SERVICE_REPLY_WRITER_SECURE) {
4000  ice_agent_->start_ice(endpoint, l, r, agent_info);
4001  }
4002  if (extended_avail & TYPE_LOOKUP_SERVICE_REPLY_READER_SECURE) {
4005  ice_agent_->start_ice(endpoint, l, r, agent_info);
4006  }
4007 }
4008 
4011  GUID_t l = guid_;
4012 
4013  // See RTPS v2.1 section 8.5.5.1
4017  ice_agent_->stop_ice(endpoint, l, r);
4018  }
4022  ice_agent_->stop_ice(endpoint, l, r);
4023  }
4027  ice_agent_->stop_ice(endpoint, l, r);
4028  }
4032  ice_agent_->stop_ice(endpoint, l, r);
4033  }
4037  ice_agent_->stop_ice(endpoint, l, r);
4038  }
4042  ice_agent_->stop_ice(endpoint, l, r);
4043  }
4047  ice_agent_->stop_ice(endpoint, l, r);
4048  }
4052  ice_agent_->stop_ice(endpoint, l, r);
4053  }
4057  ice_agent_->stop_ice(endpoint, l, r);
4058  }
4062  ice_agent_->stop_ice(endpoint, l, r);
4063  }
4064 
4065  using namespace DDS::Security;
4066  // See DDS-Security v1.1 section 7.3.7.1
4070  ice_agent_->stop_ice(endpoint, l, r);
4071  }
4075  ice_agent_->stop_ice(endpoint, l, r);
4076  }
4080  ice_agent_->stop_ice(endpoint, l, r);
4081  }
4085  ice_agent_->stop_ice(endpoint, l, r);
4086  }
4090  ice_agent_->stop_ice(endpoint, l, r);
4091  }
4095  ice_agent_->stop_ice(endpoint, l, r);
4096  }
4100  ice_agent_->stop_ice(endpoint, l, r);
4101  }
4105  ice_agent_->stop_ice(endpoint, l, r);
4106  }
4110  ice_agent_->stop_ice(endpoint, l, r);
4111  }
4115  ice_agent_->stop_ice(endpoint, l, r);
4116  }
4120  ice_agent_->stop_ice(endpoint, l, r);
4121  }
4125  ice_agent_->stop_ice(endpoint, l, r);
4126  }
4127  if (extended_avail & TYPE_LOOKUP_SERVICE_REQUEST_WRITER_SECURE) {
4130  ice_agent_->stop_ice(endpoint, l, r);
4131  }
4132  if (extended_avail & TYPE_LOOKUP_SERVICE_REQUEST_READER_SECURE) {
4135  ice_agent_->stop_ice(endpoint, l, r);
4136  }
4137  if (extended_avail & TYPE_LOOKUP_SERVICE_REPLY_WRITER_SECURE) {
4140  ice_agent_->stop_ice(endpoint, l, r);
4141  }
4142  if (extended_avail & TYPE_LOOKUP_SERVICE_REPLY_READER_SECURE) {
4145  ice_agent_->stop_ice(endpoint, l, r);
4146  }
4147 }
4148 
4150 Spdp::remote_crypto_handle(const DCPS::GUID_t& remote_participant) const
4151 {
4152  return sedp_->get_handle_registry()->get_remote_participant_crypto_handle(remote_participant);
4153 }
4154 
4155 // Request and maintain a server-reflexive address.
4157 {
4158  DCPS::RcHandle<Spdp> outer = outer_.lock();
4159  if (!outer) return;
4160 
4161  if (outer->config_->use_rtps_relay() || outer->config_->rtps_relay_only()) {
4162  const ACE_INET_Addr relay_address = outer->config_->spdp_rtps_relay_address();
4163  if (relay_address != ACE_INET_Addr()) {
4165  send(relay_address, relay_srsm_.message());
4166  relay_stun_task_falloff_.advance(ICE::Configuration::instance()->server_reflexive_address_period());
4167  relay_stun_task_->schedule(relay_stun_task_falloff_.get());
4168  }
4169  }
4170 }
4171 
4173 {
4174 #ifndef DDS_HAS_MINIMUM_BIT
4175  DCPS::RcHandle<Spdp> outer = outer_.lock();
4176  if (!outer) return;
4177 
4178  DCPS::ConnectionRecord connection_record;
4179  std::memset(connection_record.guid, 0, sizeof(connection_record.guid));
4180  connection_record.protocol = DCPS::RTPS_RELAY_STUN_PROTOCOL;
4182 
4183  switch (sc) {
4186  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
4187  connection_record.latency = relay_srsm_.latency().to_dds_duration();
4189  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<DCPS::WriteConnectionRecords>(outer->bit_subscriber_, true, connection_record));
4190  }
4191  break;
4194  // Lengthen to normal period.
4195  relay_stun_task_falloff_.set(ICE::Configuration::instance()->server_reflexive_address_period());
4196  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
4197  connection_record.latency = relay_srsm_.latency().to_dds_duration();
4199  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<DCPS::WriteConnectionRecords>(outer->bit_subscriber_, true, connection_record));
4200  break;
4202  connection_record.address = DCPS::LogAddr(relay_srsm_.unset_stun_server_address()).c_str();
4203  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<DCPS::WriteConnectionRecords>(outer->bit_subscriber_, false, connection_record));
4204  break;
4205  }
4206 #else
4207  ACE_UNUSED_ARG(sc);
4208 #endif
4209 }
4210 
4212 {
4213 #ifndef DDS_HAS_MINIMUM_BIT
4214  DCPS::RcHandle<Spdp> outer = outer_.lock();
4215  if (!outer) return;
4216 
4217  relay_stun_task_->cancel();
4218 
4219  DCPS::ConnectionRecord connection_record;
4220  std::memset(connection_record.guid, 0, sizeof(connection_record.guid));
4221  connection_record.protocol = DCPS::RTPS_RELAY_STUN_PROTOCOL;
4223 
4225  connection_record.address = DCPS::LogAddr(relay_srsm_.stun_server_address()).c_str();
4226  outer->sedp_->job_queue()->enqueue(DCPS::make_rch<DCPS::WriteConnectionRecords>(outer->bit_subscriber_, false, connection_record));
4227  }
4228 
4230 #endif
4231 }
4232 
4234 {
4235  DCPS::RcHandle<Spdp> outer = outer_.lock();
4236  if (!outer) return;
4237 
4238  if (outer->config_->use_rtps_relay() || outer->config_->rtps_relay_only()) {
4239  const ACE_INET_Addr relay_address = outer->config_->spdp_rtps_relay_address();
4240  if (relay_address != ACE_INET_Addr()) {
4241  write(SEND_RELAY);
4242  relay_spdp_task_falloff_.advance(outer->config_->spdp_rtps_relay_send_period());
4243  relay_spdp_task_->schedule(relay_spdp_task_falloff_.get());
4244  }
4245  }
4246 }
4247 #endif
4248 
4250 {
4252 }
4253 
4255 {
4256  DCPS::RcHandle<Spdp> outer = outer_.lock();
4257  if (!outer) return;
4258 
4259  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
4260 
4261  while (!directed_guids_.empty()) {
4262  const DCPS::GUID_t id = directed_guids_.front();
4263  directed_guids_.pop_front();
4264 
4265  DiscoveredParticipantConstIter pos = outer->participants_.find(id);
4266  if (pos == outer->participants_.end()) {
4267  continue;
4268  }
4269 
4270  write_i(id, pos->second.last_recv_address_, SEND_DIRECT | SEND_RELAY);
4271  directed_guids_.push_back(id);
4272  directed_send_task_->schedule(outer->config_->resend_period() * (1.0 / directed_guids_.size()));
4273  break;
4274  }
4275 }
4276 
4277 void
4279 {
4280  DCPS::RcHandle<Spdp> outer = outer_.lock();
4281  if (!outer) return;
4282 
4283  outer->process_lease_expirations(now);
4284 }
4285 
4287 {
4288  ACE_UNUSED_ARG(now);
4289 #ifndef DDS_HAS_MINIMUM_BIT
4290  DCPS::RcHandle<Spdp> outer = outer_.lock();
4291  if (!outer) return;
4292 
4293  if (DCPS::DCPS_debug_level > 4) {
4295  "(%P|%t) Spdp::SpdpTransport::thread_status_task(): Updating internal thread status BIT.\n"));
4296  }
4297 
4298  ACE_GUARD(ACE_Thread_Mutex, g, outer->lock_);
4299 
4300  typedef DCPS::ThreadStatusManager::List List;
4301  List running;
4302  List removed;
4303  TheServiceParticipant->get_thread_status_manager().harvest(last_harvest, running, removed);
4304  last_harvest = now;
4305  for (List::const_iterator i = removed.begin(); i != removed.end(); ++i) {
4307  data.thread_id = i->bit_key().c_str();
4308  outer->bit_subscriber_->remove_thread_status(data);
4309  }
4310  for (List::const_iterator i = running.begin(); i != running.end(); ++i) {
4312  data.thread_id = i->bit_key().c_str();
4313  data.utilization = i->utilization(now);
4314  outer->bit_subscriber_->add_thread_status(data, DDS::NEW_VIEW_STATE, i->timestamp());
4315  }
4316 
4317 #endif /* DDS_HAS_MINIMUM_BIT */
4318 }
4319 
4320 #ifdef OPENDDS_SECURITY
4322 {
4323  DCPS::RcHandle<Spdp> outer = outer_.lock();
4324  if (!outer) return;
4325 
4326  outer->process_handshake_deadlines(now);
4327 }
4328 
4330 {
4331  DCPS::RcHandle<Spdp> outer = outer_.lock();
4332  if (!outer) return;
4333 
4334  outer->process_handshake_resends(now);
4335 }
4336 
4338 {
4339  if (iter == participants_.end()) {
4340  return;
4341  }
4342 
4344 
4345  std::pair<TimeQueue::iterator, TimeQueue::iterator> range = handshake_deadlines_.equal_range(iter->second.handshake_deadline_);
4346  for (; range.first != range.second; ++range.first) {
4347  if (range.first->second == iter->first) {
4348  handshake_deadlines_.erase(range.first);
4349  break;
4350  }
4351  }
4352 }
4353 
4355 {
4356  if (iter == participants_.end()) {
4357  return;
4358  }
4359 
4360  iter->second.have_auth_req_msg_ = false;
4361  iter->second.have_handshake_msg_ = false;
4362  iter->second.handshake_resend_falloff_.set(config_->auth_resend_period());
4363 
4364  std::pair<TimeQueue::iterator, TimeQueue::iterator> range = handshake_resends_.equal_range(iter->second.stateless_msg_deadline_);
4365  for (; range.first != range.second; ++range.first) {
4366  if (range.first->second == iter->first) {
4367  handshake_resends_.erase(range.first);
4368  break;
4369  }
4370  }
4371 }
4372 
4374  const ParticipantData_t& pdata,
4375  const DCPS::GUID_t& guid)
4376 {
4377  ICE::AgentInfoMap ai_map;
4378  if (!ParameterListConverter::from_param_list(plist, ai_map)) {
4379  if (DCPS::DCPS_debug_level > 0) {
4380  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::process_participant_ice - ")
4381  ACE_TEXT("failed to convert from ParameterList to ")
4382  ACE_TEXT("ICE::AgentInfo\n")));
4383  }
4384  return;
4385  }
4386  ICE::AgentInfoMap::const_iterator sedp_pos = ai_map.find(SEDP_AGENT_INFO_KEY);
4387  ICE::AgentInfoMap::const_iterator spdp_pos = ai_map.find(SPDP_AGENT_INFO_KEY);
4388 
4389  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint;
4390  DCPS::WeakRcHandle<ICE::Endpoint> spdp_endpoint;
4391  {
4394  return;
4395  }
4396  if (sedp_) {
4397  sedp_endpoint = sedp_->get_ice_endpoint();
4398  }
4399  if (tport_) {
4400  spdp_endpoint = tport_->get_ice_endpoint();
4401  }
4402  DiscoveredParticipantIter iter = participants_.find(guid);
4403  if (iter != participants_.end()) {
4404  if (sedp_pos != ai_map.end()) {
4405  iter->second.have_sedp_info_ = true;
4406  iter->second.sedp_info_ = sedp_pos->second;
4407  } else {
4408  iter->second.have_sedp_info_ = false;
4409  }
4410 
4411  if (spdp_pos != ai_map.end()) {
4412  iter->second.have_spdp_info_ = true;
4413  iter->second.spdp_info_ = spdp_pos->second;
4414  } else {
4415  iter->second.have_spdp_info_ = false;
4416  }
4417  }
4418  }
4419 
4420  if (sedp_endpoint) {
4421  if (sedp_pos != ai_map.end()) {
4422  start_ice(sedp_endpoint, guid, pdata.participantProxy.availableBuiltinEndpoints,
4423  pdata.participantProxy.availableExtendedBuiltinEndpoints, sedp_pos->second);
4424  } else {
4425  stop_ice(sedp_endpoint, guid, pdata.participantProxy.availableBuiltinEndpoints,
4427  }
4428  }
4429 
4430  if (spdp_endpoint) {
4431  if (spdp_pos != ai_map.end()) {
4432  ice_agent_->start_ice(spdp_endpoint, guid_, guid, spdp_pos->second);
4433  } else {
4434  ice_agent_->stop_ice(spdp_endpoint, guid_, guid);
4435 #ifndef DDS_HAS_MINIMUM_BIT
4437  DiscoveredParticipantIter iter = participants_.find(guid);
4438  if (iter != participants_.end()) {
4440  process_location_updates_i(iter, "stop ice");
4441  }
4442 #endif
4443  }
4444  }
4445 }
4446 
4447 #endif
4448 
4449 const ParticipantData_t& Spdp::get_participant_data(const DCPS::GUID_t& guid) const
4450 {
4452  return iter->second.pdata_;
4453 }
4454 
4456 {
4458  return iter->second.pdata_;
4459 }
4460 
4462 {
4464 }
4465 
4467 {
4469  return iter->second.discovered_at_.to_monotonic_time();
4470 }
4471 
4472 void
4474 {
4475  ACE_UNUSED_ARG(flag);
4476 
4477 #ifdef OPENDDS_SECURITY
4478  sedp_->rtps_relay_only_now(flag);
4479 
4480  if (flag) {
4482 
4483  tport_->relay_spdp_task_falloff_.set(config_->sedp_heartbeat_period());
4484  tport_->relay_spdp_task_->schedule(TimeDuration::zero_value);
4485 
4486  tport_->relay_stun_task_falloff_.set(config_->sedp_heartbeat_period());
4487  tport_->relay_stun_task_->schedule(TimeDuration::zero_value);
4488 
4489 #ifndef DDS_HAS_MINIMUM_BIT
4490  const DCPS::ParticipantLocation mask =
4495 
4496  for (DiscoveredParticipantIter iter = participants_.begin();
4497  iter != participants_.end();
4498  ++iter) {
4499  enqueue_location_update_i(iter, mask, ACE_INET_Addr(), "rtps_relay_only_now");
4500  process_location_updates_i(iter, "rtps_relay_only_now");
4501  }
4502 #endif
4503  } else {
4504  if (!config_->use_rtps_relay()) {
4505  if (tport_->relay_spdp_task_) {
4506  tport_->relay_spdp_task_->cancel();
4507  }
4508  if (tport_->relay_stun_task_) {
4509  tport_->disable_relay_stun_task();
4510  }
4511  }
4512  }
4513 #endif
4514 }
4515 
4516 void
4518 {
4519  ACE_UNUSED_ARG(f);
4520 
4521 #ifdef OPENDDS_SECURITY
4522  sedp_->use_rtps_relay_now(f);
4523 
4524  if (f) {
4526  tport_->relay_spdp_task_falloff_.set(config_->sedp_heartbeat_period());
4527  tport_->relay_spdp_task_->schedule(TimeDuration::zero_value);
4528 
4529  tport_->relay_stun_task_falloff_.set(config_->sedp_heartbeat_period());
4530  tport_->relay_stun_task_->schedule(TimeDuration::zero_value);
4531  } else {
4533 
4534  if (!config_->rtps_relay_only()) {
4535  if (tport_->relay_spdp_task_) {
4536  tport_->relay_spdp_task_->cancel();
4537  }
4538  if (tport_->relay_stun_task_) {
4539  tport_->disable_relay_stun_task();
4540  }
4541  }
4542 
4543 #ifndef DDS_HAS_MINIMUM_BIT
4544  const DCPS::ParticipantLocation mask =
4547 
4548  for (DiscoveredParticipantIter iter = participants_.begin();
4549  iter != participants_.end();
4550  ++iter) {
4551  enqueue_location_update_i(iter, mask, ACE_INET_Addr(), "use_rtps_relay_now");
4552  process_location_updates_i(iter, "use_rtps_relay_now");
4553  }
4554 #endif
4555 
4556  }
4557 #endif
4558 }
4559 
4560 void
4562 {
4563  ACE_UNUSED_ARG(flag);
4564 
4565 #ifdef OPENDDS_SECURITY
4566  sedp_->use_ice_now(flag);
4567 
4568  if (flag) {
4569  DCPS::WeakRcHandle<ICE::Endpoint> spdp_endpoint = tport_->get_ice_endpoint();
4570  DCPS::WeakRcHandle<ICE::Endpoint> sedp_endpoint = sedp_->get_ice_endpoint();
4571 
4572  if (sedp_endpoint) {
4574  ice_agent_->add_local_agent_info_listener(sedp_endpoint, l, DCPS::static_rchandle_cast<ICE::AgentInfoListener>(DCPS::rchandle_from(this)));
4575  }
4576 
4577  ice_agent_->add_endpoint(DCPS::static_rchandle_cast<ICE::Endpoint>(tport_));
4579  tport_->ice_endpoint_added_ = true;
4580  if (spdp_endpoint) {
4581  ice_agent_->add_local_agent_info_listener(spdp_endpoint, guid_, DCPS::static_rchandle_cast<ICE::AgentInfoListener>(DCPS::rchandle_from(this)));
4582  }
4583 
4584  for (DiscoveredParticipantConstIter pos = participants_.begin(), limit = participants_.end(); pos != limit; ++pos) {
4585  if (spdp_endpoint && pos->second.have_spdp_info_) {
4586  ice_agent_->start_ice(spdp_endpoint, guid_, pos->first, pos->second.spdp_info_);
4587  }
4588 
4589  if (sedp_endpoint && pos->second.have_sedp_info_) {
4590  start_ice(sedp_endpoint, pos->first, pos->second.pdata_.participantProxy.availableBuiltinEndpoints,
4591  pos->second.pdata_.participantProxy.availableExtendedBuiltinEndpoints, pos->second.sedp_info_);
4592  }
4593  }
4594  } else {
4595  ice_agent_->remove_endpoint(DCPS::static_rchandle_cast<ICE::Endpoint>(tport_));
4597  tport_->ice_endpoint_added_ = false;
4598 
4599 #ifndef DDS_HAS_MINIMUM_BIT
4600  const DCPS::ParticipantLocation mask =
4603 
4604  for (DiscoveredParticipantIter part = participants_.begin();
4605  part != participants_.end();
4606  ++part) {
4607  enqueue_location_update_i(part, mask, ACE_INET_Addr(), "use_ice_now");
4608  process_location_updates_i(part, "use_ice_now");
4609  }
4610 #endif
4611  }
4612 
4613  if (is_security_enabled()) {
4615  }
4616 #endif
4617 }
4618 
4620 {
4621 #ifdef OPENDDS_SECURITY
4622  return security_enabled_ && config_->secure_participant_user_data();
4623 #else
4624  return false;
4625 #endif
4626 }
4627 
4629 {
4630  bool include_user_data = true;
4631 #ifdef OPENDDS_SECURITY
4632  if (secure_part_user_data()) {
4633  include_user_data = secure;
4634  }
4635 #else
4636  ACE_UNUSED_ARG(secure);
4637 #endif
4639  bit_data.key = DDS::BuiltinTopicKey_t();
4640  bit_data.user_data = include_user_data ? qos_.user_data : DDS::UserDataQosPolicy();
4641  return bit_data;
4642 }
4643 
4645 {
4647  endpoint_manager().ignore(ignoreId);
4648 
4649  DiscoveredParticipantIter iter = participants_.find(ignoreId);
4650  if (iter != participants_.end()) {
4652  participants_.erase(iter);
4653  }
4654 }
4655 
4657 {
4659 
4660  DiscoveredParticipantIter iter = participants_.find(removeId);
4661  if (iter != participants_.end()) {
4663  participants_.erase(iter);
4664  }
4665 }
4666 
4668 {
4670  qos_ = qos;
4672 }
4673 
4674 bool Spdp::has_domain_participant(const GUID_t& remote) const
4675 {
4677  return has_discovered_participant(remote);
4678 }
4679 
4680 DCPS::TopicStatus Spdp::assert_topic(GUID_t& topicId, const char* topicName,
4681  const char* dataTypeName, const DDS::TopicQos& qos,
4682  bool hasDcpsKey, DCPS::TopicCallbacks* topic_callbacks)
4683 {
4684  if (std::strlen(topicName) > 256 || std::strlen(dataTypeName) > 256) {
4685  if (DCPS_debug_level) {
4686  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR LocalParticipant::assert_topic() - ")
4687  ACE_TEXT("topic or type name length limit (256) exceeded\n")));
4688  }
4690  }
4691 
4692  return endpoint_manager().assert_topic(topicId, topicName, dataTypeName, qos, hasDcpsKey, topic_callbacks);
4693 }
4694 
4696 {
4697  if (iter == participants_.end()) {
4698  return;
4699  }
4700  endpoint_manager().disassociate(iter->second);
4701  bit_subscriber_->remove_participant(iter->second.bit_ih_, iter->second.location_ih_);
4702  if (DCPS_debug_level > 3) {
4703  ACE_DEBUG((LM_DEBUG, "(%P|%t) LocalParticipant::purge_discovered_participant: "
4704  "erasing %C (%B)\n",
4705  DCPS::LogGuid(iter->first).c_str(), participants_.size()));
4706  }
4707 
4709 
4710 #ifdef OPENDDS_SECURITY
4711  if (security_config_) {
4712  DDS::Security::SecurityException se = {"", 0, 0};
4713  DDS::Security::Authentication_var auth = security_config_->get_authentication();
4714  DDS::Security::AccessControl_var access = security_config_->get_access_control();
4715 
4717  sedp_->get_handle_registry()->get_remote_participant_crypto_handle(iter->first);
4718  if (!security_config_->get_crypto_key_factory()->unregister_participant(pch, se)) {
4719  if (DCPS::security_debug.auth_warn) {
4720  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
4721  ACE_TEXT("Spdp::purge_discovered_participant() - ")
4722  ACE_TEXT("Unable to return crypto handle. ")
4723  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
4724  se.code, se.minor_code, se.message.in()));
4725  }
4726  }
4727  sedp_->get_handle_registry()->erase_remote_participant_crypto_handle(iter->first);
4728  sedp_->get_handle_registry()->erase_remote_participant_permissions_handle(iter->first);
4729 
4730  if (iter->second.identity_handle_ != DDS::HANDLE_NIL) {
4731  if (!auth->return_identity_handle(iter->second.identity_handle_, se)) {
4732  if (DCPS::security_debug.auth_warn) {
4733  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
4734  ACE_TEXT("Spdp::purge_discovered_participant() - ")
4735  ACE_TEXT("Unable to return identity handle. ")
4736  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
4737  se.code, se.minor_code, se.message.in()));
4738  }
4739  }
4740  }
4741 
4742  if (iter->second.handshake_handle_ != DDS::HANDLE_NIL) {
4743  if (!auth->return_handshake_handle(iter->second.handshake_handle_, se)) {
4744  if (DCPS::security_debug.auth_warn) {
4745  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
4746  ACE_TEXT("Spdp::purge_discovered_participant() - ")
4747  ACE_TEXT("Unable to return handshake handle. ")
4748  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
4749  se.code, se.minor_code, se.message.in()));
4750  }
4751  }
4752  }
4753 
4754  if (iter->second.shared_secret_handle_ != 0) {
4755  if (!auth->return_sharedsecret_handle(iter->second.shared_secret_handle_, se)) {
4756  if (DCPS::security_debug.auth_warn) {
4757  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
4758  ACE_TEXT("Spdp::purge_discovered_participant() - ")
4759  ACE_TEXT("Unable to return sharedsecret handle. ")
4760  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
4761  se.code, se.minor_code, se.message.in()));
4762  }
4763  }
4764  }
4765 
4766  if (iter->second.permissions_handle_ != DDS::HANDLE_NIL) {
4767  if (!access->return_permissions_handle(iter->second.permissions_handle_, se)) {
4768  if (DCPS::security_debug.auth_warn) {
4769  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) {auth_warn} ")
4770  ACE_TEXT("Spdp::purge_discovered_participant() - ")
4771  ACE_TEXT("Unable to return permissions handle. ")
4772  ACE_TEXT("Security Exception[%d.%d]: %C\n"),
4773  se.code, se.minor_code, se.message.in()));
4774  }
4775  }
4776  }
4777  }
4778 
4779  if (iter->second.auth_state_ == AUTH_STATE_HANDSHAKE) {
4781  }
4782 #endif
4783 }
4784 
4785 #ifdef OPENDDS_SECURITY
4787 {
4788  if (dp.auth_state_ == AUTH_STATE_HANDSHAKE &&
4789  new_state != AUTH_STATE_HANDSHAKE) {
4791  }
4792  dp.auth_state_ = new_state;
4793 }
4794 #endif
4795 
4796 } // namespace RTPS
4797 } // namespace OpenDDS
4798 
void signal_liveliness(DDS::LivelinessQosPolicyKind kind)
Definition: Spdp.cpp:3394
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER
Definition: MessageTypes.h:81
UserDataQosPolicy user_data
DDS::Security::AuthRequestMessageToken remote_auth_request_token_
void write_secure_updates()
Definition: Spdp.cpp:428
bool from_param_list(const ParameterList &param_list, DDS::ParticipantBuiltinTopicData &pbtd)
void swap(MessageBlock &lhs, MessageBlock &rhs)
#define ACE_DEBUG(X)
DCPS::RcHandle< SpdpTransport > tport_
Definition: Spdp.h:556
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
ACE_CDR::Long Long
void process_participant_ice(const ParameterList &plist, const ParticipantData_t &pdata, const DCPS::GUID_t &guid)
Definition: Spdp.cpp:4373
ICE::ServerReflexiveStateMachine relay_srsm_
Definition: Spdp.h:545
void thread_status_interval(const TimeDuration &thread_status_interval)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const char * c_str(void) const
void register_handlers(const DCPS::ReactorTask_rch &reactor_task)
Definition: Spdp.cpp:2524
void process_location_updates_i(const DiscoveredParticipantIter &iter, const char *reason, bool force_publish=false)
Definition: Spdp.cpp:474
const char RTPS_RELAY_APPLICATION_PARTICIPANT[]
Definition: RtpsDiscovery.h:42
const BuiltinEndpointSet_t DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER
Definition: RtpsCore.idl:206
BinaryPropertySeq binary_properties
const LogLevel::Value value
Definition: debug.cpp:61
DCPS::RcHandle< RtpsDiscoveryConfig > config_
Definition: Spdp.h:384
const char RTPS_DISCOVERY_ENDPOINT_ANNOUNCEMENTS[]
Definition: RtpsDiscovery.h:40
const InstanceHandle_t HANDLE_NIL
void close(const DCPS::ReactorTask_rch &reactor_task)
Definition: Spdp.cpp:2603
#define ENOTSUP
void ice_connect(const ICE::GuidSetType &guids, const ACE_INET_Addr &addr)
Definition: Spdp.cpp:3361
const octet FLAG_E
Definition: RtpsCore.idl:521
std::string String
const ExtendedBuiltinEndpointSet_t TYPE_LOOKUP_SERVICE_REPLY_READER_SECURE
BUILT_IN_TOPIC_KEY string protocol
bool is_expectant_opendds(const GUID_t &participant) const
Definition: Spdp.cpp:2191
const ParticipantSecurityAttributesMask PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_LIVELINESS_PROTECTED
bool update_domain_participant_qos(const DDS::DomainParticipantQos &qos)
Definition: Spdp.cpp:4667
const ParticipantSecurityAttributesMask PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_RTPS_PROTECTED
bool is_ip_equal(const ACE_INET_Addr &SAP) const
const BuiltinEndpointSet_t BUILTIN_ENDPOINT_TYPE_LOOKUP_REQUEST_DATA_WRITER
Definition: RtpsCore.idl:223
size_t length(void) const
if(!(yy_init))
unsigned long ACE_Reactor_Mask
bool locators_changed(const ParticipantProxy_t &x, const ParticipantProxy_t &y)
Definition: Sedp.cpp:5982
int access(const char *path, int amode)
SubmessageHeader smHeader
Definition: RtpsCore.idl:667
LM_INFO
OpenDDS_Dcps_Export TransportDebug transport_debug
Definition: debug.cpp:26
DCPS::FibonacciSequence< TimeDuration > relay_spdp_task_falloff_
Definition: Spdp.h:541
DDS::PropertySeq PropertySeq
const long LOCATOR_KIND_UDPv4
Definition: RtpsCore.idl:111
void server_reflexive_indication_count(size_t x)
Definition: RTPS/ICE/Ice.h:127
DCPS::ConditionVariable< ACE_Thread_Mutex > shutdown_cond_
Definition: Spdp.h:601
const BuiltinEndpointSet_t DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER
Definition: RtpsCore.idl:198
bool skip(size_t n, int size=1)
Definition: Serializer.inl:443
const EntityId_t ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER
Definition: GuidUtils.h:43
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
const BuiltinEndpointSet_t DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER
Definition: RtpsCore.idl:202
const ParameterId_t PID_PARTICIPANT_GUID
Definition: RtpsCore.idl:289
void handle_auth_request(const DDS::Security::ParticipantStatelessMessage &msg)
Definition: Spdp.cpp:1142
ACE_Thread_Mutex lock_
Definition: Spdp.h:378
Sedp & endpoint_manager()
Definition: Spdp.h:354
const BuiltinEndpointSet_t SEDP_BUILTIN_PUBLICATIONS_SECURE_READER
void relay_stun_task(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4156
const EntityId_t ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_READER
Definition: MessageTypes.h:78
void reset(void)
DDS::Security::PermissionsCredentialToken permissions_credential_token_
Definition: Spdp.h:630
sequence< octet > key
DDS::Security::IdentityToken identity_token_
Definition: Spdp.h:627
DCPS::MulticastManager multicast_manager_
Definition: Spdp.h:518
void process_handshake_deadlines(const DCPS::MonotonicTimePoint &tv)
Definition: Spdp.cpp:1758
void match_unauthenticated(const DiscoveredParticipantIter &dp_iter)
Definition: Spdp.cpp:1126
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER
Definition: GuidUtils.h:47
String to_dds_string(unsigned short to_convert)
bool is_response(const STUN::Message &message) const
Definition: RTPS/ICE/Ice.h:241
void write(WriteFlags flags)
Definition: Spdp.cpp:2671
bool security_enabled_
Definition: Spdp.h:619
const VendorId_t VENDORID_OPENDDS
Definition: MessageTypes.h:26
int locator_to_address(ACE_INET_Addr &dest, const DCPS::Locator_t &locator, bool map)
bool associated() const
Definition: Spdp.cpp:3682
const EntityId_t ENTITYID_TL_SVC_REQ_WRITER
Definition: GuidUtils.h:52
const ParticipantLocation LOCATION_LOCAL
const char SEDP_AGENT_INFO_KEY[]
Definition: Spdp.h:57
TimeQueue handshake_deadlines_
Definition: Spdp.h:643
const ParticipantLocation LOCATION_LOCAL6
DiscoveredParticipantMap::iterator DiscoveredParticipantIter
Definition: Spdp.h:70
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
OpenDDS::DCPS::GUID_t destination_participant_guid
const char * c_str() const
void opts(int opts)
void data_received(const DataSubmessage &data, const ParameterList &plist, const ACE_INET_Addr &from)
Definition: Spdp.cpp:1048
virtual int handle_input(ACE_HANDLE h)
Definition: Spdp.cpp:2989
const EntityId_t ENTITYID_TL_SVC_REPLY_READER
Definition: GuidUtils.h:55
CommandPtr execute_or_enqueue(CommandPtr command)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
int get_size(void) const
ACE_CDR::ULong get_participant_flags(const DCPS::GUID_t &guid) const
Definition: Spdp.cpp:3693
sequence< Locator_t > LocatorSeq
const SampleStateMask ANY_SAMPLE_STATE
unsigned long ExtendedBuiltinEndpointSet_t
PluginParticipantSecurityAttributesMask plugin_participant_security_attributes
const ACE_Time_Value & value() const
const ACE_CDR::UShort DATA_OCTETS_TO_IQOS
Definition: MessageTypes.h:102
const EntityId_t ENTITYID_TL_SVC_REQ_WRITER_SECURE
Definition: MessageTypes.h:90
key GuidPrefix_t guidPrefix
Definition: DdsDcpsGuid.idl:58
const DDS::BuiltinTopicKey_t BUILTIN_TOPIC_KEY_UNKNOWN
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
const EntityId_t ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_READER
Definition: MessageTypes.h:88
ACE_Message_Block buff_
Definition: Spdp.h:520
SpdpTransport(DCPS::RcHandle< Spdp > outer)
Definition: Spdp.cpp:2308
ACE_SOCK_Dgram unicast_socket_
Definition: Spdp.h:507
const ExtendedBuiltinEndpointSet_t TYPE_LOOKUP_SERVICE_REPLY_WRITER_SECURE
DCPS::RcHandle< ICE::Agent > ice_agent_
Definition: Spdp.h:634
DCPS::TimeDuration lease_duration_
Definition: Spdp.h:387
DCPS::WeakRcHandle< Spdp > outer_
Definition: Spdp.h:502
DDS::OctetSeq local_participant_data_as_octets() const
Definition: Spdp.cpp:1217
int ssize_t
#define ACE_CDR_BYTE_ORDER
boolean opendds_rtps_relay_application_participant
Definition: RtpsCore.idl:618
int release(void)
int set_option(int level, int option, void *optval, int optlen) const
const ProtocolVersion_t PROTOCOLVERSION
Definition: MessageTypes.h:67
sequence< DataHolder > DataHolderSeq
DCPS::FibonacciSequence< TimeDuration > relay_stun_task_falloff_
Definition: Spdp.h:544
const BuiltinEndpointSet_t BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER
Definition: RtpsCore.idl:218
DCPS::RcHandle< SpdpMulti > local_send_task_
Definition: Spdp.h:525
RtpsDiscovery * disco_
Definition: Spdp.h:383
ACE_Message_Block wbuff_
Definition: Spdp.h:520
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
DDS::DomainParticipantQos qos_
Definition: Spdp.h:380
const MessageCountKind MCK_STUN
const char RTPS_REFLECT_HEARTBEAT_COUNT[]
Definition: RtpsDiscovery.h:43
char * rd_ptr(void) const
int get_type(void) const
const ParticipantSecurityAttributesMask PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_VALID
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint()
Definition: Spdp.cpp:3235
void ice_disconnect(const ICE::GuidSetType &guids, const ACE_INET_Addr &addr)
Definition: Spdp.cpp:3383
DDS::Security::IdentityHandle identity_handle_
Definition: Spdp.h:623
void purge_handshake_resends(DiscoveredParticipantIter iter)
Definition: Spdp.cpp:4354
void OpenDDS_Dcps_Export log_progress(const char *activity, const GUID_t &local, const GUID_t &remote, const MonotonicTime_t &start_time, const GUID_t &reference)
Definition: Logging.cpp:20
const ACE_SOCK_Dgram & choose_recv_socket(ACE_HANDLE h) const
Definition: Spdp.cpp:2966
bool get_last_recv_locator(const DCPS::GUID_t &part_id, DCPS::LocatorSeq &target, bool &inlineQos)
Definition: Spdp.cpp:3668
OpenDDS::RTPS::ParticipantProxy_t participantProxy
DCPS::RcHandle< Sedp > sedp_
Definition: Spdp.h:606
#define SOL_SOCKET
unsigned short octetsToInlineQos
Definition: RtpsCore.idl:672
const EntityId_t ENTITYID_TL_SVC_REPLY_READER_SECURE
Definition: MessageTypes.h:93
AuthState lookup_participant_auth_state(const GUID_t &id) const
Definition: Spdp.cpp:3855
bool is_ip_equal(const ACE_INET_Addr &a, const DCPS::Locator_t &locator)
Definition: Spdp.cpp:641
void remove_lease_expiration_i(DiscoveredParticipantIter iter)
Definition: Spdp.cpp:3704
const ACE_CDR::UShort SMHDR_SZ
Definition: MessageTypes.h:106
OpenDDS_Dcps_Export void address_to_locator(Locator_t &locator, const ACE_INET_Addr &addr)
const ParticipantData_t & get_participant_data(const DCPS::GUID_t &guid) const
Definition: Spdp.cpp:4449
const char RTPS_DISCOVERY_TYPE_LOOKUP_SERVICE[]
Definition: RtpsDiscovery.h:41
DCPS::SequenceNumber seq_
Definition: Spdp.h:505
ACE_HANDLE socket(int protocol_family, int type, int proto)
CryptoTokenSeq ParticipantCryptoTokenSeq
MessageId
One byte message id (<256)
bool is_any(void) const
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
const BuiltinEndpointSet_t SPDP_BUILTIN_PARTICIPANT_SECURE_READER
BUILT_IN_TOPIC_KEY string address
void process_handshake_deadlines(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4321
void send_directed(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4254
const octet FLAG_Q
Definition: RtpsCore.idl:522
DiscoveredParticipantMap::const_iterator DiscoveredParticipantConstIter
Definition: Spdp.h:71
#define OPENDDS_STRING
DCPS::MonotonicTimePoint stateless_msg_deadline_
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
DDS::Security::ParticipantBuiltinTopicDataSecure ddsParticipantDataSecure
bool is_application_participant_
Definition: Spdp.h:395
ProtocolVersion_t version
Definition: RtpsCore.idl:656
int remove_handler(ACE_HANDLE handle, ACE_Reactor_Mask masks)
#define SO_SNDBUF
DCPS::RcHandle< SpdpSporadic > relay_spdp_task_
Definition: Spdp.h:540
PluginParticipantSecurityAttributesMask plugin_participant_attributes
DCPS::InternalTransportStatistics transport_statistics_
Definition: Spdp.h:552
DOMAINID_TYPE_NATIVE DomainId_t
const EntityId_t ENTITYID_SPDP_BUILTIN_PARTICIPANT_WRITER
Definition: GuidUtils.h:44
DDS::Security::ParticipantCryptoHandle crypto_handle_
Definition: Spdp.h:625
const EntityId_t ENTITYID_PARTICIPANT
Definition: GuidUtils.h:37
const double quick_resend_ratio_
Definition: Spdp.h:385
LM_DEBUG
bool validateSequenceNumber(const DCPS::MonotonicTimePoint &now, const DCPS::SequenceNumber &seq, DiscoveredParticipantIter &iter)
Definition: Spdp.cpp:1030
bool get_default_locators(const DCPS::GUID_t &part_id, DCPS::LocatorSeq &target, bool &inlineQos)
Definition: Spdp.cpp:3639
DCPS::TimeDuration rtps_duration_to_time_duration(const Duration_t &rtps_duration, const ProtocolVersion_t &version, const VendorId_t &vendor)
ParticipantSecurityAttributesMask participant_security_attributes
const ACE_SOCK_Dgram & choose_send_socket(const ACE_INET_Addr &addr) const
Definition: Spdp.cpp:2909
DDS::Security::ExtendedBuiltinEndpointSet_t available_extended_builtin_endpoints_
Definition: Spdp.h:617
const ViewStateKind NOT_NEW_VIEW_STATE
StateChange send(const ACE_INET_Addr &address, size_t indication_count_limit, const DCPS::GuidPrefix_t &guid_prefix)
Definition: Ice.cpp:128
DCPS::WeakRcHandle< ICE::Endpoint > get_ice_endpoint_if_added()
Definition: Spdp.cpp:612
void remove_domain_participant(const GUID_t &removeId)
Definition: Spdp.cpp:4656
DCPS::GuidPrefix_t guidPrefix
Definition: RtpsCore.idl:658
const ParameterId_t PID_STATUS_INFO
Definition: RtpsCore.idl:296
void update_lease_expiration_i(DiscoveredParticipantIter iter, const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:3716
ACE_CDR::ULong ULong
Handshake concluded or timed out.
bool ip_in_AgentInfo(const ACE_INET_Addr &from, const ParameterList &plist)
Definition: Spdp.cpp:677
const ACE_INET_Addr & stun_server_address() const
Definition: RTPS/ICE/Ice.h:239
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
Replier should call begin_handshake_reply.
ParticipantCryptoInfoPair lookup_participant_crypto_info(const DCPS::GUID_t &id) const
Definition: Spdp.cpp:3788
void append_transport_statistics(DCPS::TransportStatisticsSequence &seq)
Definition: Spdp.cpp:2802
CandidatesType::const_iterator const_iterator
Definition: Ice.h:72
bool set_socket_multicast_ttl(const ACE_SOCK_Dgram &socket, const unsigned char &ttl)
void start_ice(DCPS::WeakRcHandle< ICE::Endpoint > endpoint, DCPS::GUID_t remote, BuiltinEndpointSet_t avail, DDS::Security::ExtendedBuiltinEndpointSet_t extended_avail, const ICE::AgentInfo &agent_info)
Definition: Spdp.cpp:3867
const BuiltinEndpointSet_t BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER
Definition: RtpsCore.idl:220
const DCPS::TimeDuration min_resend_delay_
Definition: Spdp.h:386
ExtendedBuiltinEndpointSet_t extended_builtin_endpoints
void set_auth_state(DiscoveredParticipant &dp, AuthState state)
Definition: Spdp.cpp:4786
unsigned long ParticipantLocation
void thread_status_task(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4286
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
size_t size(void) const
SequenceNumber_t writerSN
Definition: RtpsCore.idl:675
sequence< TransportStatistics > TransportStatisticsSequence
void register_unicast_socket(ACE_Reactor *reactor, ACE_SOCK_Dgram &socket, const char *what)
Definition: Spdp.cpp:2502
bool operator==(const Duration_t &x, const Duration_t &y)
Definition: MessageUtils.h:109
DCPS::RcHandle< SpdpSporadic > directed_send_task_
Definition: Spdp.h:527
bool match_authenticated(const DCPS::GUID_t &guid, DiscoveredParticipantIter &iter)
Definition: Spdp.cpp:1971
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void send_local(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4249
bool has_domain_participant(const GUID_t &ignoreId) const
Definition: Spdp.cpp:4674
Spdp(DDS::DomainId_t domain, DCPS::GUID_t &guid, const DDS::DomainParticipantQos &qos, RtpsDiscovery *disco, XTypes::TypeLookupService_rch tls)
Definition: Spdp.cpp:219
virtual void * get_addr(void) const
ParticipantGenericMessage ParticipantStatelessMessage
const InstanceStateMask ANY_INSTANCE_STATE
ParticipantData_t build_local_pdata(bool always_in_the_clear, Security::DiscoveredParticipantDataKind kind)
Definition: Spdp.cpp:2201
bool cmp_ip4(const ACE_INET_Addr &a, const DCPS::Locator_t &locator)
Definition: Spdp.cpp:617
bool handle_participant_crypto_tokens(const DDS::Security::ParticipantVolatileMessageSecure &msg)
Definition: Spdp.cpp:1885
const EntityId_t ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER
Definition: MessageTypes.h:79
int control(int cmd, void *) const
void spdp_rtps_relay_address_change()
Definition: Spdp.cpp:2786
DDS::Duration_t to_dds_duration() const
const ViewStateKind NEW_VIEW_STATE
ACE_Message_Block * block
Definition: Stun.h:241
const BuiltinEndpointSet_t BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER
DDS::Security::PermissionsHandle lookup_participant_permissions(const DCPS::GUID_t &id) const
Definition: Spdp.cpp:3843
DDS::Security::ExtendedBuiltinEndpointSet_t availableExtendedBuiltinEndpoints
Definition: RtpsCore.idl:620
void send_handshake_request(const DCPS::GUID_t &guid, DiscoveredParticipant &dp)
Definition: Spdp.cpp:1274
void get_interface_addrs(OPENDDS_VECTOR(ACE_INET_Addr)&addrs)
RtpsDiscoveryConfig::AddrVec AddrVec
Definition: RtpsDiscovery.h:57
const ViewStateMask ANY_VIEW_STATE
DDS::Security::PermissionsToken permissions_token_
Definition: Spdp.h:629
DCPS::SequenceNumber stateless_sequence_number_
Definition: Spdp.h:621
const char * c_str() const
Definition: LogAddr.h:32
DCPS::LocatorSeq metatrafficUnicastLocatorList
Definition: RtpsCore.idl:611
const EntityId_t ENTITYID_TL_SVC_REPLY_WRITER_SECURE
Definition: MessageTypes.h:92
void disassociate(DiscoveredParticipant &participant)
Definition: Sedp.cpp:1544
bool log_bits
Definition: Logging.cpp:18
void send_participant_crypto_tokens(const DCPS::GUID_t &id)
Definition: Spdp.cpp:3801
DCPS::TimeDuration lease_extension_
Definition: Spdp.h:388
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
const OpenDDSParticipantFlagsBits_t PFLAGS_EMPTY
Definition: RtpsCore.idl:322
ACE_CDR::UShort UShort
const long LOCATOR_KIND_UDPv6
Definition: RtpsCore.idl:112
virtual ACE_CString _info(void) const=0
ssize_t read(ACE_HANDLE handle, void *buf, size_t len)
int get_local_addr(ACE_Addr &) const
#define AF_INET
#define PF_INET
void process_handshake_resends(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4329
void process_handshake_resends(const DCPS::MonotonicTimePoint &tv)
Definition: Spdp.cpp:1810
DCPS::EntityId_t writerId
Definition: RtpsCore.idl:674
BuiltinEndpointSet_t available_builtin_endpoints_
Definition: Spdp.h:605
const ParticipantLocation LOCATION_ICE
DDS::BinaryPropertySeq BinaryPropertySeq
DCPS::TopicStatus assert_topic(GUID_t &topicId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey, DCPS::TopicCallbacks *topic_callbacks)
Definition: Spdp.cpp:4680
const BuiltinEndpointSet_t BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER
const EntityId_t ENTITYID_TL_SVC_REQ_READER
Definition: GuidUtils.h:53
const EntityId_t ENTITYID_TL_SVC_REPLY_WRITER
Definition: GuidUtils.h:54
time_t sec(void) const
DDS::Security::PermissionsHandle permissions_handle_
Definition: Spdp.h:624
const ExtendedBuiltinEndpointSet_t TYPE_LOOKUP_SERVICE_REQUEST_READER_SECURE
DDS::PropertySeq PropertySeq
Definition: RtpsCore.idl:49
const EntityId_t ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER
Definition: GuidUtils.h:40
DDS::Security::ParticipantSecurityAttributes participant_sec_attr_
Definition: Spdp.h:632
char * wr_ptr(void) const
ACE_HANDLE get_handle(void) const
LM_WARNING
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
AtomicBool initialized_flag_
Spdp initialized.
Definition: Spdp.h:598
void init(DDS::DomainId_t domain, DCPS::GUID_t &guid, const DDS::DomainParticipantQos &qos, XTypes::TypeLookupService_rch tls)
Definition: Spdp.cpp:135
DCPS::RcHandle< SpdpSporadic > handshake_deadline_task_
Definition: Spdp.h:536
std::pair< DDS::Security::ParticipantCryptoHandle, DDS::Security::SharedSecretHandle_var > ParticipantCryptoInfoPair
Definition: Spdp.h:165
ACE_UINT32 ULong
void append(TransportStatisticsSequence &seq, const InternalTransportStatistics &istats)
The End User API.
const STUN::Message & message() const
Definition: RTPS/ICE/Ice.h:237
DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > network_interface_address_reader_
Definition: Spdp.h:533
DCPS::MonotonicTime_t get_participant_discovered_at() const
Definition: Spdp.cpp:4461
DCPS::MonotonicTimePoint handshake_deadline_
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_WRITER
Definition: MessageTypes.h:83
StateChange receive(const STUN::Message &message)
Definition: Ice.cpp:156
DDS::ParticipantBuiltinTopicData base
virtual ACE_Reactor * reactor(void) const
void open(const DCPS::ReactorTask_rch &reactor_task, const DCPS::JobQueue_rch &job_queue)
Definition: Spdp.cpp:2411
const BuiltinEndpointSet_t DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR
Definition: RtpsCore.idl:200
bool open_unicast_socket(u_short port_common, u_short participant_id)
Definition: Spdp.cpp:3400
const BuiltinEndpointSet_t BUILTIN_ENDPOINT_TYPE_LOOKUP_REPLY_DATA_READER
Definition: RtpsCore.idl:229
void update_agent_info(const DCPS::GUID_t &local_guid, const ICE::AgentInfo &agent_info)
Definition: Spdp.cpp:2139
DCPS::RcHandle< SpdpPeriodic > thread_status_task_
Definition: Spdp.h:532
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_WRITER
Definition: MessageTypes.h:85
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER
Definition: MessageTypes.h:82
DCPS::EntityId_t readerId
Definition: RtpsCore.idl:673
void purge_discovered_participant(const DiscoveredParticipantIter &iter)
Definition: Spdp.cpp:4695
DCPS::GUID_t guid_
Definition: Spdp.h:393
#define IPPROTO_IP
sequence< Parameter > ParameterList
const BuiltinEndpointSet_t BUILTIN_ENDPOINT_TYPE_LOOKUP_REPLY_DATA_WRITER
Definition: RtpsCore.idl:227
void append_submessage(RTPS::Message &message, const RTPS::InfoDestinationSubmessage &submessage)
Definition: MessageUtils.h:147
ACE_TEXT("TCP_Factory")
size_t space(void) const
OpenDDS_Dcps_Export GUID_t make_part_guid(const GuidPrefix_t &prefix)
Definition: GuidUtils.h:216
void remove_agent_info(const DCPS::GUID_t &local_guid)
Definition: Spdp.cpp:2146
const EntityId_t ENTITYID_SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER
Definition: MessageTypes.h:77
void process_lease_expirations(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4278
static const TimeDuration zero_value
Definition: TimeDuration.h:31
const BuiltinEndpointSet_t DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR
Definition: RtpsCore.idl:208
ACE_INT32 Long
const EntityId_t ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER
Definition: GuidUtils.h:42
void write_i(WriteFlags flags)
Definition: Spdp.cpp:2681
const char SPDP_AGENT_INFO_KEY[]
Definition: Spdp.h:56
OctetArray4 prefix
Definition: RtpsCore.idl:655
ACE_SOCK_Dgram_Mcast multicast_socket_
Definition: Spdp.h:510
const BuiltinEndpointSet_t SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER
DDS::ParticipantBuiltinTopicData get_part_bit_data(bool secure) const
Definition: Spdp.cpp:4628
BuiltinEndpointSet_t availableBuiltinEndpoints
Definition: RtpsCore.idl:609
void on_data_available(DCPS::RcHandle< DCPS::InternalDataReader< DCPS::NetworkInterfaceAddress > > reader)
Definition: Spdp.cpp:3603
TimeQueue lease_expirations_
Definition: Spdp.h:614
DCPS::RcHandle< SpdpSporadic > relay_stun_task_
Definition: Spdp.h:543
DCPS::RcHandle< SpdpSporadic > lease_expiration_task_
Definition: Spdp.h:530
void swap_bytes(bool do_swap)
Definition: Serializer.inl:403
const BuiltinEndpointSet_t SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
OpenDDS::DCPS::GUID_t source_guid
static const WriteFlags SEND_RELAY
Definition: Spdp.h:434
void send(WriteFlags flags, const ACE_INET_Addr &local_address=ACE_INET_Addr())
Definition: Spdp.cpp:2883
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_VOLATILE_SECURE_READER
Definition: MessageTypes.h:86
const DCPS::MonotonicTime_t participant_discovered_at_
Definition: Spdp.h:394
const OpenDDSParticipantFlagsBits_t PFLAGS_NO_ASSOCIATED_WRITERS
Definition: RtpsCore.idl:323
OpenDDS_Dcps_Export LogLevel log_level
DCPS::GuidPrefix_t guidPrefix
Definition: RtpsCore.idl:606
u_short get_port_number(void) const
AtomicBool shutdown_flag_
Spdp shutting down.
Definition: Spdp.h:603
static const WriteFlags SEND_DIRECT
Definition: Spdp.h:435
DDS::Security::HandshakeHandle handshake_handle_
int strcasecmp(const char *s, const char *t)
void stop_ice(DCPS::WeakRcHandle< ICE::Endpoint > endpoint, DCPS::GUID_t remote, BuiltinEndpointSet_t avail, DDS::Security::ExtendedBuiltinEndpointSet_t extended_avail)
Definition: Spdp.cpp:4009
void publish_location_update_i(const DiscoveredParticipantIter &iter)
Definition: Spdp.cpp:602
int memcmp(const void *t, const void *s, size_t len)
bool is_security_enabled() const
Definition: Spdp.h:159
bool valid_size(const ACE_INET_Addr &a)
Definition: Spdp.cpp:2983
const BuiltinEndpointSet_t SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER
DDS::Security::AuthRequestMessageToken local_auth_request_token_
const string GMCLASSID_SECURITY_AUTH_HANDSHAKE
String str(unsigned decimal_places=3, bool just_sec=false) const
const String & str() const
Definition: LogAddr.h:31
DDS::ReturnCode_t send_handshake_message(const DCPS::GUID_t &guid, DiscoveredParticipant &dp, const DDS::Security::ParticipantStatelessMessage &msg)
Definition: Spdp.cpp:1945
Sequence number abstraction. Only allows positive 64 bit values.
DCPS::TimeDuration latency() const
Definition: RTPS/ICE/Ice.h:246
void ignore(const GUID_t &to_ignore)
Definition: Sedp.cpp:5988
ReactorInterceptor_rch interceptor() const
Definition: ReactorTask.inl:65
void handle_handshake_message(const DDS::Security::ParticipantStatelessMessage &msg)
Definition: Spdp.cpp:1452
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
void print_locator(const CORBA::ULong i, const DCPS::Locator_t &o)
Definition: Spdp.cpp:651
void rtps_relay_only_now(bool f)
Definition: Spdp.cpp:4473
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_STATELESS_READER
Definition: MessageTypes.h:84
const EntityId_t ENTITYID_TL_SVC_REQ_READER_SECURE
Definition: MessageTypes.h:91
const string GMCLASSID_SECURITY_AUTH_REQUEST
ACE_INET_Addr stun_server_address() const
Definition: Spdp.cpp:3353
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202
size_t n_participants_in_authentication_
Definition: Spdp.h:648
static Configuration * instance()
Definition: Ice.cpp:109
const DCPS::GUID_t & guid() const
Definition: Spdp.h:94
const ExtendedBuiltinEndpointSet_t TYPE_LOOKUP_SERVICE_REQUEST_WRITER_SECURE
const Encoding & encoding() const
Definition: Serializer.inl:199
static const Value MAX_VALUE
bool announce_domain_participant_qos()
Definition: Spdp.cpp:2290
DDS::Security::IdentityToken identity_token_
int get_addr_size(void) const
void use_ice_now(bool f)
Definition: Spdp.cpp:4561
ACE_CDR::Octet Octet
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
bool eh_shutdown_
Definition: Spdp.h:600
bool to_param_list(const DDS::ParticipantBuiltinTopicData &pbtd, ParameterList &param_list)
const ACE_INET_Addr & unset_stun_server_address() const
Definition: RTPS/ICE/Ice.h:238
Security::SecurityConfig_rch security_config_
Definition: Spdp.h:618
void use_rtps_relay_now(bool f)
Definition: Spdp.cpp:4517
DCPS::TopicStatus assert_topic(GUID_t &topicId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey, DCPS::TopicCallbacks *topic_callbacks)
Definition: Sedp.cpp:6060
void purge_handshake_deadlines(DiscoveredParticipantIter iter)
Definition: Spdp.cpp:4337
int open(const ACE_Addr &local, int protocol_family=ACE_PROTOCOL_FAMILY_INET, int protocol=0, int reuse_addr=0, int ipv6_only=0)
DCPS::RcHandle< SpdpSporadic > handshake_resend_task_
Definition: Spdp.h:538
const MessageCountKind MCK_RTPS
DCPS::RcHandle< RtpsDiscoveryConfig > config() const
Definition: Spdp.h:218
void set_port_number(u_short, int encode=1)
DataReaderQosBuilder & durability_transient_local()
Definition: Qos_Helper.h:1747
#define ENETUNREACH
const BuiltinEndpointSet_t DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR
Definition: RtpsCore.idl:204
DCPS::FibonacciSequence< DCPS::TimeDuration > handshake_resend_falloff_
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14
const ReturnCode_t RETCODE_OK
Requester and replier should call process handshake.
const BuiltinEndpointSet_t BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER
void update_rtps_relay_application_participant_i(DiscoveredParticipantIter iter, bool new_participant)
Definition: Spdp.cpp:2750
const BuiltinEndpointSet_t BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER
DCPS::RcHandle< DCPS::BitSubscriber > bit_subscriber_
Definition: Spdp.h:379
void process_lease_expirations(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:3744
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
const EntityId_t ENTITYID_SPDP_RELIABLE_BUILTIN_PARTICIPANT_SECURE_WRITER
Definition: MessageTypes.h:87
ACE_INET_Addr multicast_address_
Definition: Spdp.h:509
DDS::Security::ParticipantCryptoHandle remote_crypto_handle(const DCPS::GUID_t &remote_participant) const
Definition: Spdp.cpp:4150
bool secure_part_user_data() const
Definition: Spdp.cpp:4619
#define SO_RCVBUF
const BuiltinEndpointSet_t BUILTIN_ENDPOINT_TYPE_LOOKUP_REQUEST_DATA_READER
Definition: RtpsCore.idl:225
const ParticipantLocation LOCATION_RELAY
const EntityId_t ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER
Definition: GuidUtils.h:41
const long LENGTH_UNLIMITED
#define ACE_ERROR_RETURN(X, Y)
const string RTPS_RELAY_STUN_PROTOCOL
ACE_thread_mutex_t lock_
TimeQueue handshake_resends_
Definition: Spdp.h:646
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200
const BuiltinEndpointSet_t BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER
const EntityId_t ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER
Definition: GuidUtils.h:46
OPENDDS_STRING multicast_interface_
Definition: Spdp.h:508
RTPS::SequenceNumber_t to_rtps_seqnum(const DCPS::SequenceNumber &opendds_seqnum)
Definition: MessageUtils.h:139
DCPS::MonotonicTimePoint last_harvest
Definition: Spdp.h:553
const EntityId_t ENTITYID_UNKNOWN
Definition: GuidUtils.h:36
DCPS::MonotonicTimePoint schedule_handshake_resend(const DCPS::TimeDuration &time, const DCPS::GUID_t &guid)
Definition: Spdp.cpp:1959
const GuidVendorId_t VENDORID_OCI
Vendor Id value specified for OCI is used for OpenDDS.
Definition: GuidUtils.h:29
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
Defines the interface for Discovery callbacks into the Topic.
const ParticipantLocation LOCATION_ICE6
#define TheServiceParticipant
ssize_t send(const void *buf, size_t n, const ACE_Addr &addr, int flags=0) const
const BuiltinEndpointSet_t SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER
void send_relay(const DCPS::MonotonicTimePoint &now)
Definition: Spdp.cpp:4233
void attempt_authentication(const DiscoveredParticipantIter &iter, bool from_discovery)
Definition: Spdp.cpp:1336
DDS::Security::IdentityHandle identity_handle_
void process_relay_sra(ICE::ServerReflexiveStateMachine::StateChange)
Definition: Spdp.cpp:4172
DataReaderQosBuilder & reliability_reliable()
Definition: Qos_Helper.h:1819
OpenDDS_Dcps_Export DDS::BuiltinTopicKey_t guid_to_bit_key(const GUID_t &guid)
Definition: GuidUtils.h:243
bool ip_in_locator_list(const ACE_INET_Addr &from, const DCPS::LocatorSeq &locators)
Definition: Spdp.cpp:660
LM_ERROR
bool has_discovered_participant(const DCPS::GUID_t &guid) const
Definition: Spdp.cpp:3688
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
pid_t getpid(void)
const octet FLAG_K_IN_DATA
Definition: RtpsCore.idl:530
const ParticipantSecurityAttributesMask PARTICIPANT_SECURITY_ATTRIBUTES_FLAG_IS_DISCOVERY_PROTECTED
DiscoveredParticipantMap participants_
Definition: Spdp.h:382
ICE::AddressListType host_addresses() const
Definition: Spdp.cpp:3247
OPENDDS_SET(ACE_INET_Addr) send_addrs_
void init_bit(RcHandle< DCPS::BitSubscriber > bit_subscriber)
Definition: Spdp.cpp:2155
DCPS::SequenceNumber to_opendds_seqnum(const RTPS::SequenceNumber_t &rtps_seqnum)
Definition: MessageUtils.h:132
int close(void)
XTypes::TypeLookupService_rch type_lookup_service_
Definition: Spdp.h:389
const DDS::DomainId_t domain_
Definition: Spdp.h:392
#define ACE_NOTSUP_RETURN(FAILVALUE)
DDS::OctetArray16 address
LivelinessQosPolicyKind
const EntityId_t ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER
Definition: MessageTypes.h:80
OpenDDS_Dcps_Export SecurityDebug security_debug
Definition: debug.cpp:32
const ParticipantLocation LOCATION_RELAY6
BUILT_IN_TOPIC_KEY DDS::OctetArray16 guid
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
bool operator<(const GUID_t &lhs, const GUID_t &rhs)
Definition: GuidUtils.h:80
std::set< GuidPair > GuidSetType
Definition: RTPS/ICE/Ice.h:39
PropertyQosPolicy property
static const WriteFlags SEND_MULTICAST
Definition: Spdp.h:433
unsigned long BuiltinEndpointSet_t
Definition: RtpsCore.idl:197
void handle_participant_data(DCPS::MessageId id, const ParticipantData_t &pdata, const DCPS::MonotonicTimePoint &now, const DCPS::SequenceNumber &seq, const ACE_INET_Addr &from, bool from_sedp)
Definition: Spdp.cpp:710
Discovery Strategy class that implements RTPS discovery.
Definition: RtpsDiscovery.h:55
DDS::Security::ParticipantCryptoHandle crypto_handle() const
Definition: Spdp.h:129
DDS::Security::IdentityStatusToken identity_status_token_
Definition: Spdp.h:628
DDS::Security::ParticipantStatelessMessage handshake_msg_
DDS::Security::ParticipantStatelessMessage auth_req_msg_
DDS::Security::ParticipantSecurityAttributesMask security_attributes_to_bitmask(const DDS::Security::ParticipantSecurityAttributes &sec_attr)
Definition: MessageUtils.h:177
const OpenDDSParticipantFlagsBits_t PFLAGS_REFLECT_HEARTBEAT_COUNT
Definition: RtpsCore.idl:326
const string GMCLASSID_SECURITY_PARTICIPANT_CRYPTO_TOKENS
Requester should call begin_handshake_request.
void write_secure_disposes()
Definition: Spdp.cpp:441
const BuiltinEndpointSet_t BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER
const octet FLAG_D
Definition: RtpsCore.idl:526
bool process(InternalDataReader< NetworkInterfaceAddress >::SampleSequence &samples, InternalSampleInfoSequence &infos, const OPENDDS_STRING &multicast_interface, ACE_Reactor *reactor, ACE_Event_Handler *event_handler, const NetworkAddress &multicast_group_address, ACE_SOCK_Dgram_Mcast &multicast_socket)
Returns true if at least one group was joined.
void enqueue_location_update_i(DiscoveredParticipantIter iter, DCPS::ParticipantLocation mask, const ACE_INET_Addr &from, const char *reason)
Definition: Spdp.cpp:460
bool to_encoding(Encoding &encoding, Extensibility expected_extensibility)
Definition: Serializer.cpp:153
void ignore_domain_participant(const GUID_t &ignoreId)
Definition: Spdp.cpp:4644