OpenDDS  Snapshot(2023/04/28-20:55)
RtpsDiscovery.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include "RtpsDiscovery.h"
7 
8 #include "RtpsDiscoveryConfig.h"
9 
10 #include <dds/DCPS/LogAddr.h>
12 #include <dds/DCPS/ConfigUtils.h>
18 
21 
22 #include <dds/DdsDcpsInfoUtilsC.h>
23 
24 #include <cstdlib>
25 #include <limits>
26 
28 
29 namespace OpenDDS {
30 namespace RTPS {
31 
32 using DCPS::TimeDuration;
33 
// Constructor: initializer list and (empty) body.
// Passes `key` to the Discovery base and initializes config_ with the result of
// DCPS::make_rch<RtpsDiscoveryConfig>().
// NOTE(review): the signature line (listing line 34) is missing from this
// extract; this is presumably RtpsDiscovery::RtpsDiscovery(const RepoKey& key) - confirm.
35  : Discovery(key)
36  , config_(DCPS::make_rch<RtpsDiscoveryConfig>())
37 {
38 }
39 
// Empty function body.
// NOTE(review): its signature (listing line 40) is missing from this extract;
// presumably this is the RtpsDiscovery destructor - confirm against the original file.
41 {
42 }
43 
// File-local constants (anonymous namespace).
44 namespace {
    // Section name handed to ACE_Configuration_Heap::open_section() when the
    // RTPS discovery settings are loaded; subsections are logged as
    // [rtps_discovery/<name>].
45  const ACE_TCHAR RTPS_SECTION_NAME[] = ACE_TEXT("rtps_discovery");
46 }
47 
// Loads RTPS discovery settings from the configuration heap `cf`.
//
// Opens the RTPS_SECTION_NAME section; the section itself must not contain
// values, only named subsections. For every subsection a new RtpsDiscovery is
// created (keyed by the subsection name), each key/value pair found in the
// subsection is applied to that instance's RtpsDiscoveryConfig, and the
// instance is then registered with TheServiceParticipant. Finally, if no
// discovery keyed Discovery::DEFAULT_RTPS is registered, one is added with an
// untouched configuration.
//
// Returns 0 on success. Every error path visible here ends in -1 (invalid
// value, unknown key, values directly inside the section, nesting error).
//
// NOTE(review): this extract is a rendered listing with lines dropped. The
// signature (listing line 49), the declarations of `root` and `rtps_sect`
// (lines 51-52) and the opener of most error-reporting statements (presumably
// ACE_ERROR_RETURN((LM_ERROR, ...) are not visible. The function name is taken
// from the log strings ("RtpsDiscovery::Config::discovery_config()").
48 int
50 {
53 
54  if (cf.open_section(root, RTPS_SECTION_NAME, 0, rtps_sect) == 0) {
55 
56  // Ensure there are no properties in this section
57  DCPS::ValueMap vm;
58  if (DCPS::pullValues(cf, rtps_sect, vm) > 0) {
59  // There are values inside [rtps_discovery]
61  ACE_TEXT("(%P|%t) ERROR: RtpsDiscovery::Config::discovery_config(): ")
62  ACE_TEXT("rtps_discovery sections must have a subsection name\n")),
63  -1);
64  }
65  // Process the subsections of this section (the individual rtps_discovery/*)
66  DCPS::KeyList keys;
    // NOTE(review): the message below names "[rtps]" although the section
    // opened above is RTPS_SECTION_NAME ("rtps_discovery") - confirm the wording.
67  if (DCPS::processSections(cf, rtps_sect, keys) != 0) {
69  ACE_TEXT("(%P|%t) ERROR: RtpsDiscovery::Config::discovery_config(): ")
70  ACE_TEXT("too many nesting layers in the [rtps] section.\n")),
71  -1);
72  }
73 
74  // Loop through the [rtps_discovery/*] sections
75  for (DCPS::KeyList::const_iterator it = keys.begin();
76  it != keys.end(); ++it) {
77  const OPENDDS_STRING& rtps_name = it->first;
78 
79  RtpsDiscovery_rch discovery = OpenDDS::DCPS::make_rch<RtpsDiscovery>(rtps_name);
80  RtpsDiscoveryConfig_rch config = discovery->config();
81 
82  // spdpaddr defaults to DCPSDefaultAddress if set
83  if (TheServiceParticipant->default_address().to_addr() != ACE_INET_Addr()) {
84  config->spdp_local_address(TheServiceParticipant->default_address().to_addr());
85  config->multicast_interface(DCPS::LogAddr::ip(TheServiceParticipant->default_address().to_addr()));
86  }
87 
88  DCPS::ValueMap values;
89  DCPS::pullValues(cf, it->second, values);
    // NOTE(review): the loop variable below is also called `it` and shadows the
    // subsection iterator declared by the outer loop; inside this loop `it`
    // refers to one key/value entry of the current subsection.
90  for (DCPS::ValueMap::const_iterator it = values.begin();
91  it != values.end(); ++it) {
92  const OPENDDS_STRING& name = it->first;
    // Dispatch on the key name. Each branch converts the string value and
    // stores it on `config`; a value that fails to convert is reported with a
    // message naming the key and the subsection, and the function ends with -1.
93  if (name == "ResendPeriod") {
94  const OPENDDS_STRING& value = it->second;
95  int resend = 0;
96  if (!DCPS::convertToInteger(value, resend)) {
98  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
99  ACE_TEXT("Invalid entry (%C) for ResendPeriod in ")
100  ACE_TEXT("[rtps_discovery/%C] section.\n"),
101  value.c_str(), rtps_name.c_str()), -1);
102  }
103  config->resend_period(TimeDuration(resend));
104  } else if (name == "QuickResendRatio") {
105  const OPENDDS_STRING& value = it->second;
106  double ratio = 0.0;
107  if (!DCPS::convertToDouble(value, ratio)) {
109  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
110  ACE_TEXT("Invalid entry (%C) for QuickResendRatio in ")
111  ACE_TEXT("[rtps_discovery/%C] section.\n"),
112  value.c_str(), rtps_name.c_str()), -1);
113  }
114  config->quick_resend_ratio(ratio);
115  } else if (name == "MinResendDelay") {
116  const OPENDDS_STRING& value = it->second;
117  int delay = 0;
118  if (!DCPS::convertToInteger(value, delay)) {
120  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
121  ACE_TEXT("Invalid entry (%C) for MinResendDelay in ")
122  ACE_TEXT("[rtps_discovery/%C] section.\n"),
123  value.c_str(), rtps_name.c_str()), -1);
124  }
125  config->min_resend_delay(TimeDuration::from_msec(delay));
126  } else if (name == "LeaseDuration") {
127  const OPENDDS_STRING& value = it->second;
128  int duration = 0;
129  if (!DCPS::convertToInteger(value, duration)) {
131  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
132  ACE_TEXT("Invalid entry (%C) for LeaseDuration in ")
133  ACE_TEXT("[rtps_discovery/%C] section.\n"),
134  value.c_str(), rtps_name.c_str()), -1);
135  }
136  config->lease_duration(TimeDuration(duration));
137  } else if (name == "MaxLeaseDuration") {
138  const OPENDDS_STRING& value = it->second;
139  int duration = 0;
140  if (!DCPS::convertToInteger(value, duration)) {
142  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
143  ACE_TEXT("Invalid entry (%C) for MaxLeaseDuration in ")
144  ACE_TEXT("[rtps_discovery/%C] section.\n"),
145  value.c_str(), rtps_name.c_str()), -1);
146  }
147  config->max_lease_duration(TimeDuration(duration));
148 #ifdef OPENDDS_SECURITY
149  } else if (name == "SecurityUnsecureLeaseDuration") {
150  const OPENDDS_STRING& value = it->second;
151  int duration = 0;
152  if (!DCPS::convertToInteger(value, duration)) {
154  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
155  ACE_TEXT("Invalid entry (%C) for SecurityUnsecureLeaseDuration in ")
156  ACE_TEXT("[rtps_discovery/%C] section.\n"),
157  value.c_str(), rtps_name.c_str()), -1);
158  }
159  config->security_unsecure_lease_duration(TimeDuration(duration));
160  } else if (name == "MaxParticipantsInAuthentication") {
161  const OPENDDS_STRING& value = it->second;
162  unsigned int max_participants = 0;
163  if (!DCPS::convertToInteger(value, max_participants)) {
165  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
166  ACE_TEXT("Invalid entry (%C) for MaxParticipantsInAuthentication in ")
167  ACE_TEXT("[rtps_discovery/%C] section.\n"),
168  value.c_str(), rtps_name.c_str()), -1);
169  }
170  config->max_participants_in_authentication(max_participants);
171 #endif
172  } else if (name == "LeaseExtension") {
173  const OPENDDS_STRING& value = it->second;
174  int extension = 0;
175  if (!DCPS::convertToInteger(value, extension)) {
177  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
178  ACE_TEXT("Invalid entry (%C) for LeaseExtension in ")
179  ACE_TEXT("[rtps_discovery/%C] section.\n"),
180  value.c_str(), rtps_name.c_str()), -1);
181  }
182  config->lease_extension(TimeDuration(extension));
183  } else if (name == "PB") {
184  const OPENDDS_STRING& value = it->second;
185  u_short pb = 0;
186  if (!DCPS::convertToInteger(value, pb)) {
188  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
189  ACE_TEXT("Invalid entry (%C) for PB in ")
190  ACE_TEXT("[rtps_discovery/%C] section.\n"),
191  value.c_str(), rtps_name.c_str()), -1);
192  }
193  config->pb(pb);
194  } else if (name == "DG") {
195  const OPENDDS_STRING& value = it->second;
196  u_short dg = 0;
197  if (!DCPS::convertToInteger(value, dg)) {
199  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
200  ACE_TEXT("Invalid entry (%C) for DG in ")
201  ACE_TEXT("[rtps_discovery/%C] section.\n"),
202  value.c_str(), rtps_name.c_str()), -1);
203  }
204  config->dg(dg);
205  } else if (name == "PG") {
206  const OPENDDS_STRING& value = it->second;
207  u_short pg = 0;
208  if (!DCPS::convertToInteger(value, pg)) {
210  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
211  ACE_TEXT("Invalid entry (%C) for PG in ")
212  ACE_TEXT("[rtps_discovery/%C] section.\n"),
213  value.c_str(), rtps_name.c_str()), -1);
214  }
215  config->pg(pg);
216  } else if (name == "D0") {
217  const OPENDDS_STRING& value = it->second;
218  u_short d0 = 0;
219  if (!DCPS::convertToInteger(value, d0)) {
221  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
222  ACE_TEXT("Invalid entry (%C) for D0 in ")
223  ACE_TEXT("[rtps_discovery/%C] section.\n"),
224  value.c_str(), rtps_name.c_str()), -1);
225  }
226  config->d0(d0);
227  } else if (name == "D1") {
228  const OPENDDS_STRING& value = it->second;
229  u_short d1 = 0;
230  if (!DCPS::convertToInteger(value, d1)) {
232  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
233  ACE_TEXT("Invalid entry (%C) for D1 in ")
234  ACE_TEXT("[rtps_discovery/%C] section.\n"),
235  value.c_str(), rtps_name.c_str()), -1);
236  }
237  config->d1(d1);
238  } else if (name == "DX") {
239  const OPENDDS_STRING& value = it->second;
240  u_short dx = 0;
241  if (!DCPS::convertToInteger(value, dx)) {
243  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
244  ACE_TEXT("Invalid entry (%C) for DX in ")
245  ACE_TEXT("[rtps_discovery/%C] section.\n"),
246  value.c_str(), rtps_name.c_str()), -1);
247  }
248  config->dx(dx);
249  } else if (name == "TTL") {
250  const OPENDDS_STRING& value = it->second;
    // Parsed into an unsigned short first so that values above UCHAR_MAX can be
    // rejected before the narrowing cast to unsigned char below.
251  unsigned short ttl_us = 0;
252  if (!DCPS::convertToInteger(value, ttl_us) || ttl_us > UCHAR_MAX) {
254  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
255  ACE_TEXT("Invalid entry (%C) for TTL in ")
256  ACE_TEXT("[rtps_discovery/%C] section.\n"),
257  value.c_str(), rtps_name.c_str()), -1);
258  }
259  config->ttl(static_cast<unsigned char>(ttl_us));
260  } else if (name == "SendBufferSize") {
261  const OPENDDS_STRING& value = it->second;
262  ACE_INT32 send_buffer_size = 0;
263  if (!DCPS::convertToInteger(value, send_buffer_size)) {
265  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
266  ACE_TEXT("Invalid entry (%C) for SendBufferSize in ")
267  ACE_TEXT("[rtps_discovery/%C] section.\n"),
268  value.c_str(), rtps_name.c_str()), -1);
269  }
270  config->send_buffer_size(send_buffer_size);
271  } else if (name == "RecvBufferSize") {
272  const OPENDDS_STRING& value = it->second;
273  ACE_INT32 recv_buffer_size = 0;
274  if (!DCPS::convertToInteger(value, recv_buffer_size)) {
276  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
277  ACE_TEXT("Invalid entry (%C) for RecvBufferSize in ")
278  ACE_TEXT("[rtps_discovery/%C] section.\n"),
279  value.c_str(), rtps_name.c_str()), -1);
280  }
281  config->recv_buffer_size(recv_buffer_size);
282  } else if (name == "SedpMulticast") {
283  const OPENDDS_STRING& value = it->second;
    // Boolean options are read as integers; any non-zero value becomes true
    // through the bool(...) conversion.
284  int smInt = 0;
285  if (!DCPS::convertToInteger(value, smInt)) {
287  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
288  ACE_TEXT("Invalid entry (%C) for SedpMulticast in ")
289  ACE_TEXT("[rtps_discovery/%C] section.\n"),
290  value.c_str(), rtps_name.c_str()), -1);
291  }
292  config->sedp_multicast(bool(smInt));
293  } else if (name == "MulticastInterface") {
294  config->multicast_interface(it->second);
295  } else if (name == "SedpLocalAddress") {
296  ACE_INET_Addr addr;
    // A non-zero result from addr.set() is treated as a parse failure here and
    // in the other address-valued keys below.
297  if (addr.set(it->second.c_str())) {
299  ACE_TEXT("(%P|%t) ERROR: RtpsDiscovery::Config::discovery_config(): ")
300  ACE_TEXT("failed to parse SedpLocalAddress %C\n"),
301  it->second.c_str()),
302  -1);
303  }
304  config->sedp_local_address(addr);
305  } else if (name == "SedpAdvertisedLocalAddress") {
306  ACE_INET_Addr addr;
307  if (addr.set(it->second.c_str())) {
309  ACE_TEXT("(%P|%t) ERROR: RtpsDiscovery::Config::discovery_config(): ")
310  ACE_TEXT("failed to parse SedpAdvertisedLocalAddress %C\n"),
311  it->second.c_str()),
312  -1);
313  }
314  config->sedp_advertised_address(addr);
315  } else if (name == "SpdpLocalAddress") {
316  ACE_INET_Addr addr;
    // Uses the (port, host) form of set() with an explicit port of 0, so the
    // value is taken as a host/address only.
317  if (addr.set(u_short(0), it->second.c_str())) {
319  ACE_TEXT("(%P|%t) ERROR: RtpsDiscovery::Config::discovery_config(): ")
320  ACE_TEXT("failed to parse SpdpLocalAddress %C\n"),
321  it->second.c_str()),
322  -1);
323  }
324  config->spdp_local_address(addr);
325  } else if (name == "SpdpRequestRandomPort") {
326  const OPENDDS_STRING& value = it->second;
327  int smInt = 0;
328  if (!DCPS::convertToInteger(value, smInt)) {
330  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
331  ACE_TEXT("Invalid entry (%C) for SpdpRequestRandomPort in ")
332  ACE_TEXT("[rtps_discovery/%C] section.\n"),
333  value.c_str(), rtps_name.c_str()), -1);
334  }
335  config->spdp_request_random_port(bool(smInt));
336  } else if (name == "GuidInterface") {
337  config->guid_interface(it->second);
338  } else if (name == "InteropMulticastOverride") {
339  /// FUTURE: handle > 1 group.
340  ACE_INET_Addr addr;
341  if (addr.set(u_short(0), it->second.c_str())) {
343  ACE_TEXT("(%P|%t) ERROR: RtpsDiscovery::Config::discovery_config(): ")
344  ACE_TEXT("failed to parse InteropMulticastOverride %C\n"),
345  it->second.c_str()),
346  -1);
347  }
348  config->default_multicast_group(addr);
349  } else if (name == "SpdpSendAddrs") {
    // Splits the value into comma-separated tokens: leading spaces are skipped,
    // a token ends at the first comma or space, and scanning resumes after the
    // next comma. Tokens are stored unparsed as strings.
    // NOTE(review): the declaration of `spdp_send_addrs` (listing line 350) is
    // missing from this extract.
    // NOTE(review): for an empty/all-space value or a trailing comma,
    // find_first_not_of() yields npos and substr(npos, ...) would throw
    // std::out_of_range if OPENDDS_STRING behaves like std::string - confirm
    // and guard if such values can reach this point.
351  const OPENDDS_STRING& value = it->second;
352  size_t i = 0;
353  do {
354  i = value.find_first_not_of(' ', i); // skip spaces
355  const size_t n = value.find_first_of(", ", i);
356  spdp_send_addrs.push_back(value.substr(i, (n == OPENDDS_STRING::npos) ? n : n - i));
357  i = value.find(',', i);
358  } while (i++ != OPENDDS_STRING::npos); // skip past comma if there is one
359  config->spdp_send_addrs(spdp_send_addrs);
360  } else if (name == "SpdpRtpsRelayAddress") {
361  ACE_INET_Addr addr;
362  if (addr.set(it->second.c_str())) {
364  ACE_TEXT("(%P|%t) ERROR: RtpsDiscovery::Config::discovery_config(): ")
365  ACE_TEXT("failed to parse SpdpRtpsRelayAddress %C\n"),
366  it->second.c_str()),
367  -1);
368  }
369  config->spdp_rtps_relay_address(addr);
370  } else if (name == "SpdpRtpsRelayBeaconPeriod") {
    // Deprecated key: only a message is emitted; nothing is stored on `config`.
372  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
373  ACE_TEXT("Entry SpdpRtpsRelayBeaconPeriod is deprecated and will be ignored.\n")));
374  } else if (name == "SpdpRtpsRelaySendPeriod") {
375  const OPENDDS_STRING& value = it->second;
376  int period = 0;
377  if (!DCPS::convertToInteger(value, period)) {
379  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
380  ACE_TEXT("Invalid entry (%C) for SpdpRtpsRelaySendPeriod in ")
381  ACE_TEXT("[rtps_discovery/%C] section.\n"),
382  value.c_str(), rtps_name.c_str()), -1);
383  }
384  config->spdp_rtps_relay_send_period(TimeDuration(period));
385  } else if (name == "SedpRtpsRelayAddress") {
386  ACE_INET_Addr addr;
387  if (addr.set(it->second.c_str())) {
389  ACE_TEXT("(%P|%t) ERROR: RtpsDiscovery::Config::discovery_config(): ")
390  ACE_TEXT("failed to parse SedpRtpsRelayAddress %C\n"),
391  it->second.c_str()),
392  -1);
393  }
394  config->sedp_rtps_relay_address(addr);
395  } else if (name == "SedpRtpsRelayBeaconPeriod") {
    // Deprecated key: only a message is emitted; nothing is stored on `config`.
397  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
398  ACE_TEXT("Entry SedpRtpsRelayBeaconPeriod is deprecated and will be ignored.\n")));
399  } else if (name == "RtpsRelayOnly") {
400  const OPENDDS_STRING& value = it->second;
401  int smInt = 0;
402  if (!DCPS::convertToInteger(value, smInt)) {
404  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
405  ACE_TEXT("Invalid entry (%C) for RtpsRelayOnly in ")
406  ACE_TEXT("[rtps_discovery/%C] section.\n"),
407  value.c_str(), rtps_name.c_str()), -1);
408  }
409  config->rtps_relay_only(bool(smInt));
410  } else if (name == "UseRtpsRelay") {
411  const OPENDDS_STRING& value = it->second;
412  int smInt = 0;
413  if (!DCPS::convertToInteger(value, smInt)) {
415  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
416  ACE_TEXT("Invalid entry (%C) for UseRtpsRelay in ")
417  ACE_TEXT("[rtps_discovery/%C] section.\n"),
418  value.c_str(), rtps_name.c_str()), -1);
419  }
420  config->use_rtps_relay(bool(smInt));
421 #ifdef OPENDDS_SECURITY
422  } else if (name == "SpdpStunServerAddress") {
423  ACE_INET_Addr addr;
424  if (addr.set(it->second.c_str())) {
426  ACE_TEXT("(%P|%t) ERROR: RtpsDiscovery::Config::discovery_config(): ")
427  ACE_TEXT("failed to parse SpdpStunServerAddress %C\n"),
428  it->second.c_str()),
429  -1);
430  }
431  config->spdp_stun_server_address(addr);
432  } else if (name == "SedpStunServerAddress") {
433  ACE_INET_Addr addr;
434  if (addr.set(it->second.c_str())) {
436  ACE_TEXT("(%P|%t) ERROR: RtpsDiscovery::Config::discovery_config(): ")
437  ACE_TEXT("failed to parse SedpStunServerAddress %C\n"),
438  it->second.c_str()),
439  -1);
440  }
441  config->sedp_stun_server_address(addr);
442  } else if (name == "UseIce") {
443  const OPENDDS_STRING& value = it->second;
444  int smInt = 0;
445  if (!DCPS::convertToInteger(value, smInt)) {
447  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
448  ACE_TEXT("Invalid entry (%C) for UseIce in ")
449  ACE_TEXT("[rtps_discovery/%C] section.\n"),
450  value.c_str(), rtps_name.c_str()), -1);
451  }
452  config->use_ice(bool(smInt));
    // Enabling ICE while the service participant reports security as disabled
    // is a configuration error; note the flag has already been stored above.
453  if (smInt && !TheServiceParticipant->get_security()) {
454  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Security must be enabled (-DCPSSecurity 1) when using ICE (UseIce)\n")), -1);
455  }
456  } else if (name == "IceTa") {
457  // In milliseconds.
458  const OPENDDS_STRING& string_value = it->second;
459  int int_value = 0;
    // NOTE(review): the statement executed on successful conversion (listing
    // line 461) is missing from this extract; the same holds for the success
    // branch of every other Ice* key below.
460  if (DCPS::convertToInteger(string_value, int_value)) {
462  } else {
464  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
465  ACE_TEXT("Invalid entry (%C) for IceTa in ")
466  ACE_TEXT("[rtps_discovery/%C] section.\n"),
467  string_value.c_str(), rtps_name.c_str()), -1);
468  }
469  } else if (name == "IceConnectivityCheckTTL") {
470  // In seconds.
471  const OPENDDS_STRING& string_value = it->second;
472  int int_value = 0;
473  if (DCPS::convertToInteger(string_value, int_value)) {
475  } else {
477  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
478  ACE_TEXT("Invalid entry (%C) for IceConnectivityCheckTTL in ")
479  ACE_TEXT("[rtps_discovery/%C] section.\n"),
480  string_value.c_str(), rtps_name.c_str()), -1);
481  }
482  } else if (name == "IceChecklistPeriod") {
483  // In seconds.
484  const OPENDDS_STRING& string_value = it->second;
485  int int_value = 0;
486  if (DCPS::convertToInteger(string_value, int_value)) {
488  } else {
490  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
491  ACE_TEXT("Invalid entry (%C) for IceChecklistPeriod in ")
492  ACE_TEXT("[rtps_discovery/%C] section.\n"),
493  string_value.c_str(), rtps_name.c_str()), -1);
494  }
495  } else if (name == "IceIndicationPeriod") {
496  // In seconds.
497  const OPENDDS_STRING& string_value = it->second;
498  int int_value = 0;
499  if (DCPS::convertToInteger(string_value, int_value)) {
501  } else {
503  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
504  ACE_TEXT("Invalid entry (%C) for IceIndicationPeriod in ")
505  ACE_TEXT("[rtps_discovery/%C] section.\n"),
506  string_value.c_str(), rtps_name.c_str()), -1);
507  }
508  } else if (name == "IceNominatedTTL") {
509  // In seconds.
510  const OPENDDS_STRING& string_value = it->second;
511  int int_value = 0;
512  if (DCPS::convertToInteger(string_value, int_value)) {
514  } else {
516  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
517  ACE_TEXT("Invalid entry (%C) for IceNominatedTTL in ")
518  ACE_TEXT("[rtps_discovery/%C] section.\n"),
519  string_value.c_str(), rtps_name.c_str()), -1);
520  }
521  } else if (name == "IceServerReflexiveAddressPeriod") {
522  // In seconds.
523  const OPENDDS_STRING& string_value = it->second;
524  int int_value = 0;
525  if (DCPS::convertToInteger(string_value, int_value)) {
527  } else {
529  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
530  ACE_TEXT("Invalid entry (%C) for IceServerReflexiveAddressPeriod in ")
531  ACE_TEXT("[rtps_discovery/%C] section.\n"),
532  string_value.c_str(), rtps_name.c_str()), -1);
533  }
534  } else if (name == "IceServerReflexiveIndicationCount") {
535  const OPENDDS_STRING& string_value = it->second;
536  int int_value = 0;
537  if (DCPS::convertToInteger(string_value, int_value)) {
539  } else {
541  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
542  ACE_TEXT("Invalid entry (%C) for IceServerReflexiveIndicationCount in ")
543  ACE_TEXT("[rtps_discovery/%C] section.\n"),
544  string_value.c_str(), rtps_name.c_str()), -1);
545  }
546  } else if (name == "IceDeferredTriggeredCheckTTL") {
547  // In seconds.
548  const OPENDDS_STRING& string_value = it->second;
549  int int_value = 0;
550  if (DCPS::convertToInteger(string_value, int_value)) {
552  } else {
554  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
555  ACE_TEXT("Invalid entry (%C) for IceDeferredTriggeredCheckTTL in ")
556  ACE_TEXT("[rtps_discovery/%C] section.\n"),
557  string_value.c_str(), rtps_name.c_str()), -1);
558  }
559  } else if (name == "IceChangePasswordPeriod") {
560  // In seconds.
561  const OPENDDS_STRING& string_value = it->second;
562  int int_value = 0;
563  if (DCPS::convertToInteger(string_value, int_value)) {
565  } else {
567  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
568  ACE_TEXT("Invalid entry (%C) for IceChangePasswordPeriod in ")
569  ACE_TEXT("[rtps_discovery/%C] section.\n"),
570  string_value.c_str(), rtps_name.c_str()), -1);
571  }
572  } else if (name == "MaxAuthTime") {
573  // In seconds.
574  const OPENDDS_STRING& string_value = it->second;
575  int int_value = 0;
576  if (DCPS::convertToInteger(string_value, int_value)) {
577  config->max_auth_time(TimeDuration(int_value));
578  } else {
580  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
581  ACE_TEXT("Invalid entry (%C) for MaxAuthTime in ")
582  ACE_TEXT("[rtps_discovery/%C] section.\n"),
583  string_value.c_str(), rtps_name.c_str()), -1);
584  }
585  } else if (name == "AuthResendPeriod") {
586  // In seconds.
587  const OPENDDS_STRING& string_value = it->second;
588  double double_value = 0.0;
589  if (DCPS::convertToDouble(string_value, double_value)) {
590  config->auth_resend_period(TimeDuration::from_double(double_value));
591  } else {
593  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
594  ACE_TEXT("Invalid entry (%C) for AuthResendPeriod in ")
595  ACE_TEXT("[rtps_discovery/%C] section.\n"),
596  string_value.c_str(), rtps_name.c_str()), -1);
597  }
598 #endif /* OPENDDS_SECURITY */
599  } else if (name == "MaxSpdpSequenceMsgResetChecks") {
600  const OPENDDS_STRING& string_value = it->second;
601  u_short value = 0;
602  if (DCPS::convertToInteger(string_value, value)) {
603  config->max_spdp_sequence_msg_reset_check(value);
604  } else {
606  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
607  ACE_TEXT("Invalid entry (%C) for MaxSpdpSequenceMsgResetChecks in ")
608  ACE_TEXT("[rtps_discovery/%C] section.\n"),
609  string_value.c_str(), rtps_name.c_str()), -1);
610  }
611  } else if (name == "SedpMaxMessageSize") {
612  const OPENDDS_STRING& string_value = it->second;
613  size_t value = 0;
614  if (DCPS::convertToInteger(string_value, value)) {
615  config->sedp_max_message_size(value);
616  } else {
618  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
619  ACE_TEXT("Invalid entry (%C) for SedpMaxMessageSize in ")
620  ACE_TEXT("[rtps_discovery/%C] section.\n"),
621  string_value.c_str(), rtps_name.c_str()), -1);
622  }
623  } else if (name == "SedpHeartbeatPeriod") {
624  const OPENDDS_STRING& string_value = it->second;
625  int value = 0;
626  if (DCPS::convertToInteger(string_value, value)) {
627  config->sedp_heartbeat_period(TimeDuration::from_msec(value));
628  } else {
630  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
631  ACE_TEXT("Invalid entry (%C) for SedpHeartbeatPeriod in ")
632  ACE_TEXT("[rtps_discovery/%C] section.\n"),
633  string_value.c_str(), rtps_name.c_str()), -1);
634  }
635  } else if (name == "SedpNakResponseDelay") {
636  const OPENDDS_STRING& string_value = it->second;
637  int value = 0;
638  if (DCPS::convertToInteger(string_value, value)) {
639  config->sedp_nak_response_delay(TimeDuration::from_msec(value));
640  } else {
642  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
643  ACE_TEXT("Invalid entry (%C) for SedpNakResponseDelay in ")
644  ACE_TEXT("[rtps_discovery/%C] section.\n"),
645  string_value.c_str(), rtps_name.c_str()), -1);
646  }
647  } else if (name == "SedpSendDelay") {
648  const OPENDDS_STRING& string_value = it->second;
649  int value = 0;
650  if (DCPS::convertToInteger(string_value, value)) {
651  config->sedp_send_delay(TimeDuration::from_msec(value));
652  } else {
654  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
655  ACE_TEXT("Invalid entry (%C) for SedpSendDelay in ")
656  ACE_TEXT("[rtps_discovery/%C] section.\n"),
657  string_value.c_str(), rtps_name.c_str()), -1);
658  }
659  } else if (name == "SedpFragmentReassemblyTimeout") {
660  const OPENDDS_STRING& string_value = it->second;
661  int value = 0;
662  if (DCPS::convertToInteger(string_value, value)) {
663  config->sedp_fragment_reassembly_timeout(TimeDuration::from_msec(value));
664  } else {
666  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
667  ACE_TEXT("Invalid entry (%C) for SedpFragmentReassemblyTimeout in ")
668  ACE_TEXT("[rtps_discovery/%C] section.\n"),
669  string_value.c_str(), rtps_name.c_str()), -1);
670  }
671  } else if (name == "SedpPassiveConnectDuration") {
672  const OPENDDS_STRING& string_value = it->second;
673  int value = 0;
674  if (DCPS::convertToInteger(string_value, value)) {
675  config->sedp_passive_connect_duration(TimeDuration::from_msec(value));
676  } else {
678  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
679  ACE_TEXT("Invalid entry (%C) for SedpPassiveConnectDuration in ")
680  ACE_TEXT("[rtps_discovery/%C] section.\n"),
681  string_value.c_str(), rtps_name.c_str()), -1);
682  }
683  } else if (name == "SecureParticipantUserData") {
684  const OPENDDS_STRING& string_value = it->second;
685  int int_value = 0;
686  if (!DCPS::convertToInteger(string_value, int_value)) {
687  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: RtpsDiscovery::Config::discovery_config: ")
688  ACE_TEXT("Invalid entry (%C) for SecureParticipantUserData in ")
689  ACE_TEXT("[rtps_discovery/%C] section.\n"),
690  string_value.c_str(), rtps_name.c_str()));
691  return -1;
692  }
693  config->secure_participant_user_data(int_value);
694  } else if (name == "Customization") {
    // The key is accepted but nothing is stored on `config` here; at debug
    // level > 0 its presence is merely reported.
695  if (DCPS::DCPS_debug_level > 0) {
697  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
698  ACE_TEXT("%C section has a Customization setting.\n"),
699  rtps_name.c_str()));
700  }
701  } else if (name == "UndirectedSpdp") {
702  const OPENDDS_STRING& value = it->second;
703  int smInt = 0;
704  if (!DCPS::convertToInteger(value, smInt)) {
706  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
707  ACE_TEXT("Invalid entry (%C) for UndirectedSpdp in ")
708  ACE_TEXT("[rtps_discovery/%C] section.\n"),
709  value.c_str(), rtps_name.c_str()), -1);
710  }
711  config->undirected_spdp(bool(smInt));
712  } else if (name == "PeriodicDirectedSpdp") {
713  const OPENDDS_STRING& value = it->second;
714  int smInt = 0;
715  if (!DCPS::convertToInteger(value, smInt)) {
717  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
718  ACE_TEXT("Invalid entry (%C) for PeriodicDirectedSpdp in ")
719  ACE_TEXT("[rtps_discovery/%C] section.\n"),
720  value.c_str(), rtps_name.c_str()), -1);
721  }
722  config->periodic_directed_spdp(bool(smInt));
723  } else if (name == "TypeLookupServiceReplyTimeout") {
724  const OPENDDS_STRING& value = it->second;
725  int timeout = 0;
726  if (!DCPS::convertToInteger(value, timeout)) {
728  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
729  ACE_TEXT("Invalid entry (%C) for TypeLookupServiceReplyTimeout in ")
730  ACE_TEXT("[rtps_discovery/%C] section.\n"),
731  value.c_str(), rtps_name.c_str()), -1);
732  }
733  config->max_type_lookup_service_reply_period(TimeDuration::from_msec(timeout));
734  } else if (name == "UseXTypes") {
    // Two accepted spellings: a single character that converts to an integer
    // in 0..2 (cast to the UseXTypes enum), or any other string, which is
    // handed to the string overload of use_xtypes(); that overload's boolean
    // result decides whether the entry is valid.
735  const OPENDDS_STRING& value = it->second;
736  int smInt = 0;
737  bool valid = value.size() == 1 && DCPS::convertToInteger(value, smInt) &&
738  smInt >= 0 && smInt <= 2;
739  if (valid) {
740  config->use_xtypes(static_cast<RtpsDiscoveryConfig::UseXTypes>(smInt));
741  } else {
742  valid = config->use_xtypes(value.c_str());
743  }
744  if (!valid) {
746  "(%P|%t) RtpsDiscovery::Config::discovery_config: "
747  "Invalid entry (%C) for UseXTypes in "
748  "[rtps_discovery/%C] section.\n",
749  value.c_str(), rtps_name.c_str()));
750  return -1;
751  }
752  } else if (name == "SedpResponsiveMode") {
753  const OPENDDS_STRING& value = it->second;
754  int smInt = 0;
755  if (!DCPS::convertToInteger(value, smInt)) {
757  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
758  ACE_TEXT("Invalid entry (%C) for SedpResponsiveMode in ")
759  ACE_TEXT("[rtps_discovery/%C] section.\n"),
760  value.c_str(), rtps_name.c_str()), -1);
761  }
762  config->sedp_responsive_mode(bool(smInt));
763  } else if (name == "SedpReceivePreallocatedMessageBlocks") {
764  const String& string_value = it->second;
    // NOTE(review): `value` is left uninitialized here, unlike the otherwise
    // identical SedpReceivePreallocatedDataBlocks branch below; it is only
    // read after a successful conversion, but initializing it to 0 would
    // match the sibling branches.
765  size_t value;
766  if (DCPS::convertToInteger(string_value, value)) {
767  config->sedp_receive_preallocated_message_blocks(value);
768  } else {
770  "(%P|%t) RtpsDiscovery::Config::discovery_config(): "
771  "Invalid entry (%C) for SedpReceivePreallocatedMessageBlocks in "
772  "[rtps_discovery/%C] section.\n",
773  string_value.c_str(), rtps_name.c_str()), -1);
774  }
775  } else if (name == "SedpReceivePreallocatedDataBlocks") {
776  const String& string_value = it->second;
777  size_t value = 0;
778  if (DCPS::convertToInteger(string_value, value)) {
779  config->sedp_receive_preallocated_data_blocks(value);
780  } else {
782  "(%P|%t) RtpsDiscovery::Config::discovery_config(): "
783  "Invalid entry (%C) for SedpReceivePreallocatedDataBlocks in "
784  "[rtps_discovery/%C] section.\n",
785  string_value.c_str(), rtps_name.c_str()), -1);
786  }
787  } else if (name == "CheckSourceIp") {
788  const OPENDDS_STRING& value = it->second;
789  int smInt = 0;
790  if (!DCPS::convertToInteger(value, smInt)) {
792  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
793  ACE_TEXT("Invalid entry (%C) for CheckSourceIp in ")
794  ACE_TEXT("[rtps_discovery/%C] section.\n"),
795  value.c_str(), rtps_name.c_str()), -1);
796  }
797  config->check_source_ip(bool(smInt));
798  } else {
    // Any key not matched above is rejected rather than ignored.
800  ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
801  ACE_TEXT("Unexpected entry (%C) in [rtps_discovery/%C] section.\n"),
802  name.c_str(), rtps_name.c_str()), -1);
803  }
804  }
805 
    // Reached only when every key of the subsection was processed without an
    // early -1: register the configured instance.
806  TheServiceParticipant->add_discovery(discovery);
807  }
808  }
809 
810  // If the default RTPS discovery object has not been configured,
811  // instantiate it now.
812  const DCPS::Service_Participant::RepoKeyDiscoveryMap& discoveryMap = TheServiceParticipant->discoveryMap();
813  if (discoveryMap.find(Discovery::DEFAULT_RTPS) == discoveryMap.end()) {
814  TheServiceParticipant->add_discovery(OpenDDS::DCPS::make_rch<RtpsDiscovery>(Discovery::DEFAULT_RTPS));
815  }
816 
817  return 0;
818 }
819 
820 // Participant operations:
// Body of the participant-GUID generator (name taken from the log string
// below: RtpsDiscovery::generate_participant_guid()).
// If config_ names a GUID interface, guid_gen_ is asked to use it; a non-zero
// result only produces a warning and generation continues. guid_gen_ then
// fills `id`, the entity id is set to ENTITYID_PARTICIPANT, and `id` is returned.
// NOTE(review): the signature (listing lines 821-822), the declaration of `id`
// and the scope opener (lines 824-825) and line 829 are missing from this
// extract, which is why the braces below look unbalanced.
823 {
826  const OPENDDS_STRING guid_interface = config_->guid_interface();
827  if (!guid_interface.empty()) {
828  if (guid_gen_.interfaceName(guid_interface.c_str()) != 0) {
830  ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsDiscovery::generate_participant_guid()"
831  " - attempt to use network interface %C MAC addr for"
832  " GUID generation failed.\n", guid_interface.c_str()));
833  }
834  }
835  }
836  guid_gen_.populate(id);
837  id.entityId = ENTITYID_PARTICIPANT;
838  return id;
839 }
840 
// Adds a participant to `domain` (name taken from the log string below:
// RtpsDiscovery::add_domain_participant()).
// A candidate GUID is generated into ads.id (optionally bound to the
// configured GUID interface; failure to bind only warns), then an Spdp object
// is created for it and stored in participants_[domain][ads.id].
// Returns the AddDomainStatus; if Spdp construction throws std::exception the
// error is logged and ads.id is set to GUID_UNKNOWN.
// NOTE(review): the first signature lines (listing 841-842), the third
// parameter line (844, presumably declaring `tls`) and lines 848, 852, 861 and
// 865 are missing from this extract.
843  const DDS::DomainParticipantQos& qos,
845 {
846  DCPS::AddDomainStatus ads = {OpenDDS::DCPS::GUID_t(), false /*federated*/};
847  {
849  const OPENDDS_STRING guid_interface = config_->guid_interface();
850  if (!guid_interface.empty()) {
851  if (guid_gen_.interfaceName(guid_interface.c_str()) != 0) {
853  ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsDiscovery::add_domain_participant()"
854  " - attempt to use specific network interface %C MAC addr for"
855  " GUID generation failed.\n", guid_interface.c_str()));
856  }
857  }
858  }
859  guid_gen_.populate(ads.id);
860  }
862  try {
    // ads.id is passed via ref() so the Spdp constructor can update it; the
    // map insertion below therefore uses the possibly-changed id.
863  const DCPS::RcHandle<Spdp> spdp(DCPS::make_rch<Spdp>(domain, ref(ads.id), qos, this, tls));
864  // ads.id may change during Spdp constructor
866  participants_[domain][ads.id] = spdp;
867  } catch (const std::exception& e) {
868  ads.id = GUID_UNKNOWN;
869  ACE_ERROR((LM_ERROR, "(%P|%t) RtpsDiscovery::add_domain_participant() - "
870  "failed to initialize RTPS Simple Participant Discovery Protocol: %C\n",
871  e.what()));
872  }
873  return ads;
874 }
875 
876 #if defined(OPENDDS_SECURITY)
879  DDS::DomainId_t domain,
880  const DDS::DomainParticipantQos& qos,
882  const OpenDDS::DCPS::GUID_t& guid,
886 {
// Security-enabled counterpart of add_domain_participant(): the caller
// supplies the GUID, and the identity/permissions/crypto handles are
// forwarded to the Spdp constructor. On std::exception the returned id is
// GUID_UNKNOWN.
// NOTE(review): several signature lines and listing lines 888/892 are absent
// from this extract.
// NOTE(review): the failure is logged with LM_WARNING here, whereas the
// non-secure add_domain_participant() logs the same failure with LM_ERROR.
887  DCPS::AddDomainStatus ads = {guid, false /*federated*/};
889  try {
890  const DCPS::RcHandle<Spdp> spdp(DCPS::make_rch<Spdp>(
891  domain, ads.id, qos, this, tls, id, perm, part_crypto));
893  participants_[domain][ads.id] = spdp;
894  } catch (const std::exception& e) {
895  ads.id = GUID_UNKNOWN;
896  ACE_ERROR((LM_WARNING, "(%P|%t) RtpsDiscovery::add_domain_participant_secure() - "
897  "failed to initialize RTPS Simple Participant Discovery Protocol: %C\n",
898  e.what()));
899  }
900  return ads;
901 }
902 #endif
903 
// Forwards a liveliness signal of the given kind to the participant
// identified by (domain_id, part_id).
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
904 void
906  const OpenDDS::DCPS::GUID_t& part_id,
908 {
909  get_part(domain_id, part_id)->signal_liveliness(kind);
910 }
911 
// Stores `after` as the configuration's rtps_relay_only value and, only when
// it differs from the previous value, calls rtps_relay_only_now(after) on
// every participant in every domain of participants_.
// NOTE(review): the signature and listing lines 915 (declaration of `config`)
// and 920 are absent from this extract.
912 void
914 {
916  const bool before = config->rtps_relay_only();
917  config->rtps_relay_only(after);
918 
919  if (before != after) {
921  for (DomainParticipantMap::const_iterator dom_pos = participants_.begin(), dom_limit = participants_.end();
922  dom_pos != dom_limit; ++dom_pos) {
923  for (ParticipantMap::const_iterator part_pos = dom_pos->second.begin(), part_limit = dom_pos->second.end(); part_pos != part_limit; ++part_pos) {
924  part_pos->second->rtps_relay_only_now(after);
925  }
926  }
927  }
928 }
929 
// Stores `after` as the configuration's use_rtps_relay value and, only when
// it differs from the previous value, calls use_rtps_relay_now(after) on
// every participant in every domain of participants_.
// NOTE(review): the signature and listing lines 933 (declaration of `config`)
// and 938 are absent from this extract.
930 void
932 {
934  const bool before = config->use_rtps_relay();
935  config->use_rtps_relay(after);
936 
937  if (before != after) {
939  for (DomainParticipantMap::const_iterator dom_pos = participants_.begin(), dom_limit = participants_.end();
940  dom_pos != dom_limit; ++dom_pos) {
941  for (ParticipantMap::const_iterator part_pos = dom_pos->second.begin(), part_limit = dom_pos->second.end(); part_pos != part_limit; ++part_pos) {
942  part_pos->second->use_rtps_relay_now(after);
943  }
944  }
945  }
946 }
947 
// Stores `after` as the configuration's use_ice value and, only when it
// differs from the previous value, calls use_ice_now(after) on every
// participant in every domain of participants_.
// NOTE(review): the signature and listing lines 951 (declaration of `config`)
// and 956 are absent from this extract.
948 void
950 {
952  const bool before = config->use_ice();
953  config->use_ice(after);
954 
955  if (before != after) {
957  for (DomainParticipantMap::const_iterator dom_pos = participants_.begin(), dom_limit = participants_.end();
958  dom_pos != dom_limit; ++dom_pos) {
959  for (ParticipantMap::const_iterator part_pos = dom_pos->second.begin(), part_limit = dom_pos->second.end(); part_pos != part_limit; ++part_pos) {
960  part_pos->second->use_ice_now(after);
961  }
962  }
963  }
964 }
965 
966 #ifdef OPENDDS_SECURITY
969  const DCPS::GUID_t& local_participant,
970  const DCPS::GUID_t& remote_participant) const
971 {
// Returns a crypto handle obtained from the local participant, or
// DDS::HANDLE_NIL when the local participant is not found.
972  ParticipantHandle p = get_part(domain, local_participant);
973  if (p) {
// An unknown remote GUID, or one equal to the local GUID, selects the local
// participant's own handle; anything else selects the handle for that remote.
974  if (remote_participant == GUID_UNKNOWN || remote_participant == local_participant) {
975  return p->crypto_handle();
976  } else {
977  return p->remote_crypto_handle(remote_participant);
978  }
979  }
980 
981  return DDS::HANDLE_NIL;
982 }
983 
984 #endif
985 
987 {
// Registers a newly allocated Config with the service participant under the
// discovery type name "rtps_discovery".
// NOTE(review): ownership of the `new Config` presumably passes to
// register_discovery_type() — confirm. The signature line is absent from
// this extract.
988  TheServiceParticipant->register_discovery_type("rtps_discovery", new Config);
989 }
990 
// Returns the value of get_spdp_port() on the local participant, or 0 when
// that participant is not found.
991 u_short
993  const DCPS::GUID_t& local_participant) const
994 {
995  ParticipantHandle p = get_part(domain, local_participant);
996  if (p) {
997  return p->get_spdp_port();
998  }
999 
1000  return 0;
1001 }
1002 
// Returns the value of get_sedp_port() on the local participant, or 0 when
// that participant is not found.
1003 u_short
1005  const DCPS::GUID_t& local_participant) const
1006 {
1007  ParticipantHandle p = get_part(domain, local_participant);
1008  if (p) {
1009  return p->get_sedp_port();
1010  }
1011 
1012  return 0;
1013 }
1014 
1015 #ifdef ACE_HAS_IPV6
1016 
1017 u_short
1018 RtpsDiscovery::get_ipv6_spdp_port(DDS::DomainId_t domain,
1019  const DCPS::GUID_t& local_participant) const
1020 {
1021  ParticipantHandle p = get_part(domain, local_participant);
1022  if (p) {
1023  return p->get_ipv6_spdp_port();
1024  }
1025 
1026  return 0;
1027 }
1028 
1029 u_short
1030 RtpsDiscovery::get_ipv6_sedp_port(DDS::DomainId_t domain,
1031  const DCPS::GUID_t& local_participant) const
1032 {
1033  ParticipantHandle p = get_part(domain, local_participant);
1034  if (p) {
1035  return p->get_ipv6_sedp_port();
1036  }
1037 
1038  return 0;
1039 }
1040 #endif
1041 
// Updates the configured SPDP RtpsRelay address and notifies participants.
// Nothing happens when the address is unchanged. When the new address equals
// a default-constructed ACE_INET_Addr, the configuration is updated but the
// participants are not notified.
// NOTE(review): the signature and listing lines 1045 (declaration of
// `config`) and 1057 are absent from this extract.
1042 void
1044 {
1046  const ACE_INET_Addr prev = config->spdp_rtps_relay_address();
1047  if (prev == address) {
1048  return;
1049  }
1050 
1051  config->spdp_rtps_relay_address(address);
1052 
1053  if (address == ACE_INET_Addr()) {
1054  return;
1055  }
1056 
1058  for (DomainParticipantMap::const_iterator dom_pos = participants_.begin(), dom_limit = participants_.end();
1059  dom_pos != dom_limit; ++dom_pos) {
1060  for (ParticipantMap::const_iterator part_pos = dom_pos->second.begin(), part_limit = dom_pos->second.end(); part_pos != part_limit; ++part_pos) {
1061  part_pos->second->spdp_rtps_relay_address_change();
1062  }
1063  }
1064 }
1065 
// Stores the SEDP RtpsRelay address in the configuration and pushes it to
// every participant in every domain. Unlike the SPDP variant, no
// "unchanged" or "default address" short-circuit is visible here.
// NOTE(review): the signature and listing lines 1069 (declaration of
// `config`) and 1072 are absent from this extract.
1066 void
1068 {
1070  config->sedp_rtps_relay_address(address);
1071 
1073  for (DomainParticipantMap::const_iterator dom_pos = participants_.begin(), dom_limit = participants_.end();
1074  dom_pos != dom_limit; ++dom_pos) {
1075  for (ParticipantMap::const_iterator part_pos = dom_pos->second.begin(), part_limit = dom_pos->second.end(); part_pos != part_limit; ++part_pos) {
1076  part_pos->second->sedp_rtps_relay_address(address);
1077  }
1078  }
1079 }
1080 
// Stores the SPDP STUN server address in the configuration; participants are
// not notified by this function.
1081 void
1083 {
1084  get_config()->spdp_stun_server_address(address);
1085 }
1086 
// Stores the SEDP STUN server address in the configuration and pushes it to
// every participant in every domain.
// NOTE(review): the signature and listing line 1092 are absent from this
// extract.
1087 void
1089 {
1090  get_config()->sedp_stun_server_address(address);
1091 
1093  for (DomainParticipantMap::const_iterator dom_pos = participants_.begin(), dom_limit = participants_.end();
1094  dom_pos != dom_limit; ++dom_pos) {
1095  for (ParticipantMap::const_iterator part_pos = dom_pos->second.begin(), part_limit = dom_pos->second.end(); part_pos != part_limit; ++part_pos) {
1096  part_pos->second->sedp_stun_server_address(address);
1097  }
1098  }
1099 }
1100 
// Asks the local participant to append its transport statistics to `seq`;
// does nothing when the participant is not found.
1101 void
1103  const DCPS::GUID_t& local_participant,
1105 {
1106  ParticipantHandle p = get_part(domain, local_participant);
1107  if (p) {
1108  p->append_transport_statistics(seq);
1109  }
1110 }
1111 
1113 {
// Sets up the built-in-topic (BIT) subscriber for `participant`, hands it to
// the matching Spdp via init_bit(), and returns the BitSubscriber handle.
// NOTE(review): the signature and many listing lines (1119, 1123, 1127, 1129,
// 1134, the topic lookups, 1187, ...) are absent from this extract, so the
// early-exit paths cannot be seen here.
// NOTE(review): both get_part(...)->init_bit() calls dereference the handle
// without checking whether the participant was found.
1114  DDS::Subscriber_var bit_subscriber;
1115 #ifndef DDS_HAS_MINIMUM_BIT
// When BIT is turned off on the service participant, a BitSubscriber built
// with no underlying subscriber is passed to the participant instead.
1116  if (!TheServiceParticipant->get_BIT()) {
1117  DCPS::RcHandle<DCPS::BitSubscriber> bit_subscriber_rch = DCPS::make_rch<DCPS::BitSubscriber>();
1118  get_part(participant->get_domain_id(), participant->get_id())->init_bit(bit_subscriber_rch);
1120  }
1121 
1122  if (create_bit_topics(participant) != DDS::RETCODE_OK) {
1124  }
1125 
1126  bit_subscriber =
1128  DDS::SubscriberListener::_nil(),
1130  DCPS::SubscriberImpl* sub = dynamic_cast<DCPS::SubscriberImpl*>(bit_subscriber.in());
1131  if (sub == 0) {
1132  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) PeerDiscovery::init_bit")
1133  ACE_TEXT(" - Could not cast Subscriber to SubscriberImpl\n")));
1135  }
1136 
// All BIT readers share one QoS: the subscriber's default with
// TRANSIENT_LOCAL durability and the service participant's autopurge delays.
1137  DDS::DataReaderQos dr_qos;
1138  sub->get_default_datareader_qos(dr_qos);
1139  dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
1140 
1141  dr_qos.reader_data_lifecycle.autopurge_nowriter_samples_delay =
1142  TheServiceParticipant->bit_autopurge_nowriter_samples_delay();
1143  dr_qos.reader_data_lifecycle.autopurge_disposed_samples_delay =
1144  TheServiceParticipant->bit_autopurge_disposed_samples_delay();
1145 
1146  DDS::TopicDescription_var bit_part_topic =
1149  sub, dr_qos);
1150 
1151  DDS::TopicDescription_var bit_topic_topic =
1154  sub, dr_qos);
1155 
1156  DDS::TopicDescription_var bit_pub_topic =
1159  sub, dr_qos);
1160 
1161  DDS::TopicDescription_var bit_sub_topic =
1164  sub, dr_qos);
1165 
1166  DDS::TopicDescription_var bit_part_loc_topic =
1169  sub, dr_qos);
1170 
1171  DDS::TopicDescription_var bit_connection_record_topic =
1173  create_bit_dr(bit_connection_record_topic, DCPS::BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE,
1174  sub, dr_qos);
1175 
1176  DDS::TopicDescription_var bit_internal_thread_topic =
1178  create_bit_dr(bit_internal_thread_topic, DCPS::BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE,
1179  sub, dr_qos);
1180 
// A failed enable() is only reported (at LM_INFO) when DCPS_debug_level is non-zero.
1181  const DDS::ReturnCode_t ret = bit_subscriber->enable();
1182  if (ret != DDS::RETCODE_OK) {
1183  if (DCPS_debug_level) {
1184  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) PeerDiscovery::init_bit")
1185  ACE_TEXT(" - Error %d enabling subscriber\n"), ret));
1186  }
1188  }
1189 #endif /* DDS_HAS_MINIMUM_BIT */
1190 
// Wrap whatever subscriber was produced above and register it with the participant.
1191  DCPS::RcHandle<DCPS::BitSubscriber> bit_subscriber_rch = DCPS::make_rch<DCPS::BitSubscriber>(bit_subscriber);
1192  get_part(participant->get_domain_id(), participant->get_id())->init_bit(bit_subscriber_rch);
1193 
1194  return bit_subscriber_rch;
1195 }
1196 
1198 {
// Calls fini_bit() on the Spdp that belongs to `participant`.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1199  get_part(participant->get_domain_id(), participant->get_id())->fini_bit();
1200 }
1201 
1203  DDS::DomainId_t /*domainId*/, const GUID_t& /*participantId*/)
1204 {
// Both parameters are unused; the function always returns false.
1205  return false; // This is just for DCPSInfoRepo?
1206 }
1207 
1209  DDS::DomainId_t domain_id, const GUID_t& participantId)
1210 {
// Removes the participant from participants_ (dropping the domain entry once
// it becomes empty) and then shuts the participant down. Returns false when
// the domain or the participant is not registered, true otherwise.
// NOTE(review): listing line 1214, which presumably declares the guard `g`
// released below, is absent from this extract.
1211  // Use reference counting to ensure participant
1212  // does not get deleted until lock has been released.
1213  ParticipantHandle participant;
1215  DomainParticipantMap::iterator domain = participants_.find(domain_id);
1216  if (domain == participants_.end()) {
1217  return false;
1218  }
1219  ParticipantMap::iterator part = domain->second.find(participantId);
1220  if (part == domain->second.end()) {
1221  return false;
1222  }
// Keep a handle so the participant outlives its removal from the map.
1223  participant = part->second;
1224  domain->second.erase(part);
1225  if (domain->second.empty()) {
1226  participants_.erase(domain);
1227  }
// shutdown() is called after g.release(), i.e. outside the guarded region.
1228  g.release();
1229 
1230  participant->shutdown();
1231  return true;
1232 }
1233 
1235  DDS::DomainId_t domain, const GUID_t& myParticipantId, const GUID_t& ignoreId)
1236 {
// Forwards the ignore request to the local participant; always returns true.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1237  get_part(domain, myParticipantId)->ignore_domain_participant(ignoreId);
1238  return true;
1239 }
1240 
1242  DDS::DomainId_t domain, const GUID_t& myParticipantId, const GUID_t& removeId)
1243 {
// Asks the local participant to remove the participant `removeId`; always
// returns true.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1244  get_part(domain, myParticipantId)->remove_domain_participant(removeId);
1245  return true;
1246 }
1247 
1249  DDS::DomainId_t domain, const GUID_t& participant, const DDS::DomainParticipantQos& qos)
1250 {
// Returns whatever the participant's update_domain_participant_qos() returns.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1251  return get_part(domain, participant)->update_domain_participant_qos(qos);
1252 }
1253 
1254 bool RtpsDiscovery::has_domain_participant(DDS::DomainId_t domain, const GUID_t& local, const GUID_t& remote) const
1255 {
1256  return get_part(domain, local)->has_domain_participant(remote);
1257 }
1258 
1260  GUID_t& topicId,
1261  DDS::DomainId_t domainId,
1262  const GUID_t& participantId,
1263  const char* topicName,
1264  const char* dataTypeName,
1265  const DDS::TopicQos& qos,
1266  bool hasDcpsKey,
1267  DCPS::TopicCallbacks* topic_callbacks)
1268 {
// Delegates to the participant's assert_topic(); domainId and participantId
// are used only to locate the participant. Returns DCPS::INTERNAL_ERROR when
// the participant is not found.
1269  ParticipantHandle part = get_part(domainId, participantId);
1270  if (part) {
1271  return part->assert_topic(topicId, topicName,
1272  dataTypeName, qos,
1273  hasDcpsKey, topic_callbacks);
1274  }
1275  return DCPS::INTERNAL_ERROR;
1276 }
1277 
1279  DDS::DomainId_t domainId,
1280  const GUID_t& participantId,
1281  const char* topicName,
1282  CORBA::String_out dataTypeName,
1283  DDS::TopicQos_out qos,
1284  GUID_t& topicId)
1285 {
// Delegates to the participant's find_topic(). Returns DCPS::INTERNAL_ERROR
// when the participant is not found; the out parameters are not assigned by
// this function in that case.
1286  ParticipantHandle part = get_part(domainId, participantId);
1287  if (part) {
1288  return part->find_topic(topicName, dataTypeName, qos, topicId);
1289  }
1290  return DCPS::INTERNAL_ERROR;
1291 }
1292 
1294  DDS::DomainId_t domainId,
1295  const GUID_t& participantId,
1296  const GUID_t& topicId)
1297 {
// Delegates to the participant's remove_topic(). Returns DCPS::INTERNAL_ERROR
// when the participant is not found.
1298  ParticipantHandle part = get_part(domainId, participantId);
1299  if (part) {
1300  return part->remove_topic(topicId);
1301  }
1302  return DCPS::INTERNAL_ERROR;
1303 }
1304 
1305 bool RtpsDiscovery::ignore_topic(DDS::DomainId_t domainId, const GUID_t& myParticipantId,
1306  const GUID_t& ignoreId)
1307 {
1308  get_part(domainId, myParticipantId)->ignore_topic(ignoreId);
1309  return true;
1310 }
1311 
1313  const GUID_t& participantId, const DDS::TopicQos& qos)
1314 {
// Delegates to the participant's update_topic_qos(); returns false when the
// participant is not found.
1315  ParticipantHandle part = get_part(domainId, participantId);
1316  if (part) {
1317  return part->update_topic_qos(topicId, qos);
1318  }
1319  return false;
1320 }
1321 
1323  DDS::DomainId_t domainId,
1324  const GUID_t& participantId,
1325  const GUID_t& topicId,
1326  DCPS::DataWriterCallbacks_rch publication,
1327  const DDS::DataWriterQos& qos,
1328  const DCPS::TransportLocatorSeq& transInfo,
1329  const DDS::PublisherQos& publisherQos,
1330  const XTypes::TypeInformation& type_info)
1331 {
// Returns the result of the participant's add_publication(); domainId and
// participantId are used only to locate the participant.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1332  return get_part(domainId, participantId)->add_publication(
1333  topicId, publication, qos, transInfo, publisherQos, type_info);
1334 }
1335 
1337  DDS::DomainId_t domainId, const GUID_t& participantId, const GUID_t& publicationId)
1338 {
// Forwards the removal to the participant; always returns true.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1339  get_part(domainId, participantId)->remove_publication(publicationId);
1340  return true;
1341 }
1342 
1344  DDS::DomainId_t domainId, const GUID_t& participantId, const GUID_t& ignoreId)
1345 {
// Forwards the ignore request to the participant; always returns true.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1346  get_part(domainId, participantId)->ignore_publication(ignoreId);
1347  return true;
1348 }
1349 
1351  DDS::DomainId_t domainId,
1352  const GUID_t& partId,
1353  const GUID_t& dwId,
1354  const DDS::DataWriterQos& qos,
1355  const DDS::PublisherQos& publisherQos)
1356 {
// Returns the result of the participant's update_publication_qos().
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1357  return get_part(domainId, partId)->update_publication_qos(dwId, qos,
1358  publisherQos);
1359 }
1360 
1362  DDS::DomainId_t domainId, const GUID_t& partId, const GUID_t& dwId,
1363  const DCPS::TransportLocatorSeq& transInfo)
1364 {
// Forwards the new transport locators for writer `dwId` to the participant.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1365  get_part(domainId, partId)->update_publication_locators(dwId, transInfo);
1366 }
1367 
1369  DDS::DomainId_t domainId,
1370  const GUID_t& participantId,
1371  const GUID_t& topicId,
1372  DCPS::DataReaderCallbacks_rch subscription,
1373  const DDS::DataReaderQos& qos,
1374  const DCPS::TransportLocatorSeq& transInfo,
1375  const DDS::SubscriberQos& subscriberQos,
1376  const char* filterClassName,
1377  const char* filterExpr,
1378  const DDS::StringSeq& params,
1379  const XTypes::TypeInformation& type_info)
1380 {
// Returns the result of the participant's add_subscription(); domainId and
// participantId are used only to locate the participant.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1381  return get_part(domainId, participantId)->add_subscription(
1382  topicId, subscription, qos, transInfo, subscriberQos, filterClassName,
1383  filterExpr, params, type_info);
1384 }
1385 
1387  DDS::DomainId_t domainId, const GUID_t& participantId, const GUID_t& subscriptionId)
1388 {
// Forwards the removal to the participant; always returns true.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1389  get_part(domainId, participantId)->remove_subscription(subscriptionId);
1390  return true;
1391 }
1392 
1394  DDS::DomainId_t domainId, const GUID_t& participantId, const GUID_t& ignoreId)
1395 {
// Forwards the ignore request to the participant; always returns true.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1396  get_part(domainId, participantId)->ignore_subscription(ignoreId);
1397  return true;
1398 }
1399 
1401  DDS::DomainId_t domainId,
1402  const GUID_t& partId,
1403  const GUID_t& drId,
1404  const DDS::DataReaderQos& qos,
1405  const DDS::SubscriberQos& subQos)
1406 {
// Returns the result of the participant's update_subscription_qos().
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1407  return get_part(domainId, partId)->update_subscription_qos(drId, qos, subQos);
1408 }
1409 
1411  DDS::DomainId_t domainId, const GUID_t& partId, const GUID_t& subId, const DDS::StringSeq& params)
1412 {
// Returns the result of the participant's update_subscription_params().
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1413  return get_part(domainId, partId)->update_subscription_params(subId, params);
1414 }
1415 
1417  DDS::DomainId_t domainId, const GUID_t& partId, const GUID_t& subId,
1418  const DCPS::TransportLocatorSeq& transInfo)
1419 {
// Forwards the new transport locators for reader `subId` to the participant.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1420  get_part(domainId, partId)->update_subscription_locators(subId, transInfo);
1421 }
1422 
1424  const GUID_t& partId) const
1425 {
// Returns the participant's sedp_transport_inst().
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; it is dereferenced here without a check.
1426  return get_part(domainId, partId)->sedp_transport_inst();
1427 }
1428 
// Looks up the participant registered under (domain_id, part_id) in
// participants_. Returns a default-constructed (empty) ParticipantHandle when
// either the domain or the participant is not present, so callers must test
// the result before dereferencing it.
// NOTE(review): listing line 1431 is absent from this extract (presumably the
// lock acquisition — confirm against the real source).
1429 ParticipantHandle RtpsDiscovery::get_part(const DDS::DomainId_t domain_id, const GUID_t& part_id) const
1430 {
1432  const DomainParticipantMap::const_iterator domain = participants_.find(domain_id);
1433  if (domain == participants_.end()) {
1434  return ParticipantHandle();
1435  }
1436  const ParticipantMap::const_iterator part = domain->second.find(part_id);
1437  if (part == domain->second.end()) {
1438  return ParticipantHandle();
1439  }
1440  return part->second;
1441 }
1442 
1444 {
// Returns the config_ member.
// NOTE(review): the signature and listing line 1445 are absent from this
// extract (presumably a lock guard — confirm).
1446  return config_;
1447 }
1448 
1449 void RtpsDiscovery::create_bit_dr(DDS::TopicDescription_ptr topic,
1450  const char* type, DCPS::SubscriberImpl* sub, const DDS::DataReaderQos& qos)
1451 {
1452  DCPS::TopicDescriptionImpl* bit_topic_i =
1453  dynamic_cast<DCPS::TopicDescriptionImpl*>(topic);
1454  if (bit_topic_i == 0) {
1455  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PeerDiscovery::create_bit_dr: ")
1456  ACE_TEXT("Could not cast TopicDescription to TopicDescriptionImpl\n")));
1457  return;
1458  }
1459 
1460  DDS::DomainParticipant_var participant = sub->get_participant();
1461  DCPS::DomainParticipantImpl* participant_i =
1462  dynamic_cast<DCPS::DomainParticipantImpl*>(participant.in());
1463  if (participant_i == 0) {
1464  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PeerDiscovery::create_bit_dr: ")
1465  ACE_TEXT("Could not cast DomainParticipant to DomainParticipantImpl\n")));
1466  return;
1467  }
1468 
1469  DCPS::TypeSupport_var type_support =
1470  Registered_Data_Types->lookup(participant, type);
1471 
1472  DDS::DataReader_var dr = type_support->create_datareader();
1473  DCPS::DataReaderImpl* dri = dynamic_cast<DCPS::DataReaderImpl*>(dr.in());
1474  if (dri == 0) {
1475  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: PeerDiscovery::create_bit_dr: ")
1476  ACE_TEXT("Could not cast DataReader to DataReaderImpl\n")));
1477  return;
1478  }
1479 
1480  dri->init(bit_topic_i, qos, 0 /*listener*/, 0 /*mask*/, participant_i, sub);
1481  dri->disable_transport();
1482  dri->enable();
1483 }
1484 
1486  DDS::DomainId_t domain, const GUID_t& local_participant,
1487  const GUID_t& remote_entity, const XTypes::TypeInformation& remote_type_info,
1488  DCPS::TypeObjReqCond& cond)
1489 {
// Forwards the request for the remote entity's complete type objects to the
// local participant's Spdp, passing `cond` through.
// NOTE(review): get_part() returns an empty handle when the participant is
// not found; `spdp` is dereferenced here without a check.
1490  ParticipantHandle spdp = get_part(domain, local_participant);
1491  spdp->request_remote_complete_type_objects(remote_entity, remote_type_info, cond);
1492 }
1493 
1494 } // namespace RTPS
1495 } // namespace OpenDDS
1496 
RtpsDiscoveryConfig_rch config_
const char *const BUILT_IN_SUBSCRIPTION_TOPIC_TYPE
ParticipantHandle get_part(const DDS::DomainId_t domain_id, const GUID_t &part_id) const
int discovery_config(ACE_Configuration_Heap &cf)
bool ignore_subscription(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &ignoreId)
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void populate(DCPS::GUID_t &container)
OPENDDS_STRING guid_interface() const
AddrVec spdp_send_addrs() const
const LogLevel::Value value
Definition: debug.cpp:61
bool remove_subscription(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &subscriptionId)
GUID_t add_subscription(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &topicId, DCPS::DataReaderCallbacks_rch subscription, const DDS::DataReaderQos &qos, const DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpr, const DDS::StringSeq &params, const XTypes::TypeInformation &type_info)
const InstanceHandle_t HANDLE_NIL
virtual void signal_liveliness(const DDS::DomainId_t domain_id, const OpenDDS::DCPS::GUID_t &part_id, DDS::LivelinessQosPolicyKind kind)
std::string String
void server_reflexive_address_period(const DCPS::TimeDuration &x)
Definition: RTPS/ICE/Ice.h:118
const char *const BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE
RtpsDiscoveryConfig_rch config() const
bool update_publication_qos(DDS::DomainId_t domainId, const GUID_t &partId, const GUID_t &dwId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)
LM_INFO
const char *const BUILT_IN_CONNECTION_RECORD_TOPIC
void server_reflexive_indication_count(size_t x)
Definition: RTPS/ICE/Ice.h:127
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
DDS::ReturnCode_t create_bit_topics(DomainParticipantImpl *participant)
Definition: Discovery.cpp:51
bool update_subscription_params(DDS::DomainId_t domainId, const GUID_t &partId, const GUID_t &subId, const DDS::StringSeq &params)
sequence< octet > key
void init(TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber)
bool has_domain_participant(DDS::DomainId_t domain, const GUID_t &local, const GUID_t &remote) const
bool update_subscription_qos(DDS::DomainId_t domainId, const GUID_t &partId, const GUID_t &drId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subQos)
virtual DDS::Subscriber_ptr create_subscriber(const DDS::SubscriberQos &qos, DDS::SubscriberListener_ptr a_listener, DDS::StatusMask mask)
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos, XTypes::TypeLookupService_rch tls)
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
void indication_period(const DCPS::TimeDuration &x)
Definition: RTPS/ICE/Ice.h:100
virtual const ACE_Configuration_Section_Key & root_section(void) const
RtpsDiscovery(const RepoKey &key)
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
sequence< TransportLocator > TransportLocatorSeq
DDS::Security::ParticipantCryptoHandle get_crypto_handle(DDS::DomainId_t domain, const DCPS::GUID_t &local_participant, const DCPS::GUID_t &remote_participant=GUID_UNKNOWN) const
bool ignore_domain_participant(DDS::DomainId_t domain, const GUID_t &myParticipantId, const GUID_t &ignoreId)
GuidGenerator guid_gen_
Guids will be unique within this RTPS configuration.
void deferred_triggered_check_ttl(const DCPS::TimeDuration &x)
Definition: RTPS/ICE/Ice.h:136
void update_publication_locators(DDS::DomainId_t domainId, const GUID_t &partId, const GUID_t &dwId, const DCPS::TransportLocatorSeq &transInfo)
void checklist_period(const DCPS::TimeDuration &x)
Definition: RTPS/ICE/Ice.h:91
virtual DDS::ReturnCode_t get_default_datareader_qos(DDS::DataReaderQos &qos)
int interfaceName(const char *nic)
const DDS::StatusMask DEFAULT_STATUS_MASK
DomainParticipantMap participants_
void T_a(const DCPS::TimeDuration &x)
Definition: RTPS/ICE/Ice.h:73
virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant_secure(DDS::DomainId_t domain, const DDS::DomainParticipantQos &qos, XTypes::TypeLookupService_rch tls, const OpenDDS::DCPS::GUID_t &guid, DDS::Security::IdentityHandle id, DDS::Security::PermissionsHandle perm, DDS::Security::ParticipantCryptoHandle part_crypto) OPENDDS_GCC_PRE53_DISABLE_OPTIMIZATION
void create_bit_dr(DDS::TopicDescription_ptr topic, const char *type, DCPS::SubscriberImpl *sub, const DDS::DataReaderQos &qos)
const char *const BUILT_IN_PUBLICATION_TOPIC
void nominated_ttl(const DCPS::TimeDuration &x)
Definition: RTPS/ICE/Ice.h:109
#define OPENDDS_STRING
RcHandle< Spdp > ParticipantHandle
Definition: RtpsDiscovery.h:35
RtpsDiscoveryConfig_rch get_config() const
DOMAINID_TYPE_NATIVE DomainId_t
const EntityId_t ENTITYID_PARTICIPANT
Definition: GuidUtils.h:37
LM_DEBUG
#define Registered_Data_Types
ACE_Thread_Mutex participants_lock_
bool attach_participant(DDS::DomainId_t domainId, const GUID_t &participantId)
void spdp_rtps_relay_address(const ACE_INET_Addr &address)
DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &topicId)
DCPS::TopicStatus find_topic(DDS::DomainId_t domainId, const GUID_t &participantId, const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, GUID_t &topicId)
static TimeDuration from_msec(const ACE_UINT64 &ms)
void request_remote_complete_type_objects(DDS::DomainId_t domain, const GUID_t &local_participant, const GUID_t &remote_entity, const XTypes::TypeInformation &remote_type_info, DCPS::TypeObjReqCond &cond)
const char *const BUILT_IN_PARTICIPANT_LOCATION_TOPIC_TYPE
char ACE_TCHAR
sequence< TransportStatistics > TransportStatisticsSequence
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void change_password_period(const DCPS::TimeDuration &x)
Definition: RTPS/ICE/Ice.h:145
Implements the DDS::DataReader interface.
void append_transport_statistics(DDS::DomainId_t domain, const DCPS::GUID_t &local_participant, DCPS::TransportStatisticsSequence &seq)
RtpsDiscoveryConfig::AddrVec AddrVec
Definition: RtpsDiscovery.h:57
RcHandle< DCPS::BitSubscriber > init_bit(DCPS::DomainParticipantImpl *participant)
const char *const BUILT_IN_PARTICIPANT_TOPIC
void fini_bit(DCPS::DomainParticipantImpl *participant)
LM_WARNING
const char *const BUILT_IN_PARTICIPANT_LOCATION_TOPIC
bool ignore_topic(DDS::DomainId_t domainId, const GUID_t &myParticipantId, const GUID_t &ignoreId)
bool remove_domain_participant(DDS::DomainId_t domain_id, const GUID_t &participantId)
static const ACE_TCHAR RTPS_SECTION_NAME[]
const char *const name
Definition: debug.cpp:60
int set(const ACE_INET_Addr &)
bool update_domain_participant_qos(DDS::DomainId_t domain, const GUID_t &participant, const DDS::DomainParticipantQos &qos)
ACE_TEXT("TCP_Factory")
bool convertToDouble(const String &s, T &value)
const char *const BUILT_IN_PARTICIPANT_TOPIC_TYPE
const char *const BUILT_IN_SUBSCRIPTION_TOPIC
#define SUBSCRIBER_QOS_DEFAULT
u_short get_sedp_port(DDS::DomainId_t domain, const DCPS::GUID_t &local_participant) const
RcHandle< DCPS::TransportInst > sedp_transport_inst(DDS::DomainId_t domainId, const GUID_t &partId) const
const char *const BUILT_IN_INTERNAL_THREAD_TOPIC
bool ignore_publication(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &ignoreId)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual DDS::DomainParticipant_ptr get_participant()
Implements the DDS::TopicDescription interface.
const char *const BUILT_IN_TOPIC_TOPIC
virtual int open_section(const ACE_Configuration_Section_Key &base, const ACE_TCHAR *sub_section, bool create, ACE_Configuration_Section_Key &result)
static Configuration * instance()
Definition: Ice.cpp:109
void connectivity_check_ttl(const DCPS::TimeDuration &x)
Definition: RTPS/ICE/Ice.h:82
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void sedp_stun_server_address(const ACE_INET_Addr &address)
static TimeDuration from_double(double duration)
virtual DDS::ReturnCode_t enable()
int processSections(ACE_Configuration_Heap &cf, const ACE_Configuration_Section_Key &key, KeyList &subsections)
Definition: ConfigUtils.cpp:41
void update_subscription_locators(DDS::DomainId_t domainId, const GUID_t &partId, const GUID_t &subId, const DCPS::TransportLocatorSeq &transInfo)
const ReturnCode_t RETCODE_OK
bool update_topic_qos(const GUID_t &topicId, DDS::DomainId_t domainId, const GUID_t &participantId, const DDS::TopicQos &qos)
u_short get_spdp_port(DDS::DomainId_t domain, const DCPS::GUID_t &local_participant) const
void sedp_rtps_relay_address(const ACE_INET_Addr &address)
#define ACE_ERROR_RETURN(X, Y)
static const String ip(const ACE_INET_Addr &addr)
Definition: LogAddr.cpp:15
int pullValues(ACE_Configuration_Heap &cf, const ACE_Configuration_Section_Key &key, ValueMap &values)
Definition: ConfigUtils.cpp:17
void spdp_stun_server_address(const ACE_INET_Addr &address)
const char *const BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE
Defines the interface for Discovery callbacks into the Topic.
#define TheServiceParticipant
OPENDDS_STRING RepoKey
Definition: Discovery.h:80
LM_ERROR
bool remove_publication(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &publicationId)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual DDS::TopicDescription_ptr lookup_topicdescription(const char *name)
DCPS::TopicStatus assert_topic(GUID_t &topicId, DDS::DomainId_t domainId, const GUID_t &participantId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey, DCPS::TopicCallbacks *topic_callbacks)
LivelinessQosPolicyKind
GUID_t add_publication(DDS::DomainId_t domainId, const GUID_t &participantId, const GUID_t &topicId, DCPS::DataWriterCallbacks_rch publication, const DDS::DataWriterQos &qos, const DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const XTypes::TypeInformation &type_info)
key EntityId_t entityId
Definition: DdsDcpsGuid.idl:59
const char *const BUILT_IN_TOPIC_TOPIC_TYPE
const char *const BUILT_IN_PUBLICATION_TOPIC_TYPE
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
bool convertToInteger(const String &s, T &value)
virtual OpenDDS::DCPS::GUID_t generate_participant_guid()