OpenDDS  Snapshot(2023/04/28-20:55)
Service_Participant.cpp
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #include <DCPS/DdsDcps_pch.h> //Only the _pch include should start with DCPS/
7 
8 #include "Service_Participant.h"
9 
10 #include "Logging.h"
11 #include "WaitSet.h"
13 #include "debug.h"
14 #include "BuiltInTopicUtils.h"
15 #include "DataDurabilityCache.h"
16 #include "GuidConverter.h"
17 #include "MonitorFactory.h"
18 #include "RecorderImpl.h"
19 #include "ReplayerImpl.h"
22 #include "StaticDiscovery.h"
23 #include "ThreadStatusManager.h"
24 #include "Qos_Helper.h"
25 #include "../Version.h"
26 #ifdef OPENDDS_SECURITY
28 #endif
29 
30 #include <ace/config.h>
31 #include <ace/Singleton.h>
32 #include <ace/Arg_Shifter.h>
33 #include <ace/Reactor.h>
34 #include <ace/Select_Reactor.h>
36 #include <ace/Service_Config.h>
38 #include <ace/Auto_Ptr.h>
39 #include <ace/Sched_Params.h>
40 #include <ace/Malloc_Allocator.h>
41 #include <ace/OS_NS_unistd.h>
42 #include <ace/Version.h>
43 #include <ace/Configuration.h>
44 #include <ace/OS_NS_sys_utsname.h>
45 
46 #include <cstring>
47 #ifdef OPENDDS_SAFETY_PROFILE
48 # include <stdio.h> // <cstdio> after FaceCTS bug 623 is fixed
49 #else
50 # include <fstream>
51 #endif
52 
53 #if !defined (__ACE_INLINE__)
54 #include "Service_Participant.inl"
55 #endif /* __ACE_INLINE__ */
56 
57 namespace {
58 
59 void set_log_file_name(const char* fname)
60 {
61 #ifdef OPENDDS_SAFETY_PROFILE
62  ACE_LOG_MSG->msg_ostream(fopen(fname, "a"), true);
63 #else
64  std::ofstream* output_stream = new std::ofstream(fname, ios::app);
65  if (output_stream->bad()) {
66  delete output_stream;
67  } else {
68  ACE_LOG_MSG->msg_ostream(output_stream, true);
69  }
70 #endif
73 }
74 
75 
76 void set_log_verbose(unsigned long verbose_logging)
77 {
78  // Code copied from TAO_ORB_Core::init() in
79  // TAO version 1.6a_p13.
80 
81  typedef void (ACE_Log_Msg::*PTMF)(u_long);
82  PTMF flagop = &ACE_Log_Msg::set_flags;
83  u_long value;
84 
85  switch (verbose_logging)
86  {
87  case 0:
88  flagop = &ACE_Log_Msg::clr_flags;
90  break;
91  case 1:
92  value = ACE_Log_Msg::VERBOSE_LITE; break;
93  default:
94  value = ACE_Log_Msg::VERBOSE; break;
95  }
96 
97  (ACE_LOG_MSG->*flagop)(value);
98 }
99 
100 
101 }
102 
104 
105 namespace OpenDDS {
106 namespace DCPS {
107 
109 
110 const size_t DEFAULT_NUM_CHUNKS = 20;
111 
112 const size_t DEFAULT_CHUNK_MULTIPLIER = 10;
113 
114 const int DEFAULT_FEDERATION_RECOVERY_DURATION = 900; // 15 minutes in seconds.
115 const int DEFAULT_FEDERATION_INITIAL_BACKOFF_SECONDS = 1; // Wait only 1 second.
116 const int DEFAULT_FEDERATION_BACKOFF_MULTIPLIER = 2; // Exponential backoff.
117 const int DEFAULT_FEDERATION_LIVELINESS = 60; // 1 minute hearbeat.
118 
119 const int BIT_LOOKUP_DURATION_MSEC = 2000;
120 
121 static ACE_TString config_fname(ACE_TEXT(""));
122 
123 static const ACE_TCHAR DEFAULT_REPO_IOR[] = ACE_TEXT("file://repo.ior");
124 
125 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
126 static const char DEFAULT_PERSISTENT_DATA_DIR[] = "OpenDDS-durable-data-dir";
127 #endif
128 
129 static const ACE_TCHAR COMMON_SECTION_NAME[] = ACE_TEXT("common");
130 static const ACE_TCHAR DOMAIN_SECTION_NAME[] = ACE_TEXT("domain");
131 static const ACE_TCHAR DOMAIN_RANGE_SECTION_NAME[] = ACE_TEXT("DomainRange");
132 static const ACE_TCHAR REPO_SECTION_NAME[] = ACE_TEXT("repository");
133 static const ACE_TCHAR RTPS_SECTION_NAME[] = ACE_TEXT("rtps_discovery");
134 
135 static bool got_debug_level = false;
136 static bool got_use_rti_serialization = false;
137 static bool got_info = false;
138 static bool got_chunks = false;
140 static bool got_liveliness_factor = false;
141 static bool got_bit_transport_port = false;
142 static bool got_bit_transport_ip = false;
143 static bool got_bit_lookup_duration_msec = false;
144 static bool got_global_transport_config = false;
145 static bool got_bit_flag = false;
146 
147 #if defined(OPENDDS_SECURITY)
148 static bool got_security_flag = false;
149 static bool got_security_debug = false;
150 static bool got_security_fake_encryption = false;
151 #endif
152 
153 static bool got_publisher_content_filter = false;
154 static bool got_transport_debug_level = false;
155 static bool got_pending_timeout = false;
156 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
157 static bool got_persistent_data_dir = false;
158 #endif
159 static bool got_default_discovery = false;
160 #ifndef DDS_DEFAULT_DISCOVERY_METHOD
161 # ifdef OPENDDS_SAFETY_PROFILE
162 # define DDS_DEFAULT_DISCOVERY_METHOD Discovery::DEFAULT_RTPS
163 # else
164 # define DDS_DEFAULT_DISCOVERY_METHOD Discovery::DEFAULT_REPO
165 # endif
166 #endif
167 static bool got_log_fname = false;
168 static bool got_log_verbose = false;
169 static bool got_default_address = false;
170 static bool got_bidir_giop = false;
171 static bool got_thread_status_interval = false;
172 static bool got_monitor = false;
173 static bool got_type_object_encoding = false;
174 static bool got_log_level = false;
175 
177  :
178 #ifndef OPENDDS_SAFETY_PROFILE
179  ORB_argv_(false /*substitute_env_args*/),
180 #endif
181  time_source_()
182  , reactor_task_(false)
183  , defaultDiscovery_(DDS_DEFAULT_DISCOVERY_METHOD)
184  , n_chunks_(DEFAULT_NUM_CHUNKS)
185  , association_chunk_multiplier_(DEFAULT_CHUNK_MULTIPLIER)
186  , liveliness_factor_(80)
187  , bit_transport_port_(0)
188  , bit_enabled_(
189 #ifdef DDS_HAS_MINIMUM_BIT
190  false
191 #else
192  true
193 #endif
194  )
195 #ifdef OPENDDS_SECURITY
196  , security_enabled_(false)
197 #endif
198  , bit_lookup_duration_msec_(BIT_LOOKUP_DURATION_MSEC)
199  , global_transport_config_(ACE_TEXT(""))
200  , monitor_factory_(0)
201  , federation_recovery_duration_(DEFAULT_FEDERATION_RECOVERY_DURATION)
202  , federation_initial_backoff_seconds_(DEFAULT_FEDERATION_INITIAL_BACKOFF_SECONDS)
203  , federation_backoff_multiplier_(DEFAULT_FEDERATION_BACKOFF_MULTIPLIER)
204  , federation_liveliness_(DEFAULT_FEDERATION_LIVELINESS)
206  , pool_size_(1024 * 1024 * 16)
207  , pool_granularity_(8)
208 #endif
209  , scheduler_(-1)
210  , priority_min_(0)
211  , priority_max_(0)
212  , publisher_content_filter_(true)
213 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
214  , persistent_data_dir_(DEFAULT_PERSISTENT_DATA_DIR)
215 #endif
216  , bidir_giop_(true)
217  , monitor_enabled_(false)
218  , shut_down_(false)
219  , default_configuration_file_(ACE_TEXT(""))
220  , type_object_encoding_(Encoding_Normal)
221  , network_interface_address_topic_(make_rch<InternalTopic<NetworkInterfaceAddress> >())
222  , printer_value_writer_indent_(4)
223 {
224  initialize();
225 }
226 
228 {
229  if (DCPS_debug_level >= 1) {
230  ACE_DEBUG((LM_DEBUG, "(%P|%t) Service_Participant::~Service_Participant\n"));
231  }
232 
233  {
235  if (dp_factory_servant_) {
236  const size_t count = dp_factory_servant_->participant_count();
237  if (count > 0 && log_level >= LogLevel::Warning) {
238  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: Service_Participant::~Service_Participant: "
239  "There are %B remaining domain participant(s). "
240  "It is recommended to delete them before shutdown.\n",
241  count));
242  }
243 
244  const DDS::ReturnCode_t cleanup_status = dp_factory_servant_->delete_all_participants();
245  if (cleanup_status) {
246  if (log_level >= LogLevel::Warning) {
247  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: Service_Participant::~Service_Participant: "
248  "delete_all_participants returned %C\n",
249  retcode_to_string(cleanup_status)));
250  }
251  }
252  }
253  }
254 
255  const DDS::ReturnCode_t shutdown_status = shutdown();
256  if (shutdown_status != DDS::RETCODE_OK && shutdown_status != DDS::RETCODE_ALREADY_DELETED) {
257  if (log_level >= LogLevel::Warning) {
258  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: Service_Participant::~Service_Participant: "
259  "shutdown returned %C\n",
260  retcode_to_string(shutdown_status)));
261  }
262  }
263 }
264 
267 {
268  // Hide the template instantiation to prevent multiple instances
269  // from being created.
270 
272 }
273 
274 const TimeSource&
276 {
277  return time_source_;
278 }
279 
282 {
283  return reactor_task_.get_reactor();
284 }
285 
288 {
289  return reactor_task_.get_reactor();
290 }
291 
294 {
296 }
297 
300 {
301  return reactor_task_.interceptor();
302 }
303 
306 {
307  return job_queue_;
308 }
309 
311 {
312  if (DCPS_debug_level >= 1) {
313  ACE_DEBUG((LM_DEBUG, "(%P|%t) Service_Participant::shutdown\n"));
314  }
315 
316  if (shut_down_) {
318  }
319 
320  if (monitor_factory_) {
322  monitor_factory_ = 0;
323  }
324 
325  {
327  if (dp_factory_servant_) {
328  const size_t count = dp_factory_servant_->participant_count();
329  if (count > 0) {
330  if (log_level >= LogLevel::Notice) {
331  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: Service_Participant::shutdown: "
332  "there are %B domain participant(s) that must be deleted before shutdown can occur\n",
333  count));
334  }
336  }
337  }
338  }
339 
340  if (shutdown_listener_) {
341  shutdown_listener_->notify_shutdown();
342  }
343 
345  try {
347  {
349 
350  shut_down_ = true;
351 
352  dp_factory_servant_.reset();
353 
354  domainRepoMap_.clear();
355 
356  {
363  }
364  }
365 
366  domain_ranges_.clear();
367 
369 
370  discoveryMap_.clear();
371 
372 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
373  transient_data_cache_.reset();
374  persistent_data_cache_.reset();
375 #endif
376 
377  discovery_types_.clear();
378  }
380 #ifdef OPENDDS_SECURITY
382 #endif
383  } catch (const CORBA::Exception& ex) {
384  if (log_level >= LogLevel::Error) {
385  ex._tao_print_exception("ERROR: Service_Participant::shutdown");
386  }
387  rc = DDS::RETCODE_ERROR;
388  }
389 
390  return rc;
391 }
392 
393 #ifdef ACE_USES_WCHAR
394 DDS::DomainParticipantFactory_ptr
396  char *argv[])
397 {
398  ACE_Argv_Type_Converter converter(argc, argv);
399  return get_domain_participant_factory(converter.get_argc(),
400  converter.get_TCHAR_argv());
401 }
402 #endif
403 
404 DDS::DomainParticipantFactory_ptr
406  ACE_TCHAR *argv[])
407 {
408  if (!dp_factory_servant_) {
410 
411  shut_down_ = false;
412 
413  if (!dp_factory_servant_) {
414  // This used to be a call to ORB_init(). Since the ORB is now managed
415  // by InfoRepoDiscovery, just save the -ORB* args for later use.
416  // The exceptions are -ORBLogFile and -ORBVerboseLogging, which
417  // are processed by the service participant. This allows log control
418  // even if an ORB is not being used.
419 #ifndef OPENDDS_SAFETY_PROFILE
420  ORB_argv_.add(ACE_TEXT("unused_arg_0"));
421 #endif
422  /* NOTE ABOUT ADDING NEW OPTIONS HERE ==================================
423  *
424  * The argument parsing here is simple. It will match substrings of
425  * options even if that isn't the whole option. For example; If
426  * "-DCPSSecurity" is checked before "-DCPSSecurityDebug" and
427  * "-DCPSSecurityDebug" is passed, it will match "-DCPSSecurity". Check
428  * to make sure the order is correct.
429  *
430  * TODO/TBD: Create or make use of a stricter command line argument
431  * parsing method/library. Something where we can define the format of
432  * the argument and it will handle it better. Maybe could be integrated
433  * into the config parsing, which has most of these options.
434  */
435  ACE_Arg_Shifter shifter(argc, argv);
436  while (shifter.is_anything_left()) {
437  if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORBLogFile")) == 0) {
438  shifter.ignore_arg();
439  } else if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORBVerboseLogging")) == 0) {
440  shifter.ignore_arg();
441  } else if (shifter.cur_arg_strncasecmp(ACE_TEXT("-ORB")) < 0) {
442  shifter.ignore_arg();
443  } else {
444 #ifndef OPENDDS_SAFETY_PROFILE
445  ORB_argv_.add(shifter.get_current());
446 #endif
447  shifter.consume_arg();
448  if (shifter.is_parameter_next()) {
449 #ifndef OPENDDS_SAFETY_PROFILE
450  ORB_argv_.add(shifter.get_current(), true /*quote_arg*/);
451 #endif
452  shifter.consume_arg();
453  }
454  }
455  }
456 
457  if (parse_args(argc, argv) != 0) {
458  return DDS::DomainParticipantFactory::_nil();
459  }
460 
463  }
464 
465  if (config_fname.is_empty()) {
466  if (DCPS_debug_level) {
468  ACE_TEXT("(%P|%t) NOTICE: not using file configuration - no configuration ")
469  ACE_TEXT("file specified.\n")));
470  }
471 
472  } else {
473  // Load configuration only if the configuration
474  // file exists.
475  FILE* in = ACE_OS::fopen(config_fname.c_str(),
476  ACE_TEXT("r"));
477 
478  if (!in) {
480  ACE_TEXT("(%P|%t) WARNING: not using file configuration - ")
481  ACE_TEXT("can not open \"%s\" for reading. %p\n"),
482  config_fname.c_str(), ACE_TEXT("fopen")));
483 
484  } else {
485  ACE_OS::fclose(in);
486 
487  if (DCPS_debug_level > 1) {
489  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::get_domain_participant_factory ")
490  ACE_TEXT("Going to load configuration from <%s>\n"),
491  config_fname.c_str()));
492  }
493 
494  if (this->load_configuration() != 0) {
496  ACE_TEXT("(%P|%t) ERROR: Service_Participant::get_domain_participant_factory: ")
497  ACE_TEXT("load_configuration() failed.\n")));
498  return DDS::DomainParticipantFactory::_nil();
499  }
500  }
501  }
502 #if OPENDDS_POOL_ALLOCATOR
503  // For non-FACE tests, configure pool
504  configure_pool();
505 #endif
506 
507  if (log_level >= LogLevel::Info) {
508  ACE_DEBUG((LM_INFO, "(%P|%t) Service_Participant::get_domain_participant_factory: "
509  "This is OpenDDS " OPENDDS_VERSION " using ACE " ACE_VERSION "\n"));
510 
511  ACE_DEBUG((LM_INFO, "(%P|%t) Service_Participant::get_domain_participant_factory: "
512  "log_level: %C DCPS_debug_level: %u\n", log_level.get_as_string(), DCPS_debug_level));
513 
515  if (ACE_OS::uname(&uname) != -1) {
516  ACE_DEBUG((LM_INFO, "(%P|%t) Service_Participant::get_domain_participant_factory: "
517  "machine: %C, %C platform: %C, %C, %C\n",
518  uname.nodename, uname.machine, uname.sysname, uname.release, uname.version));
519  }
520 
521  ACE_DEBUG((LM_INFO, "(%P|%t) Service_Participant::get_domain_participant_factory: "
522  "compiler: %C version %d.%d.%d\n",
524  }
525 
526  // Establish the default scheduling mechanism and
527  // priority here. Sadly, the ORB is already
528  // initialized so we have no influence over its
529  // scheduling or thread priority(ies).
530 
531  /// @TODO: Move ORB initialization to after the
532  /// configuration file is processed and the
533  /// initial scheduling policy and priority are
534  /// established.
535  this->initializeScheduling();
536 
537  dp_factory_servant_ = make_rch<DomainParticipantFactoryImpl>();
538 
541  "Service_Participant");
542 
543  job_queue_ = make_rch<JobQueue>(reactor_task_.get_reactor());
544 
545  if (this->monitor_enabled_) {
546 #if !defined(ACE_AS_STATIC_LIBS)
547  ACE_TString directive = ACE_TEXT("dynamic OpenDDS_Monitor Service_Object * OpenDDS_monitor:_make_MonitorFactoryImpl()");
549 #endif
550  this->monitor_factory_ =
552 
553  if (this->monitor_factory_ == 0) {
554  if (this->monitor_enabled_) {
556  ACE_TEXT("ERROR: Service_Participant::get_domain_participant_factory, ")
557  ACE_TEXT("Unable to enable monitor factory.\n")));
558  }
559  }
560  }
561 
562  if (this->monitor_factory_ == 0) {
563  // Use the stubbed factory
565  this->monitor_factory_ =
566  ACE_Dynamic_Service<MonitorFactory>::instance ("OpenDDS_Monitor_Default");
567  }
568  if (this->monitor_enabled_) {
569  this->monitor_factory_->initialize();
570  }
571 
572  this->monitor_.reset(this->monitor_factory_->create_sp_monitor(this));
573  }
574 
575 #if defined OPENDDS_LINUX_NETWORK_CONFIG_MONITOR
576  if (DCPS_debug_level >= 1) {
578  "(%P|%t) Service_Participant::get_domain_participant_factory: Creating LinuxNetworkConfigMonitor\n"));
579  }
580  network_config_monitor_ = make_rch<LinuxNetworkConfigMonitor>(reactor_task_.interceptor());
581 #elif defined(OPENDDS_NETWORK_CONFIG_MODIFIER)
582  if (DCPS_debug_level >= 1) {
584  "(%P|%t) Service_Participant::get_domain_participant_factory: Creating NetworkConfigModifier\n"));
585  }
586  network_config_monitor_ = make_rch<NetworkConfigModifier>();
587 #else
588  if (DCPS_debug_level >= 1) {
590  "(%P|%t) Service_Participant::get_domain_participant_factory: Creating DefaultNetworkConfigMonitor\n"));
591  }
592  network_config_monitor_ = make_rch<DefaultNetworkConfigMonitor>();
593 #endif
594 
596  if (!network_config_monitor_->open()) {
597  bool open_failed = false;
598 #ifdef OPENDDS_NETWORK_CONFIG_MODIFIER
599  if (DCPS_debug_level >= 1) {
601  "(%P|%t) Service_Participant::get_domain_participant_factory: Creating NetworkConfigModifier\n"));
602  }
604  network_config_monitor_ = make_rch<NetworkConfigModifier>();
606  if (!network_config_monitor_->open()) {
607  open_failed = true;
608  }
609 #else
610  open_failed = true;
611 #endif
612  if (open_failed) {
613  if (log_level >= LogLevel::Error) {
615  "(%P|%t) ERROR: Service_Participant::get_domain_participant_factory: Could not open network config monitor\n"));
616  }
620  }
621  }
622  }
623 
624  return DDS::DomainParticipantFactory::_duplicate(dp_factory_servant_.in());
625 }
626 
628 {
629  // Process logging options first, so they are in effect if we need to log
630  // while processing other options.
631  ACE_Arg_Shifter log_arg_shifter(argc, argv);
632  while (log_arg_shifter.is_anything_left()) {
633  const ACE_TCHAR* currentArg = 0;
634 
635  if ((currentArg = log_arg_shifter.get_the_parameter(ACE_TEXT("-ORBLogFile"))) != 0) {
636  set_log_file_name(ACE_TEXT_ALWAYS_CHAR(currentArg));
637  log_arg_shifter.consume_arg();
638  got_log_fname = true;
639 
640  } else if ((currentArg = log_arg_shifter.get_the_parameter(ACE_TEXT("-ORBVerboseLogging"))) != 0) {
641  set_log_verbose(ACE_OS::atoi(currentArg));
642  log_arg_shifter.consume_arg();
643  got_log_verbose = true;
644 
645  } else {
646  log_arg_shifter.ignore_arg();
647  }
648  }
649 
650  ACE_Arg_Shifter arg_shifter(argc, argv);
651  while (arg_shifter.is_anything_left()) {
652  const ACE_TCHAR* currentArg = 0;
653 
654  if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDebugLevel"))) != 0) {
655  set_DCPS_debug_level(ACE_OS::atoi(currentArg));
656  arg_shifter.consume_arg();
657  got_debug_level = true;
658 
659  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSInfoRepo"))) != 0) {
660  this->set_repo_ior(currentArg, Discovery::DEFAULT_REPO);
661  arg_shifter.consume_arg();
662  got_info = true;
663 
664  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSRTISerialization"))) != 0) {
665  if (ACE_OS::atoi(currentArg) == 0) {
667  ACE_TEXT("(%P|%t) WARNING: Service_Participant::parse_args ")
668  ACE_TEXT("Argument ignored: DCPSRTISerialization is required to be enabled\n")));
669  }
670  arg_shifter.consume_arg();
671  got_use_rti_serialization = true;
672  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSChunks"))) != 0) {
673  n_chunks_ = ACE_OS::atoi(currentArg);
674  arg_shifter.consume_arg();
675  got_chunks = true;
676 
677  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSChunkAssociationMultiplier"))) != 0) {
679  arg_shifter.consume_arg();
680  got_chunk_association_multiplier = true;
681 
682  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSConfigFile"))) != 0) {
683  config_fname = currentArg;
684  arg_shifter.consume_arg();
685 
686  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSLivelinessFactor"))) != 0) {
687  liveliness_factor_ = ACE_OS::atoi(currentArg);
688  arg_shifter.consume_arg();
689  got_liveliness_factor = true;
690 
691  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitTransportPort"))) != 0) {
692  /// No need to guard this insertion as we are still single
693  /// threaded here.
694  this->bit_transport_port_ = ACE_OS::atoi(currentArg);
695  arg_shifter.consume_arg();
696  got_bit_transport_port = true;
697 
698  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitTransportIPAddress"))) != 0) {
699  /// No need to guard this insertion as we are still single
700  /// threaded here.
701  this->bit_transport_ip_ = currentArg;
702  arg_shifter.consume_arg();
703  got_bit_transport_ip = true;
704 
705  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBitLookupDurationMsec"))) != 0) {
707  arg_shifter.consume_arg();
708  got_bit_lookup_duration_msec = true;
709 
710  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSGlobalTransportConfig"))) != 0) {
711  global_transport_config_ = currentArg;
712  arg_shifter.consume_arg();
713  got_global_transport_config = true;
714 
715  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBit"))) != 0) {
716  bit_enabled_ = ACE_OS::atoi(currentArg);
717  arg_shifter.consume_arg();
718  got_bit_flag = true;
719 
720  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSTransportDebugLevel"))) != 0) {
722  arg_shifter.consume_arg();
723  got_transport_debug_level = true;
724 
725 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
726  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPersistentDataDir"))) != 0) {
727  this->persistent_data_dir_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
728  arg_shifter.consume_arg();
729  got_persistent_data_dir = true;
730 #endif
731 
732  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPendingTimeout"))) != 0) {
734  arg_shifter.consume_arg();
735  got_pending_timeout = true;
736 
737  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSPublisherContentFilter"))) != 0) {
738  this->publisher_content_filter_ = ACE_OS::atoi(currentArg);
739  arg_shifter.consume_arg();
740  got_publisher_content_filter = true;
741 
742  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDefaultDiscovery"))) != 0) {
743  this->defaultDiscovery_ = ACE_TEXT_ALWAYS_CHAR(currentArg);
744  arg_shifter.consume_arg();
745  got_default_discovery = true;
746 
747  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSBidirGIOP"))) != 0) {
748  bidir_giop_ = ACE_OS::atoi(currentArg);
749  arg_shifter.consume_arg();
750  got_bidir_giop = true;
751 
752  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSThreadStatusInterval"))) != 0) {
754  arg_shifter.consume_arg();
755  got_thread_status_interval = true;
756 
757  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationRecoveryDuration"))) != 0) {
758  this->federation_recovery_duration_ = ACE_OS::atoi(currentArg);
759  arg_shifter.consume_arg();
760 
761  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationInitialBackoffSeconds"))) != 0) {
763  arg_shifter.consume_arg();
764 
765  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationBackoffMultiplier"))) != 0) {
766  this->federation_backoff_multiplier_ = ACE_OS::atoi(currentArg);
767  arg_shifter.consume_arg();
768 
769  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-FederationLivelinessDuration"))) != 0) {
770  this->federation_liveliness_ = ACE_OS::atoi(currentArg);
771  arg_shifter.consume_arg();
772 
773  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSDefaultAddress"))) != 0) {
774  ACE_INET_Addr addr;
775  if (addr.set(u_short(0), currentArg)) {
777  ACE_TEXT("(%P|%t) ERROR: Service_Participant::parse_args: ")
778  ACE_TEXT("failed to parse default address %C\n"),
779  currentArg),
780  -1);
781  }
783  arg_shifter.consume_arg();
784  got_default_address = true;
785 
786  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSMonitor"))) != 0) {
787  this->monitor_enabled_ = ACE_OS::atoi(currentArg);
788  arg_shifter.consume_arg();
789  got_monitor = true;
790 
791 #if defined(OPENDDS_SECURITY)
792  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSSecurityDebugLevel"))) != 0) {
794  arg_shifter.consume_arg();
795  got_security_debug = true;
796 
797  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSSecurityDebug"))) != 0) {
798  security_debug.parse_flags(currentArg);
799  arg_shifter.consume_arg();
800  got_security_debug = true;
801 
802  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSSecurityFakeEncryption"))) != 0) {
804  arg_shifter.consume_arg();
805  got_security_fake_encryption = true;
806 
807  // Must be last "-DCPSSecurity*" option, see comment above this arg parsing loop
808  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSSecurity"))) != 0) {
809  security_enabled_ = ACE_OS::atoi(currentArg);
810  arg_shifter.consume_arg();
811  got_security_flag = true;
812 
813 #endif
814  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSTypeObjectEncoding"))) != 0) {
816  arg_shifter.consume_arg();
817  got_type_object_encoding = true;
818 
819  } else if ((currentArg = arg_shifter.get_the_parameter(ACE_TEXT("-DCPSLogLevel"))) != 0) {
821  arg_shifter.consume_arg();
822  got_log_level = true;
823 
824  } else {
825  arg_shifter.ignore_arg();
826  }
827  }
828  // Indicates successful parsing of the command line
829  return 0;
830 }
831 
832 void
834 {
837 
839 
841 
845 
847 
849 
852 
854 
856 
858 
860 
862 
864 
866 
868 
869  // Will get interpreted based on how the type was annotated.
871 
873 
875 
879 
881 
883 
888 
890 
895 
900 }
901 
902 void
904 {
905  //
906  // Establish the scheduler if specified.
907  //
908  if (this->schedulerString_.length() == 0) {
909  if (DCPS_debug_level > 0) {
911  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::intializeScheduling() - ")
912  ACE_TEXT("no scheduling policy specified, not setting policy.\n")));
913  }
914 
915  } else {
916  //
917  // Translate the scheduling policy to a usable value.
918  //
919  int ace_scheduler = ACE_SCHED_OTHER;
920  this->scheduler_ = THR_SCHED_DEFAULT;
921 
922  if (this->schedulerString_ == ACE_TEXT("SCHED_RR")) {
923  this->scheduler_ = THR_SCHED_RR;
924  ace_scheduler = ACE_SCHED_RR;
925 
926  } else if (this->schedulerString_ == ACE_TEXT("SCHED_FIFO")) {
927  this->scheduler_ = THR_SCHED_FIFO;
928  ace_scheduler = ACE_SCHED_FIFO;
929 
930  } else if (this->schedulerString_ == ACE_TEXT("SCHED_OTHER")) {
931  this->scheduler_ = THR_SCHED_DEFAULT;
932  ace_scheduler = ACE_SCHED_OTHER;
933 
934  } else {
936  ACE_TEXT("(%P|%t) WARNING: Service_Participant::initializeScheduling() - ")
937  ACE_TEXT("unrecognized scheduling policy: %s, set to SCHED_OTHER.\n"),
938  this->schedulerString_.c_str()));
939  }
940 
941  //
942  // Attempt to set the scheduling policy.
943  //
944 #ifdef ACE_WIN32
946  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::initializeScheduling() - ")
947  ACE_TEXT("scheduling is not implemented on Win32.\n")));
948  ACE_UNUSED_ARG(ace_scheduler);
949 #else
950  ACE_Sched_Params params(
951  ace_scheduler,
952  ACE_Sched_Params::priority_min(ace_scheduler),
955 
956  if (ACE_OS::sched_params(params) != 0) {
957  if (ACE_OS::last_error() == EPERM) {
959  ACE_TEXT("(%P|%t) WARNING: Service_Participant::initializeScheduling() - ")
960  ACE_TEXT("user is not superuser, requested scheduler not set.\n")));
961 
962  } else {
964  ACE_TEXT("(%P|%t) ERROR: Service_Participant::initializeScheduling() - ")
965  ACE_TEXT("sched_params failed: %m.\n")));
966  }
967 
968  // Reset the scheduler value(s) if we did not succeed.
969  this->scheduler_ = -1;
970  ace_scheduler = ACE_SCHED_OTHER;
971 
972  } else if (DCPS_debug_level > 0) {
974  ACE_TEXT("(%P|%t) Service_Participant::initializeScheduling() - ")
975  ACE_TEXT("scheduling policy set to %s(%d).\n"),
976  this->schedulerString_.c_str()));
977  }
978 
979  //
980  // Setup some scheduler specific information for later use.
981  //
984 #endif // ACE_WIN32
985  }
986 }
987 
988 #ifdef DDS_HAS_WCHAR
989 bool
990 Service_Participant::set_repo_ior(const wchar_t* ior,
992  bool attach_participant)
993 {
994  return set_repo_ior(ACE_Wide_To_Ascii(ior).char_rep(), key, attach_participant);
995 }
996 #endif
997 
998 bool
1000  Discovery::RepoKey key,
1001  bool attach_participant)
1002 {
1003  if (DCPS_debug_level > 0) {
1005  ACE_TEXT("(%P|%t) Service_Participant::set_repo_ior: Repo[%C] == %C\n"),
1006  key.c_str(), ior));
1007  }
1008 
1009  // This is a global used for the bizarre commandline/configfile
1010  // processing done for this class.
1011  got_info = true;
1012 
1013  if (key == "-1") {
1015  }
1016 
1017  const OPENDDS_STRING repo_type = ACE_TEXT_ALWAYS_CHAR(REPO_SECTION_NAME);
1018  if (!discovery_types_.count(repo_type)) {
1019  // Re-use a transport registry function to attempt a dynamic load of the
1020  // library that implements the 'repo_type' (InfoRepoDiscovery)
1021  TheTransportRegistry->load_transport_lib(repo_type);
1022  }
1023 
1024  if (discovery_types_.count(repo_type)) {
1026  cf.open();
1028  ACE_TString section = REPO_SECTION_NAME;
1029  section += ACE_TEXT('\\');
1030  section += ACE_TEXT_CHAR_TO_TCHAR(key.c_str());
1031  cf.open_section(cf.root_section(), section.c_str(), true /*create*/, sect_key);
1032  cf.set_string_value(sect_key, ACE_TEXT("RepositoryIor"),
1033  ACE_TEXT_CHAR_TO_TCHAR(ior));
1034 
1035  discovery_types_[repo_type]->discovery_config(cf);
1036 
1037  this->remap_domains(key, key, attach_participant);
1038  return true;
1039  }
1040 
1042  ACE_TEXT("(%P|%t) Service_Participant::set_repo_ior ")
1043  ACE_TEXT("ERROR - no discovery type registered for ")
1044  ACE_TEXT("InfoRepoDiscovery\n")),
1045  false);
1046 }
1047 
1048 void
1050  Discovery::RepoKey newKey,
1051  bool attach_participant)
1052 {
1053  // Search the mappings for any domains mapped to this repository.
1054  OPENDDS_VECTOR(DDS::DomainId_t) domainList;
1055  {
1057 
1058  for (DomainRepoMap::const_iterator current = this->domainRepoMap_.begin();
1059  current != this->domainRepoMap_.end();
1060  ++current) {
1061  if (current->second == oldKey) {
1062  domainList.push_back(current->first);
1063  }
1064  }
1065  }
1066 
1067  // Remap the domains that were attached to this repository.
1068  for (unsigned int index = 0; index < domainList.size(); ++index) {
1069  // For mapped domains, attach their participants by setting the
1070  // mapping again.
1071  this->set_repo_domain(domainList[ index], newKey, attach_participant);
1072  }
1073 }
1074 
1075 void
1077  Discovery::RepoKey key,
1078  bool attach_participant)
1079 {
1080  typedef std::pair<Discovery_rch, GUID_t> DiscRepoPair;
1081  OPENDDS_VECTOR(DiscRepoPair) repoList;
1082  {
1084  DomainRepoMap::const_iterator where = this->domainRepoMap_.find(domain);
1085 
1086  if (key == "-1") {
1088  }
1089 
1090  if ((where == this->domainRepoMap_.end()) || (where->second != key)) {
1091  // Only assign entries into the map when they change the
1092  // contents.
1093  this->domainRepoMap_[ domain] = key;
1094 
1095  if (DCPS_debug_level > 0) {
1097  ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
1098  ACE_TEXT("Domain[ %d] = Repo[ %C].\n"),
1099  domain, key.c_str()));
1100  }
1101  }
1102 
1103  //
1104  // Make sure that we mark each DomainParticipant for this domain
1105  // using this repository as attached to this repository.
1106  //
1107  // @TODO: Move this note into user documentation.
1108  // N.B. Calling set_repo() or set_repo_ior() will result in this
1109  // code executing again with the new repository. It is best
1110  // to call those routines first when making changes.
1111  //
1112 
1113  // No servant means no participant. No worries.
1114  if (this->dp_factory_servant_) {
1115  // Map of domains to sets of participants.
1116  const DomainParticipantFactoryImpl::DPMap& participants
1117  = this->dp_factory_servant_->participants();
1118 
1119  // Extract the set of participants for the current domain.
1120  DomainParticipantFactoryImpl::DPMap::const_iterator
1121  which = participants.find(domain);
1122 
1123  if (which != participants.end()) {
1124  // Extract the repository to attach this domain to.
1125  RepoKeyDiscoveryMap::const_iterator disc_iter = this->discoveryMap_.find(key);
1126 
1127  if (disc_iter != this->discoveryMap_.end()) {
1128  for (DomainParticipantFactoryImpl::DPSet::const_iterator
1129  current = which->second.begin();
1130  current != which->second.end();
1131  ++current) {
1132  try {
1133  // Attach each DomainParticipant in this domain to this
1134  // repository.
1135  GUID_t id = (*current)->get_id();
1136  repoList.push_back(std::make_pair(disc_iter->second, id));
1137 
1138  if (DCPS_debug_level > 0) {
1140  ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
1141  ACE_TEXT("participant %C attached to Repo[ %C].\n"),
1142  LogGuid(id).c_str(),
1143  key.c_str()));
1144  }
1145 
1146  } catch (const CORBA::Exception& ex) {
1148  "ERROR: Service_Participant::set_repo_domain: failed to attach repository - ");
1149  return;
1150  }
1151  }
1152  }
1153  }
1154  }
1155  } // End of GUARD scope.
1156 
1157  // Make all of the remote calls after releasing the lock.
1158  for (unsigned int index = 0; index < repoList.size(); ++index) {
1159  if (DCPS_debug_level > 0) {
1161  ACE_TEXT("(%P|%t) Service_Participant::set_repo_domain: ")
1162  ACE_TEXT("(%d of %d) attaching domain %d participant %C to Repo[ %C].\n"),
1163  (1+index), repoList.size(), domain,
1164  LogGuid(repoList[ index].second).c_str(),
1165  key.c_str()));
1166  }
1167 
1168  if (attach_participant)
1169  {
1170  repoList[ index].first->attach_participant(domain, repoList[ index].second);
1171  }
1172  }
1173 }
1174 
1175 void
1177 {
1178  // Find the lost repository.
1179  RepoKeyDiscoveryMap::iterator initialLocation = this->discoveryMap_.find(key);
1180  RepoKeyDiscoveryMap::iterator current = initialLocation;
1181 
1182  if (current == this->discoveryMap_.end()) {
1184  ACE_TEXT("(%P|%t) WARNING: Service_Participant::repository_lost: ")
1185  ACE_TEXT("lost repository %C was not present, ")
1186  ACE_TEXT("finding another anyway.\n"),
1187  key.c_str()));
1188 
1189  } else {
1190  // Start with the repository *after* the lost one.
1191  ++current;
1192  }
1193 
1194  // Calculate the bounding end time for attempts.
1196  const MonotonicTimePoint recoveryFailedTime(MonotonicTimePoint::now() + td);
1197 
1198  // Backoff delay.
1199  int backoff = this->federation_initial_backoff_seconds();
1200 
1201  // Keep trying until the total recovery time specified is exceeded.
1202  while (recoveryFailedTime > MonotonicTimePoint::now()) {
1203 
1204  // Wrap to the beginning at the end of the list.
1205  if (current == this->discoveryMap_.end()) {
1206  // Continue to traverse the list.
1207  current = this->discoveryMap_.begin();
1208  }
1209 
1210  // Handle reaching the lost repository by waiting before trying
1211  // again.
1212  if (current == initialLocation) {
1213  if (DCPS_debug_level > 0) {
1215  ACE_TEXT("(%P|%t) Service_Participant::repository_lost: ")
1216  ACE_TEXT("waiting %d seconds to traverse the ")
1217  ACE_TEXT("repository list another time ")
1218  ACE_TEXT("for lost key %C.\n"),
1219  backoff,
1220  key.c_str()));
1221  }
1222 
1223  // Wait to traverse the list and try again.
1224  ACE_OS::sleep(backoff);
1225 
1226  // Exponentially backoff delay.
1227  backoff *= this->federation_backoff_multiplier();
1228 
1229  // Don't increment current to allow us to reattach to the
1230  // original repository if it is restarted.
1231  }
1232 
1233  // Check the availability of the current repository.
1234  if (current->second->active()) {
1235 
1236  if (DCPS_debug_level > 0) {
1238  ACE_TEXT("(%P|%t) Service_Participant::repository_lost: ")
1239  ACE_TEXT("replacing repository %C with %C.\n"),
1240  key.c_str(),
1241  current->first.c_str()));
1242  }
1243 
1244  // If we reach here, the validate_connection() call succeeded
1245  // and the repository is reachable.
1246  this->remap_domains(key, current->first);
1247 
1248  // Now we are done. This is the only non-failure exit from
1249  // this method.
1250  return;
1251 
1252  } else {
1254  ACE_TEXT("(%P|%t) WARNING: Service_Participant::repository_lost: ")
1255  ACE_TEXT("repository %C was not available to replace %C, ")
1256  ACE_TEXT("looking for another.\n"),
1257  current->first.c_str(),
1258  key.c_str()));
1259  }
1260 
1261  // Move to the next candidate repository.
1262  ++current;
1263  }
1264 
1265  // If we reach here, we have exceeded the total recovery time
1266  // specified.
1267  OPENDDS_ASSERT(recoveryFailedTime.is_zero());
1268 }
1269 
1270 void
1272 {
1273  this->defaultDiscovery_ = key;
1274 }
1275 
1278 {
1279  return this->defaultDiscovery_;
1280 }
1281 
1284 {
1286 
1287  // Default to the Default InfoRepo-based discovery unless the user has
1288  // changed defaultDiscovery_ using the API or config file
1290  bool in_range = false;
1291  const Discovery::RepoKey instance_name = get_discovery_template_instance_name(domain);
1292  DomainRange dr_inst;
1293 
1294  RepoKeyDiscoveryMap::const_iterator location;
1295 
1296  // Find if this domain has a repo key (really a discovery key)
1297  // mapped to it.
1298  DomainRepoMap::const_iterator where = this->domainRepoMap_.find(domain);
1299  if (where != this->domainRepoMap_.end()) {
1300  repo = where->second;
1301  } else {
1302  // Is domain part of a DomainRange template?
1303  in_range = get_domain_range_info(domain, dr_inst);
1304  }
1305 
1306  // check to see if this domain has a discovery template
1307  // and if the template instance has already been loaded.
1308  if (!in_range && is_discovery_template(repo)) {
1309  location = this->discoveryMap_.find(instance_name);
1310  if (location == this->discoveryMap_.end()) {
1311  if (configure_discovery_template(domain, repo)) {
1312  repo = instance_name;
1313  }
1314  }
1315  }
1316 
1317  location = this->discoveryMap_.find(repo);
1318 
1319  if (location == this->discoveryMap_.end()) {
1320  if (in_range) {
1321  const int ret = configure_domain_range_instance(domain);
1322 
1323  // return the newly configured domain and return it
1324  if (!ret) {
1325  return this->discoveryMap_[instance_name];
1326  } else {
1327  if (DCPS_debug_level > 0) {
1329  ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
1330  ACE_TEXT("failed attempt to set RTPS discovery for domain range %d.\n"),
1331  domain));
1332  }
1333 
1334  return Discovery_rch();
1335  }
1336  } else if ((repo == Discovery::DEFAULT_REPO) ||
1337  (repo == "-1")) {
1338  // Set the default repository IOR if it hasn't already happened
1339  // by this point. This is why this can't be const.
1340  bool ok = this->set_repo_ior(DEFAULT_REPO_IOR, Discovery::DEFAULT_REPO);
1341 
1342  if (!ok) {
1343  if (DCPS_debug_level > 0) {
1345  ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
1346  ACE_TEXT("failed attempt to set default IOR for domain %d.\n"),
1347  domain));
1348  }
1349 
1350  } else {
1351  // Found the default!
1352  if (DCPS_debug_level > 4) {
1354  ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
1355  ACE_TEXT("returning default repository for domain %d.\n"),
1356  domain));
1357  }
1358 
1359  }
1360  return this->discoveryMap_[Discovery::DEFAULT_REPO];
1361 
1362  } else if (repo == Discovery::DEFAULT_RTPS) {
1363 
1365  cf.open();
1367  cf.open_section(cf.root_section(), RTPS_SECTION_NAME, true /*create*/, k);
1368 
1369  int status = load_discovery_configuration(cf, RTPS_SECTION_NAME);
1370 
1371  if (status != 0) {
1373  ACE_TEXT("(%P|%t) ERROR: Service_Participant::get_Discovery ")
1374  ACE_TEXT("failed attempt to load default RTPS discovery for domain %d.\n"),
1375  domain));
1376 
1377  return Discovery_rch();
1378  }
1379 
1380  // Try to find it again
1381  location = this->discoveryMap_.find(Discovery::DEFAULT_RTPS);
1382 
1383  if (location == this->discoveryMap_.end()) {
1384  // Unable to load DEFAULT_RTPS
1385  if (DCPS_debug_level > 0) {
1387  ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
1388  ACE_TEXT("failed attempt to set default RTPS discovery for domain %d.\n"),
1389  domain));
1390  }
1391 
1392  return Discovery_rch();
1393 
1394  } else {
1395  // Found the default!
1396  if (DCPS_debug_level > 4) {
1398  ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
1399  ACE_TEXT("returning default RTPS discovery for domain %d.\n"),
1400  domain));
1401  }
1402 
1403  return location->second;
1404  }
1405 
1406  } else {
1407  // Non-default repositories _must_ be loaded by application.
1408  if (DCPS_debug_level > 4) {
1410  ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
1411  ACE_TEXT("repository for domain %d was not set.\n"),
1412  domain));
1413  }
1414 
1415  return Discovery_rch();
1416  }
1417  }
1418 
1419  if (DCPS_debug_level > 4) {
1421  ACE_TEXT("(%P|%t) Service_Participant::get_discovery: ")
1422  ACE_TEXT("returning repository for domain %d, repo %C.\n"),
1423  domain, repo.c_str()));
1424  }
1425 
1426  return location->second;
1427 }
1428 
1431 {
1433 }
1434 
1435 int
1437 {
1438  return this->bit_transport_port_;
1439 }
1440 
1441 void
1443 {
1445  this->bit_transport_port_ = port;
1446  got_bit_transport_port = true;
1447 }
1448 
1449 int
1451 {
1453 }
1454 
1455 void
1457 {
1459  got_bit_lookup_duration_msec = true;
1460 }
1461 
1462 size_t
1464 {
1465  return n_chunks_;
1466 }
1467 
1468 void
1470 {
1471  n_chunks_ = chunks;
1472  got_chunks = true;
1473 }
1474 
1475 size_t
1477 {
1479 }
1480 
1481 void
1483 {
1484  association_chunk_multiplier_ = multiplier;
1485  got_chunk_association_multiplier = true;
1486 }
1487 
1488 void
1490 {
1491  liveliness_factor_ = factor;
1492  got_liveliness_factor = true;
1493 }
1494 
1495 int
1497 {
1498  return liveliness_factor_;
1499 }
1500 
1501 void
1503  Discovery::Config* cfg)
1504 {
1505  discovery_types_[section_name].reset(cfg);
1506 }
1507 
1508 int
1510 {
1512  int status = 0;
1513 
1514  if ((status = cf.open()) != 0)
1516  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
1517  ACE_TEXT("open() returned %d\n"),
1518  status),
1519  -1);
1520 
1521  ACE_Ini_ImpExp import(cf);
1522  status = import.import_config(config_fname.c_str());
1523 
1524  if (status != 0) {
1526  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
1527  ACE_TEXT("import_config () returned %d\n"),
1528  status),
1529  -1);
1530  } else {
1531  status = this->load_configuration(cf, config_fname.c_str());
1532  }
1533 
1534  return status;
1535 }
1536 
1537 int
1539  ACE_Configuration_Heap& config,
1540  const ACE_TCHAR* filename)
1541 {
1542  // Domain config is loaded after Discovery (see below). Since the domain
1543  // could be a domain_range that specifies the DiscoveryTemplate, check
1544  // for config templates before loading any config information.
1545  ACE_TString section_name;
1546 
1547  int status = this->load_common_configuration(config, filename);
1548 
1549  if (status != 0) {
1551  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
1552  ACE_TEXT("load_common_configuration () returned %d\n"),
1553  status),
1554  -1);
1555  }
1556 
1557  // Register static discovery.
1558  this->add_discovery(static_rchandle_cast<Discovery>(StaticDiscovery::instance()));
1559 
1560  // load any discovery configuration templates before rtps discovery
1561  // this will populate the domain_range_templates_
1562  status = this->load_domain_ranges(config);
1563 
1564  // load any rtps_discovery templates
1565  status = this->load_discovery_templates(config);
1566 
1567  if (status != 0) {
1569  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
1570  ACE_TEXT("load_domain_range_configuration() returned %d\n"),
1571  status),
1572  -1);
1573  }
1574 
1575  status = this->load_discovery_configuration(config, RTPS_SECTION_NAME);
1576 
1577  if (status != 0) {
1579  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
1580  ACE_TEXT("load_discovery_configuration() returned %d\n"),
1581  status),
1582  -1);
1583  }
1584 
1585  status = this->load_discovery_configuration(config, REPO_SECTION_NAME);
1586 
1587  if (status != 0) {
1589  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
1590  ACE_TEXT("load_discovery_configuration() returned %d\n"),
1591  status),
1592  -1);
1593  }
1594 
1595  // load any transport configuration templates before the transport config
1597 
1598  if (status != 0) {
1600  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
1601  ACE_TEXT("load_transport_templates() returned %d\n"),
1602  status),
1603  -1);
1604  }
1605 
1607  ACE_TEXT_ALWAYS_CHAR(filename), config);
1608  if (this->global_transport_config_ != ACE_TEXT("")) {
1611  if (config) {
1613  } else if (TheTransportRegistry->config_has_transport_template(global_transport_config_)) {
1614  if (DCPS_debug_level > 0) {
1615  // This is not an error.
1617  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_configuration ")
1618  ACE_TEXT("DCPSGlobalTransportConfig %C is a transport_template\n"),
1619  this->global_transport_config_.c_str()));
1620  }
1621  } else {
1623  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
1624  ACE_TEXT("Unable to locate specified global transport config: %C\n"),
1626  -1);
1627  }
1628  }
1629 
1630  if (status != 0) {
1632  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
1633  ACE_TEXT("load_transport_configuration () returned %d\n"),
1634  status),
1635  -1);
1636  }
1637 
1638  // Needs to be loaded after the [rtps_discovery/*] and [repository/*]
1639  // sections to allow error reporting on bad discovery config names.
1640  // Also loaded after the transport configuration so that
1641  // DefaultTransportConfig within [domain/*] can use TransportConfig objects.
1642  status = this->load_domain_configuration(config, filename);
1643 
1644  if (status != 0) {
1646  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
1647  ACE_TEXT("load_domain_configuration () returned %d\n"),
1648  status),
1649  -1);
1650  }
1651 
1652  if (status != 0) {
1654  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
1655  ACE_TEXT("load_domain_configuration () returned %d\n"),
1656  status),
1657  -1);
1658  }
1659 
1660  // Needs to be loaded after transport configs and instances and domains.
1661  try {
1662  status = StaticDiscovery::instance()->load_configuration(config);
1663 
1664  if (status != 0) {
1666  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_configuration ")
1667  ACE_TEXT("load_discovery_configuration() returned %d\n"),
1668  status),
1669  -1);
1670  }
1671  } catch (const CORBA::BAD_PARAM& ex) {
1672  ex._tao_print_exception("Exception caught in Service_Participant::load_configuration: "
1673  "trying to load_discovery_configuration()");
1674  return -1;
1675  }
1676 
1677  return 0;
1678 }
1679 
1680 int
1682  const ACE_TCHAR* filename)
1683 {
1684  const ACE_Configuration_Section_Key &root = cf.root_section();
1686 
1687  if (cf.open_section(root, COMMON_SECTION_NAME, false, sect) != 0) {
1688  if (DCPS_debug_level > 0) {
1689  // This is not an error if the configuration file does not have
1690  // a common section. The code default configuration will be used.
1692  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_common_configuration ")
1693  ACE_TEXT("failed to open section %s\n"),
1694  COMMON_SECTION_NAME));
1695  }
1696 
1697  return 0;
1698 
1699  } else {
1700  const ACE_TCHAR* message =
1701  ACE_TEXT("(%P|%t) NOTICE: using %s value from command option (overrides value if it's in config file)\n");
1702 
1703  if (got_debug_level) {
1704  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSDebugLevel")));
1705  } else {
1706  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSDebugLevel"), DCPS_debug_level, int)
1707  }
1708 
1709  if (got_info) {
1710  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSInfoRepo")));
1711  } else {
1713  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSInfoRepo"), value)
1714  if (!value.empty()) {
1715  this->set_repo_ior(value.c_str(), Discovery::DEFAULT_REPO);
1716  }
1717  }
1718 
1719  if (got_use_rti_serialization) {
1720  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSRTISerialization")));
1721  } else {
1722  bool should_use = true;
1723  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSRTISerialization"), should_use, bool)
1724  if (!should_use) {
1726  ACE_TEXT("(%P|%t) WARNING: Service_Participant::load_common_configuration ")
1727  ACE_TEXT("Argument ignored: DCPSRTISerialization is required to be enabled\n")));
1728  }
1729  }
1730 
1731  if (got_chunks) {
1732  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSChunks")));
1733  } else {
1734  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSChunks"), this->n_chunks_, size_t)
1735  }
1736 
1737  if (got_chunk_association_multiplier) {
1738  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSChunkAssociationMultiplier")));
1739  } else {
1740  // This is legacy support for a misspelling of the config option.
1741  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSChunkAssociationMutltiplier"), this->association_chunk_multiplier_, size_t)
1742  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSChunkAssociationMultiplier"), this->association_chunk_multiplier_, size_t)
1743  }
1744 
1745  if (got_bit_transport_port) {
1746  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSBitTransportPort")));
1747  } else {
1748  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBitTransportPort"), this->bit_transport_port_, int)
1749  }
1750 
1751  if (got_bit_transport_ip) {
1752  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSBitTransportIPAddress")));
1753  } else {
1754  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSBitTransportIPAddress"), this->bit_transport_ip_)
1755  }
1756 
1757  if (got_liveliness_factor) {
1758  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSLivelinessFactor")));
1759  } else {
1760  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSLivelinessFactor"), this->liveliness_factor_, int)
1761  }
1762 
1763  if (got_bit_lookup_duration_msec) {
1764  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSBitLookupDurationMsec")));
1765  } else {
1766  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBitLookupDurationMsec"), this->bit_lookup_duration_msec_, int)
1767  }
1768 
1769  if (got_global_transport_config) {
1770  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSGlobalTransportConfig")));
1771  } else {
1772  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSGlobalTransportConfig"), this->global_transport_config_);
1773  if (this->global_transport_config_ == ACE_TEXT("$file")) {
1774  // When the special string of "$file" is used, substitute the file name
1775  this->global_transport_config_ = filename;
1776  }
1777  }
1778 
1779  if (got_bit_flag) {
1780  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSBit")));
1781  } else {
1782  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBit"), this->bit_enabled_, int)
1783  }
1784 
1785  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSLogBits"), log_bits, bool);
1786 
1787 #if defined(OPENDDS_SECURITY)
1788  if (got_security_flag) {
1789  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSSecurity")));
1790  } else {
1791  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSSecurity"), this->security_enabled_, int)
1792  }
1793 
1794  if (got_security_debug) {
1795  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSSecurityDebug or DCPSSecurityDebugLevel")));
1796  } else {
1797  const ACE_TCHAR* debug_name = ACE_TEXT("DCPSSecurityDebug");
1798  const ACE_TCHAR* debug_level_name = ACE_TEXT("DCPSSecurityDebugLevel");
1799  bool got_value = false;
1800  ACE_TString debug_level_value;
1801  if (cf.get_string_value(sect, debug_level_name, debug_level_value) == -1) {
1802  ACE_TString debug_value;
1803  if (cf.get_string_value(sect, debug_name, debug_value) != -1) {
1804  if (debug_value != ACE_TEXT("")) {
1805  got_value = true;
1806  security_debug.parse_flags(debug_value.c_str());
1807  }
1808  }
1809  } else if (debug_level_value != ACE_TEXT("")) {
1810  got_value = true;
1811  security_debug.set_debug_level(ACE_OS::atoi(debug_level_value.c_str()));
1812  }
1813  if (!got_value && OpenDDS::DCPS::Transport_debug_level > 0) {
1815  ACE_TEXT("(%P|%t) NOTICE: DCPSSecurityDebug and DCPSSecurityDebugLevel ")
1816  ACE_TEXT("are not defined in config file or are blank - using code default.\n")));
1817  }
1818  }
1819 
1820  if (got_security_fake_encryption) {
1821  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSSecurityFakeEncryption")));
1822  } else {
1823  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSSecurityFakeEncryption"), security_debug.fake_encryption, int)
1824  }
1825 #endif
1826 
1827  if (got_transport_debug_level) {
1828  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSTransportDebugLevel")));
1829  } else {
1830  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSTransportDebugLevel"), OpenDDS::DCPS::Transport_debug_level, int)
1831  }
1832 
1833 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1834  if (got_persistent_data_dir) {
1835  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSPersistentDataDir")));
1836  } else {
1838  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSPersistentDataDir"), value)
1840  }
1841 #endif
1842 
1843  if (got_pending_timeout) {
1844  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSPendingTimeout")));
1845  } else {
1846  int timeout = 0;
1847  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSPendingTimeout"), timeout, int)
1848  pending_timeout_ = TimeDuration(timeout);
1849  }
1850 
1851  if (got_publisher_content_filter) {
1852  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSPublisherContentFilter")));
1853  } else {
1854  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSPublisherContentFilter"),
1855  this->publisher_content_filter_, bool)
1856  }
1857 
1858  if (got_default_discovery) {
1860  if (cf.find_value(sect, ACE_TEXT("DCPSDefaultDiscovery"), type) != -1) {
1861  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSDefaultDiscovery")));
1862  }
1863  } else {
1864  GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("DCPSDefaultDiscovery"),
1865  this->defaultDiscovery_);
1866  }
1867 
1868  if (got_bidir_giop) {
1870  if (cf.find_value(sect, ACE_TEXT("DCPSBidirGIOP"), type) != -1) {
1871  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSBidirGIOP")));
1872  }
1873  } else {
1874  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSBidirGIOP"), bidir_giop_, bool)
1875  }
1876 
1877  if (got_thread_status_interval) {
1878  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSThreadStatusInterval")));
1879  } else {
1880  int interval = 0;
1881  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSThreadStatusInterval"), interval, int);
1883  }
1884 
1886  if (got_log_fname) {
1887  if (cf.find_value(sect, ACE_TEXT("ORBLogFile"), type) != -1) {
1888  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("ORBLogFile")));
1889  }
1890  } else {
1891  OPENDDS_STRING log_fname;
1892  GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("ORBLogFile"), log_fname);
1893  if (!log_fname.empty()) {
1894  set_log_file_name(log_fname.c_str());
1895  }
1896  }
1897 
1898  if (got_log_verbose) {
1899  if (cf.find_value(sect, ACE_TEXT("ORBVerboseLogging"), type) != -1) {
1900  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("ORBVerboseLogging")));
1901  }
1902  } else {
1903  unsigned long verbose_logging = 0;
1904  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("ORBVerboseLogging"), verbose_logging, unsigned long);
1905  set_log_verbose(verbose_logging);
1906  }
1907 
1908  if (got_default_address) {
1909  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSDefaultAddress")));
1910  } else {
1911  ACE_TString default_address_str;
1912  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("DCPSDefaultAddress"), default_address_str);
1913  ACE_INET_Addr addr;
1914  if (!default_address_str.empty() &&
1915  addr.set(u_short(0), default_address_str.c_str())) {
1917  ACE_TEXT("(%P|%t) ERROR: Service_Participant::load_common_configuration: ")
1918  ACE_TEXT("failed to parse default address %C\n"),
1919  default_address_str.c_str()),
1920  -1);
1921  }
1923  }
1924 
1925  if (got_monitor) {
1926  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSMonitor")));
1927  } else {
1928  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("DCPSMonitor"), monitor_enabled_, bool)
1929  }
1930 
1931  if (got_type_object_encoding) {
1932  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSTypeObjectEncoding")));
1933  } else {
1934  String str;
1935  GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("DCPSTypeObjectEncoding"), str);
1936  if (!str.empty()) {
1937  type_object_encoding(str.c_str());
1938  }
1939  }
1940 
1941  if (got_log_level) {
1942  ACE_DEBUG((LM_NOTICE, message, ACE_TEXT("DCPSLogLevel")));
1943  } else {
1944  String str;
1945  GET_CONFIG_STRING_VALUE(cf, sect, ACE_TEXT("DCPSLogLevel"), str);
1946  if (!str.empty()) {
1947  log_level.set_from_string(str.c_str());
1948  }
1949  }
1950 
1951  // These are not handled on the command line.
1952  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationRecoveryDuration"), this->federation_recovery_duration_, int)
1953  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationInitialBackoffSeconds"), this->federation_initial_backoff_seconds_, int)
1954  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationBackoffMultiplier"), this->federation_backoff_multiplier_, int)
1955  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("FederationLivelinessDuration"), this->federation_liveliness_, int)
1956 
1957 #if OPENDDS_POOL_ALLOCATOR
1958  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("pool_size"), pool_size_, size_t)
1959  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("pool_granularity"), pool_granularity_, size_t)
1960 #endif
1961 
1962  //
1963  // Establish the scheduler if specified.
1964  //
1965  GET_CONFIG_TSTRING_VALUE(cf, sect, ACE_TEXT("scheduler"), this->schedulerString_)
1966 
1967  suseconds_t usec(0);
1968 
1969  GET_CONFIG_VALUE(cf, sect, ACE_TEXT("scheduler_slice"), usec, suseconds_t)
1970 
1971  if (usec > 0) {
1972  schedulerQuantum_ = TimeDuration(0, usec);
1973  }
1974  }
1975 
1976  return 0;
1977 }
1978 
1979 int
1981  const ACE_TCHAR* filename)
1982 {
1983  const ACE_Configuration_Section_Key& root = cf.root_section();
1984  ACE_Configuration_Section_Key domain_sect;
1985 
1986  if (cf.open_section(root, DOMAIN_SECTION_NAME, false, domain_sect) != 0) {
1987  if (DCPS_debug_level > 0) {
1988  // This is not an error if the configuration file does not have
1989  // any domain (sub)section. The code default configuration will be used.
1991  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_domain_configuration(): ")
1992  ACE_TEXT("failed to open [%s] section - using code default.\n"),
1993  DOMAIN_SECTION_NAME));
1994  }
1995 
1996  return 0;
1997 
1998  } else {
1999  // Ensure there are no properties in this section
2000  ValueMap vm;
2001  if (pullValues(cf, domain_sect, vm) > 0) {
2002  // There are values inside [domain]
2004  ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
2005  ACE_TEXT("domain sections must have a subsection name\n")),
2006  -1);
2007  }
2008  // Process the subsections of this section (the individual domains)
2009  KeyList keys;
2010  if (processSections(cf, domain_sect, keys) != 0) {
2012  ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
2013  ACE_TEXT("too many nesting layers in the [domain] section.\n")),
2014  -1);
2015  }
2016 
2017  // Loop through the [domain/*] sections
2018  for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
2019  OPENDDS_STRING domain_name = it->first;
2020 
2021  ValueMap values;
2022  pullValues(cf, it->second, values);
2023  DDS::DomainId_t domainId = -1;
2024  Discovery::RepoKey repoKey;
2025  OPENDDS_STRING perDomainDefaultTportConfig;
2026  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
2027  OPENDDS_STRING name = it->first;
2028  if (name == "DomainId") {
2029  OPENDDS_STRING value = it->second;
2030  if (!convertToInteger(value, domainId)) {
2032  ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
2033  ACE_TEXT("Illegal integer value for DomainId (%C) in [domain/%C] section.\n"),
2034  value.c_str(), domain_name.c_str()),
2035  -1);
2036  }
2037  if (DCPS_debug_level > 0) {
2039  ACE_TEXT("(%P|%t) [domain/%C]: DomainId == %d\n"),
2040  domain_name.c_str(), domainId));
2041  }
2042  } else if (name == "DomainRepoKey") {
2043  // We will still process this for backward compatibility, but
2044  // it can now be replaced by "DiscoveryConfig=REPO:<key>"
2045  repoKey = it->second;
2046  if (repoKey == "-1") {
2047  repoKey = Discovery::DEFAULT_REPO;
2048  }
2049 
2050  if (DCPS_debug_level > 0) {
2052  ACE_TEXT("(%P|%t) [domain/%C]: DomainRepoKey == %C\n"),
2053  domain_name.c_str(), repoKey.c_str()));
2054  }
2055  } else if (name == "DiscoveryConfig") {
2056  repoKey = it->second;
2057  } else if (name == "DefaultTransportConfig") {
2058  if (it->second == "$file") {
2059  // When the special string of "$file" is used, substitute the file name
2060  perDomainDefaultTportConfig = ACE_TEXT_ALWAYS_CHAR(filename);
2061 
2062  } else {
2063  perDomainDefaultTportConfig = it->second;
2064  }
2065 
2066  } else {
2068  ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
2069  ACE_TEXT("Unexpected entry (%C) in [domain/%C] section.\n"),
2070  name.c_str(), domain_name.c_str()),
2071  -1);
2072  }
2073  }
2074 
2075  if (domainId == -1) {
2076  // DomainId parameter is not set, try using the domain name as an ID
2077  if (!convertToInteger(domain_name, domainId)) {
2079  ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
2080  ACE_TEXT("Missing DomainId value in [domain/%C] section.\n"),
2081  domain_name.c_str()),
2082  -1);
2083  }
2084  }
2085 
2086  if (!perDomainDefaultTportConfig.empty()) {
2088  TransportConfig_rch tc = reg->get_config(perDomainDefaultTportConfig);
2089  if (tc.is_nil()) {
2091  ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
2092  ACE_TEXT("Unknown transport config %C in [domain/%C] section.\n"),
2093  perDomainDefaultTportConfig.c_str(), domain_name.c_str()), -1);
2094  } else {
2095  reg->domain_default_config(domainId, tc);
2096  }
2097  }
2098 
2099  // Check to see if the specified discovery configuration has been defined
2100  if (!repoKey.empty()) {
2101  if ((repoKey != Discovery::DEFAULT_REPO) &&
2102  (repoKey != Discovery::DEFAULT_RTPS) &&
2103  (repoKey != Discovery::DEFAULT_STATIC) &&
2104  (this->discoveryMap_.find(repoKey) == this->discoveryMap_.end())) {
2106  ACE_TEXT("(%P|%t) Service_Participant::load_domain_configuration(): ")
2107  ACE_TEXT("Specified configuration (%C) not found. Referenced in [domain/%C] section.\n"),
2108  repoKey.c_str(), domain_name.c_str()),
2109  -1);
2110  }
2111  this->set_repo_domain(domainId, repoKey);
2112  }
2113  }
2114  }
2115 
2116  return 0;
2117 }
2118 
2119 int
2121 {
2122  const ACE_Configuration_Section_Key& root = cf.root_section();
2123  ACE_Configuration_Section_Key domain_range_sect;
2124 
2125  if (cf.open_section(root, DOMAIN_RANGE_SECTION_NAME, false, domain_range_sect) != 0) {
2126  if (DCPS_debug_level > 0) {
2127  // This is not an error if the configuration file does not have
2128  // any domain range (sub)section.
2130  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_domain_ranges(): ")
2131  ACE_TEXT("config does not have a [%s] section.\n"),
2132  DOMAIN_RANGE_SECTION_NAME));
2133  }
2134 
2135  return 0;
2136 
2137  } else {
2138  if (DCPS_debug_level > 0) {
2140  ACE_TEXT("(%P|%t) NOTICE: Service_Participant::load_domain_ranges(): ")
2141  ACE_TEXT("config has %s sections.\n"),
2142  DOMAIN_RANGE_SECTION_NAME));
2143  }
2144 
2145  // Ensure there are no properties in this section
2146  ValueMap vm;
2147  if (pullValues(cf, domain_range_sect, vm) > 0) {
2148  // There are values inside [DomainRange]
2150  ACE_TEXT("(%P|%t) Service_Participant::load_domain_ranges(): ")
2151  ACE_TEXT("[%s] sections must have a subsection range\n"),
2152  DOMAIN_RANGE_SECTION_NAME),
2153  -1);
2154  }
2155 
2156  // Process the subsections of this section (the ranges, m-n)
2157  KeyList keys;
2158  if (processSections(cf, domain_range_sect, keys) != 0) {
2160  ACE_TEXT("(%P|%t) Service_Participant::load_domain_ranges(): ")
2161  ACE_TEXT("too many nesting layers in the [%s] section.\n"),
2162  DOMAIN_RANGE_SECTION_NAME),
2163  -1);
2164  }
2165 
2166  // Loop through the [DomainRange/*] sections
2167  for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
2168  OPENDDS_STRING domain_range = it->first;
2169 
2170  DomainRange range_element;
2171 
2172  int range_start = -1;
2173  int range_end = -1;
2174 
2175  if (parse_domain_range(domain_range, range_start, range_end) != 0) {
2177  ACE_TEXT("(%P|%t) Service_Participant::load_domain_ranges(): ")
2178  ACE_TEXT("Error parsing [%s/%C] section.\n"),
2179  DOMAIN_RANGE_SECTION_NAME,
2180  domain_range.c_str()),
2181  -1);
2182  }
2183 
2184  range_element.range_start = range_start;
2185  range_element.range_end = range_end;
2186 
2187  ValueMap values;
2188  if (pullValues(cf, it->second, values) > 0) {
2189  OPENDDS_STRING dt_name;
2190 
2191  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
2192  OPENDDS_STRING name = it->first;
2193  if (name == "DiscoveryTemplate") {
2194  dt_name = it->second;
2195  if (DCPS_debug_level > 0) {
2197  ACE_TEXT("(%P|%t) [%s/%C]: DiscoveryTemplate name == %C\n"),
2198  DOMAIN_RANGE_SECTION_NAME, domain_range.c_str(), dt_name.c_str()));
2199  }
2200  range_element.discovery_template_name = dt_name;
2201  } else if (name == "DefaultTransportConfig") {
2202  range_element.transport_config_name = dt_name;
2203  range_element.domain_info[it->first] = it->second;
2204  } else {
2205  // key=val domain config option
2206  range_element.domain_info[it->first] = it->second;
2207  }
2208  }
2209  }
2210  if (this->global_transport_config_ != ACE_TEXT("")) {
2212  }
2213  domain_ranges_.push_back(range_element);
2214  }
2215  }
2216 
2217  return 0;
2218 }
2219 
2221 {
2223 
2224  if (discoveryMap_.find(name) == discoveryMap_.end()) {
2225  // create a cf that has [rtps_discovery/name+domainId]
2226  // copy sections adding customization
2227  DomainRange dr_inst;
2228 
2229  if (get_domain_range_info(domainId, dr_inst)) {
2231  dcf.open();
2232  const ACE_Configuration_Section_Key& root = dcf.root_section();
2233 
2234  // set the transport_config_name
2235  domain_to_transport_name_map_[domainId] = dr_inst.transport_config_name;
2236 
2237  // create domain instance
2239  dcf.open_section(root, DOMAIN_SECTION_NAME, true /* create */, dsect);
2241  dcf.open_section(dsect, ACE_TEXT_CHAR_TO_TCHAR(to_dds_string(domainId).c_str()), true /* create */, dsub_sect);
2242  dcf.set_string_value(dsub_sect, ACE_TEXT("DiscoveryConfig"), ACE_TEXT_CHAR_TO_TCHAR(name.c_str()));
2243  for (ValueMap::const_iterator it = dr_inst.domain_info.begin();
2244  it != dr_inst.domain_info.end();
2245  ++it) {
2246  dcf.set_string_value(dsub_sect, ACE_TEXT_CHAR_TO_TCHAR(it->first.c_str()), ACE_TEXT_CHAR_TO_TCHAR(it->second.c_str()));
2247  if (DCPS_debug_level > 0) {
2249  ACE_TEXT("(%P|%t) Service_Participant::")
2250  ACE_TEXT("configure_domain_range_instance(): adding %C=%C\n"),
2251  it->first.c_str(), it->second.c_str()));
2252  }
2253  }
2254 
2255  ACE_TString cfg_name;
2256  if (get_transport_base_config_name(domainId, cfg_name)) {
2257  if (TransportRegistry::instance()->config_has_transport_template(cfg_name)) {
2258  // create transport instance add default transport config
2260  const OPENDDS_STRING config_instance_name = TransportRegistry::instance()->get_config_instance_name(domainId);
2261  dcf.set_string_value(dsub_sect, ACE_TEXT("DefaultTransportConfig"),
2262  ACE_TEXT_CHAR_TO_TCHAR(config_instance_name.c_str()));
2263  if (DCPS_debug_level > 0) {
2265  ACE_TEXT("(%P|%t) Service_Participant::")
2266  ACE_TEXT("configure_domain_range_instance(): setting DefaultTransportConfig=%C\n"),
2267  config_instance_name.c_str()));
2268  }
2269  }
2270  } else {
2272  ACE_TEXT("(%P|%t) ERROR: Service_Participant::")
2273  ACE_TEXT("configure_domain_range_instance(): ")
2274  ACE_TEXT("transport config not found for domain %d\n"),
2275  domainId),
2276  -1);
2277  }
2278 
2279  //create matching discovery instance
2281  dcf.open_section(root, RTPS_SECTION_NAME, true /* create */, sect);
2283  dcf.open_section(sect, ACE_TEXT_CHAR_TO_TCHAR(name.c_str()), true, sub_sect);
2284 
2285  ValueMap discovery_settings;
2286  if (process_customizations(domainId, dr_inst.discovery_template_name, discovery_settings)) {
2287  for (ValueMap::const_iterator ds_it = discovery_settings.begin(); ds_it != discovery_settings.end(); ++ds_it) {
2288  dcf.set_string_value(sub_sect, ACE_TEXT_CHAR_TO_TCHAR(ds_it->first.c_str()), ACE_TEXT_CHAR_TO_TCHAR(ds_it->second.c_str()));
2289  }
2290  }
2291 
2292  // load discovery
2293  int status = this->load_discovery_configuration(dcf, RTPS_SECTION_NAME);
2294 
2295  if (status != 0) {
2297  ACE_TEXT("(%P|%t) ERROR: Service_Participant::configure_domain_range_instance(): ")
2298  ACE_TEXT("load_discovery_configuration() returned %d\n"),
2299  status),
2300  -1);
2301  }
2302 
2303  // load domain config
2304  status = this->load_domain_configuration(dcf, 0);
2305 
2306  if (status != 0) {
2308  ACE_TEXT("(%P|%t) ERROR: Service_Participant::configure_domain_range_instance(): ")
2309  ACE_TEXT("load_domain_configuration() returned %d\n"),
2310  status),
2311  -1);
2312  }
2313 
2314  if (DCPS_debug_level > 4) {
2316  ACE_TEXT("(%P|%t) Service_Participant::configure_domain_range_instance(): ")
2317  ACE_TEXT("configure domain %d.\n"),
2318  domainId));
2319  }
2320  }
2321 
2322  } else {
2323  // > 9 to limit number of messages.
2324  if (DCPS_debug_level > 9) {
2326  ACE_TEXT("(%P|%t) Service_Participant::configure_domain_range_instance(): ")
2327  ACE_TEXT("domain %d already configured.\n"),
2328  domainId));
2329  }
2330  }
2331  return 0;
2332 }
2333 
2334 
2335 bool
2337 {
2338  for (OPENDDS_VECTOR(DomainRange)::const_iterator i = domain_ranges_.begin(); i != domain_ranges_.end(); ++i) {
2339  if (domainId >= i->range_start && domainId <= i->range_end) {
2340  return true;
2341  }
2342  }
2343 
2344  return false;
2345 }
2346 
2347 bool
2349 {
2350  OPENDDS_MAP(DDS::DomainId_t, OPENDDS_STRING)::const_iterator it = domain_to_transport_name_map_.find(domainId);
2351  if ( it != domain_to_transport_name_map_.end()) {
2352  name = ACE_TEXT_CHAR_TO_TCHAR(it->second.c_str());
2353  return true;
2354  } else if (global_transport_config_ != ACE_TEXT("")) {
2355  name = global_transport_config_;
2356  return true;
2357  } else {
2358  return false;
2359  }
2360 }
2361 
2362 int
2364  const ACE_TCHAR* section_name)
2365 {
2366  const ACE_Configuration_Section_Key &root = cf.root_section();
2368  if (cf.open_section(root, section_name, false, sect) == 0) {
2369 
2370  const OPENDDS_STRING sect_name = ACE_TEXT_ALWAYS_CHAR(section_name);
2371  DiscoveryTypes::iterator iter =
2372  this->discovery_types_.find(sect_name);
2373 
2374  if (iter == this->discovery_types_.end()) {
2375  // See if we can dynamically load the required libraries
2376  TheTransportRegistry->load_transport_lib(sect_name);
2377  iter = this->discovery_types_.find(sect_name);
2378  }
2379 
2380  if (iter != this->discovery_types_.end()) {
2381  // discovery code is loaded, process options
2382  return iter->second->discovery_config(cf);
2383  } else {
2384  // No discovery code can be loaded, report an error
2386  ACE_TEXT("(%P|%t) ERROR: Service_Participant::")
2387  ACE_TEXT("load_discovery_configuration(): ")
2388  ACE_TEXT("Unable to load libraries for %s\n"),
2389  section_name),
2390  -1);
2391  }
2392  }
2393  return 0;
2394 }
2395 
2396 int
2398 {
2399  ValueMap discovery_settings;
2400  if (process_customizations(domainId, discovery_name, discovery_settings)) {
2402 
2403  if (discoveryMap_.find(name) == discoveryMap_.end()) {
2405  dcf.open();
2406  const ACE_Configuration_Section_Key& root = dcf.root_section();
2407 
2408  //create discovery instance
2410  dcf.open_section(root, RTPS_SECTION_NAME, true /* create */, sect);
2412  dcf.open_section(sect, ACE_TEXT_CHAR_TO_TCHAR(name.c_str()), true, sub_sect);
2413 
2414  for (ValueMap::const_iterator ds_it = discovery_settings.begin(); ds_it != discovery_settings.end(); ++ds_it) {
2415  dcf.set_string_value(sub_sect, ACE_TEXT_CHAR_TO_TCHAR(ds_it->first.c_str()), ACE_TEXT_CHAR_TO_TCHAR(ds_it->second.c_str()));
2416  if (DCPS_debug_level > 0) {
2418  ACE_TEXT("(%P|%t) Service_Participant::configure_discovery_template(): ")
2419  ACE_TEXT("setting %C = %C\n"),
2420  ds_it->first.c_str(), ds_it->second.c_str()));
2421  }
2422  }
2423 
2424  // load discovery
2425  int status = this->load_discovery_configuration(dcf, RTPS_SECTION_NAME);
2426 
2427  if (status != 0) {
2429  ACE_TEXT("(%P|%t) ERROR: Service_Participant::configure_discovery_template(): ")
2430  ACE_TEXT("load_discovery_configuration() returned %d\n"),
2431  status),
2432  -1);
2433  }
2434  } else {
2435  // already configured. not necessarily an error
2436  if (DCPS_debug_level > 0) {
2438  ACE_TEXT("(%P|%t) Discovery config %C already exists\n"),
2439  name.c_str()));
2440  }
2441 
2442  }
2443  } else {
2445  ACE_TEXT("(%P|%t) ERROR: Service_Participant::configure_discovery_template(): ")
2446  ACE_TEXT("process_customizations() returned false\n")),
2447  -1);
2448  }
2449 
2450  return 0;
2451 }
2452 
2453 
2454 int
2456 {
2457  // open the rtps_discovery config sections
2458  cf.open();
2459  const ACE_Configuration_Section_Key& root = cf.root_section();
2461 
2462  if (cf.open_section(root, RTPS_SECTION_NAME, false, rtps_sect) == 0) {
2463  ValueMap vm;
2464  if (pullValues(cf, rtps_sect, vm) > 0) {
2465  // There are values inside [rtps_discovery]
2467  ACE_TEXT("(%P|%t) Service_Participant::load_discovery_templates(): ")
2468  ACE_TEXT("rtps_discovery sections must have a subsection name\n")),
2469  -1);
2470  }
2471 
2472  // Process the subsections of this section (the individual domains)
2473  KeyList keys;
2474  if (processSections(cf, rtps_sect, keys) != 0) {
2476  ACE_TEXT("(%P|%t) Service_Participant::load_discovery_templates(): ")
2477  ACE_TEXT("too many nesting layers in the [rtps_discovery] section.\n")),
2478  -1);
2479  }
2480 
2481  // store the discovery information
2482  for (KeyList::const_iterator disc_it = keys.begin(); disc_it != keys.end(); ++disc_it) {
2483  DiscoveryInfo dinfo;
2484  dinfo.discovery_name = disc_it->first;
2485 
2486  ValueMap values;
2487  if (pullValues(cf, disc_it->second, values) > 0) {
2488  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
2489  // check for customizations
2491  OPENDDS_STRING customization = it->second;
2492  if (DCPS_debug_level > 0) {
2494  ACE_TEXT("(%P|%t) Service_Participant::load_discovery_templates(): ")
2495  ACE_TEXT("loading customizations [%s/%C]\n"),
2497  customization.c_str()));
2498  }
2499 
2500  ACE_Configuration_Section_Key custom_sect;
2501  if (cf.open_section(root, CUSTOMIZATION_SECTION_NAME, false, custom_sect) == 0) {
2502  ValueMap vcm;
2503  if (pullValues(cf, custom_sect, vcm) > 0) {
2505  ACE_TEXT("(%P|%t) Service_Participant::load_discovery_templates(): ")
2506  ACE_TEXT("%s sections must have a subsection name\n"),
2508  -1);
2509  }
2510 
2511  // Process the subsections of the custom section
2512  KeyList keys;
2513  if (processSections(cf, custom_sect, keys) != 0) {
2515  ACE_TEXT("(%P|%t) Service_Participant::load_discovery_templates(): ")
2516  ACE_TEXT("too many nesting layers in the [%s] section.\n"),
2518  -1);
2519  }
2520 
2521  // add customizations to domain range
2522  for (KeyList::const_iterator iter = keys.begin(); iter != keys.end(); ++iter) {
2523  if (customization == iter->first) {
2524  ValueMap values;
2525  pullValues(cf, iter->second, values);
2526  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
2527  dinfo.customizations[it->first] = it->second;
2528  }
2529  }
2530  }
2531  }
2532  } else {
2533  dinfo.disc_info[it->first] = it->second;
2534  }
2535  }
2536  }
2537 
2538  discovery_infos_.push_back(dinfo);
2539  }
2540  }
2541 
2542  // return 0 even if no templates were loaded
2543  return 0;
2544 }
2545 
2546 int Service_Participant::parse_domain_range(const OPENDDS_STRING& range, int& start, int& end) {
2547  const std::size_t dash_pos = range.find("-", 0);
2548 
2549  if (dash_pos == std::string::npos || dash_pos == range.length() - 1) {
2550  start = end = -1;
2552  ACE_TEXT("(%P|%t) Service_Participant::parse_domain_range(): ")
2553  ACE_TEXT("%s missing '-' in [%s/%C] section.\n"),
2554  DOMAIN_RANGE_SECTION_NAME, DOMAIN_RANGE_SECTION_NAME, range.c_str()),
2555  -1);
2556  }
2557 
2558  if (!convertToInteger(range.substr(0, dash_pos), start)) {
2559  start = end = -1;
2561  ACE_TEXT("(%P|%t) Service_Participant::parse_domain_range(): ")
2562  ACE_TEXT("Illegal integer value for start %s (%C) in [%s/%C] section.\n"),
2563  DOMAIN_RANGE_SECTION_NAME, range.substr(0, dash_pos).c_str(),
2564  DOMAIN_RANGE_SECTION_NAME, range.c_str()),
2565  -1);
2566  }
2567  if (DCPS_debug_level > 0) {
2569  ACE_TEXT("(%P|%t) Service_Participant::parse_domain_range(): ")
2570  ACE_TEXT("(%P|%t) [%s/%C]: range_start == %d\n"),
2571  DOMAIN_RANGE_SECTION_NAME,
2572  range.c_str(), start));
2573  }
2574 
2575  if (!convertToInteger(range.substr(dash_pos + 1), end)) {
2577  ACE_TEXT("(%P|%t) Service_Participant::parse_domain_range(): ")
2578  ACE_TEXT("Illegal integer value for end %s (%C) in [%s/%C] section.\n"),
2579  DOMAIN_RANGE_SECTION_NAME, range.substr(0, dash_pos).c_str(),
2580  DOMAIN_RANGE_SECTION_NAME, range.c_str()),
2581  -1);
2582  }
2583 
2584  if (DCPS_debug_level > 0) {
2586  ACE_TEXT("(%P|%t) Service_Participant::parse_domain_range(): ")
2587  ACE_TEXT("(%P|%t) [%s/%C]: range_end == %d\n"),
2588  DOMAIN_RANGE_SECTION_NAME, range.c_str(), end));
2589  }
2590 
2591  if (end < start) {
2593  ACE_TEXT("(%P|%t) Service_Participant::parse_domain_range(): ")
2594  ACE_TEXT("Range End %d is less than range start %d in [%s/%C] section.\n"),
2595  end, start, DOMAIN_RANGE_SECTION_NAME, range.c_str()),
2596  -1);
2597  }
2598 
2599  return 0;
2600 }
2601 
2602 bool
2604 {
2605  return !domain_ranges_.empty();
2606 }
2607 
2608 bool
2610 {
2611  if (has_domain_range()) {
2612  for (OPENDDS_VECTOR(DomainRange)::iterator it = domain_ranges_.begin();
2613  it != domain_ranges_.end(); ++it) {
2614  if (id >= it->range_start && id <= it->range_end) {
2615  inst.range_start = it->range_start;
2616  inst.range_end = it->range_end;
2617  inst.discovery_template_name = it->discovery_template_name;
2618  inst.transport_config_name = it->transport_config_name;
2619  inst.domain_info = it->domain_info;
2620 
2621  if (DCPS_debug_level > 0) {
2623  ACE_TEXT("(%P|%t) Service_Participant::get_domain_range_info(): ")
2624  ACE_TEXT("Domain %d is in [%s/%d-%d]\n"),
2625  id, DOMAIN_RANGE_SECTION_NAME, it->range_start, it->range_end));
2626  }
2627 
2628  return true;
2629  }
2630  }
2631  }
2632  return false;
2633 }
2634 
2635 bool
2637 {
2638  // get the discovery info
2639  OPENDDS_VECTOR(DiscoveryInfo)::const_iterator dit;
2640  for (dit = discovery_infos_.begin(); dit != discovery_infos_.end(); ++dit) {
2641  if (discovery_name == dit->discovery_name) {
2642  break;
2643  }
2644  }
2645 
2646  if (dit != discovery_infos_.end()) {
2647  // add discovery info to customs
2648  for (ValueMap::const_iterator i = dit->disc_info.begin(); i != dit->disc_info.end(); ++i) {
2649  customs[i->first] = i->second;
2650  if (DCPS_debug_level > 0) {
2652  ACE_TEXT("(%P|%t) Service_Participant::")
2653  ACE_TEXT("process_customizations(): adding config %C=%C\n"),
2654  i->first.c_str(), i->second.c_str()));
2655  }
2656  }
2657 
2658  // update customs valuemap with any customizations
2659  for (ValueMap::const_iterator i = dit->customizations.begin(); i != dit->customizations.end(); ++i) {
2660  if (i->first == "InteropMulticastOverride" && i->second == "AddDomainId") {
2661  OPENDDS_STRING addr = customs["InteropMulticastOverride"];
2662  size_t pos = addr.find_last_of(".");
2663  if (pos != OPENDDS_STRING::npos) {
2664  OPENDDS_STRING custom = addr.substr(pos + 1);
2665  int val = 0;
2666  if (!convertToInteger(custom, val)) {
2668  ACE_TEXT("(%P|%t) ERROR: Service_Participant::")
2669  ACE_TEXT("process_customizations(): ")
2670  ACE_TEXT("could not convert %C to integer\n"),
2671  custom.c_str()),
2672  false);
2673  }
2674  val += id;
2675  addr = addr.substr(0, pos);
2676  addr += "." + to_dds_string(val);
2677  } else {
2679  ACE_TEXT("(%P|%t) ERROR: Service_Participant::")
2680  ACE_TEXT("process_customizations(): ")
2681  ACE_TEXT("could not AddDomainId for %s\n"),
2682  customs["InteropMulticastOverride"].c_str()),
2683  false);
2684  }
2685 
2686  customs["InteropMulticastOverride"] = addr;
2687  }
2688  }
2689  }
2690 
2691  return true;
2692 }
2693 
2696 {
2697  OpenDDS::DCPS::Discovery::RepoKey configured_name = "rtps_template_instance_";
2698  configured_name += to_dds_string(id);
2699  return configured_name;
2700 }
2701 
2702 bool
2704 {
2705  OPENDDS_VECTOR(DiscoveryInfo)::const_iterator i;
2706  for (i = discovery_infos_.begin(); i != discovery_infos_.end(); ++i) {
2707  if (i->discovery_name == name && !i->customizations.empty()) {
2708  return true;
2709  }
2710  }
2711 
2712  return false;
2713 }
2714 
2715 #if OPENDDS_POOL_ALLOCATOR
2716 void
2717 Service_Participant::configure_pool()
2718 {
2719  if (pool_size_) {
2720  SafetyProfilePool::instance()->configure_pool(pool_size_, pool_granularity_);
2721  SafetyProfilePool::instance()->install();
2722  }
2723 }
2724 #endif
2725 
2726 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
2729  DDS::DurabilityQosPolicy const & durability)
2730 {
2731  DDS::DurabilityQosPolicyKind const kind =
2732  durability.kind;
2733 
2734  DataDurabilityCache * cache = 0;
2735 
2736  if (kind == DDS::TRANSIENT_DURABILITY_QOS) {
2737  {
2739 
2740  if (!this->transient_data_cache_) {
2741  this->transient_data_cache_.reset(new DataDurabilityCache(kind));
2742  }
2743  }
2744 
2745  cache = this->transient_data_cache_.get();
2746 
2747  } else if (kind == DDS::PERSISTENT_DURABILITY_QOS) {
2748  {
2750 
2751  try {
2752  if (!this->persistent_data_cache_) {
2753  this->persistent_data_cache_.reset(new DataDurabilityCache(kind,
2754  this->persistent_data_dir_));
2755  }
2756 
2757  } catch (const std::exception& ex) {
2758  if (DCPS_debug_level > 0) {
2760  ACE_TEXT("(%P|%t) WARNING: Service_Participant::get_data_durability_cache ")
2761  ACE_TEXT("failed to create PERSISTENT cache, falling back on ")
2762  ACE_TEXT("TRANSIENT behavior: %C\n"), ex.what()));
2763  }
2764 
2766  }
2767  }
2768 
2769  cache = this->persistent_data_cache_.get();
2770  }
2771 
2772  return cache;
2773 }
2774 #endif
2775 
2776 void
2778 {
2779  if (discovery) {
2781  this->discoveryMap_[discovery->key()] = discovery;
2782  }
2783 }
2784 
2785 void
2787 {
2788  shutdown_listener_ = listener;
2789 }
2790 
2791 const Service_Participant::RepoKeyDiscoveryMap&
2793 {
2794  return this->discoveryMap_;
2795 }
2796 
2797 const Service_Participant::DomainRepoMap&
2799 {
2800  return this->domainRepoMap_;
2801 }
2802 
2804 Service_Participant::create_recorder(DDS::DomainParticipant_ptr participant,
2805  DDS::Topic_ptr a_topic,
2806  const DDS::SubscriberQos& subscriber_qos,
2807  const DDS::DataReaderQos& datareader_qos,
2808  const RecorderListener_rch& a_listener)
2809 {
2810  DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
2811  if (participant_servant)
2812  return participant_servant->create_recorder(a_topic, subscriber_qos, datareader_qos, a_listener, 0);
2813  return 0;
2814 }
2815 
2818 {
2820  RecorderImpl* impl = dynamic_cast<RecorderImpl*>(recorder);
2821  if (impl){
2822  ret = impl->cleanup();
2823  impl->participant()->delete_recorder(recorder);
2824  }
2825  return ret;
2826 }
2827 
2829 Service_Participant::create_replayer(DDS::DomainParticipant_ptr participant,
2830  DDS::Topic_ptr a_topic,
2831  const DDS::PublisherQos& publisher_qos,
2832  const DDS::DataWriterQos& datawriter_qos,
2833  const ReplayerListener_rch& a_listener)
2834 {
2835  DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
2836  if (participant_servant)
2837  return participant_servant->create_replayer(a_topic, publisher_qos, datawriter_qos, a_listener, 0);
2838  return 0;
2839 }
2840 
2843 {
2845  ReplayerImpl* impl = static_cast<ReplayerImpl*>(replayer);
2846  if (impl) {
2847  ret = impl->cleanup();
2848  impl->participant()->delete_replayer(replayer);
2849  }
2850  return ret;
2851 }
2852 
2854  DDS::DomainParticipant_ptr participant,
2855  const char* topic_name,
2856  const char* type_name,
2857  bool type_has_keys,
2858  const DDS::TopicQos& qos,
2859  DDS::TopicListener_ptr a_listener,
2860  DDS::StatusMask mask)
2861 {
2862  DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
2863  if (!participant_servant) {
2864  return 0;
2865  }
2866  return participant_servant->create_typeless_topic(topic_name, type_name, type_has_keys, qos, a_listener, mask);
2867 }
2868 
2870 {
2872 }
2873 
2875 {
2876  return thread_status_manager_;
2877 }
2878 
2880 {
2881  return xtypes_lock_;
2882 }
2883 
2884 #ifdef OPENDDS_NETWORK_CONFIG_MODIFIER
2885 NetworkConfigModifier* Service_Participant::network_config_modifier()
2886 {
2887  return dynamic_cast<NetworkConfigModifier*>(network_config_monitor_.get());
2888 }
2889 #endif
2890 
2893 {
2895 }
2896 
2897 void
2899 {
2901 }
2902 
2905 {
2907 }
2908 
2909 void
2911 {
2913 }
2914 
2916 Service_Participant::get_type_information(DDS::DomainParticipant_ptr participant,
2917  const DDS::BuiltinTopicKey_t& key) const
2918 {
2919  DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
2920  if (participant_servant) {
2921  XTypes::TypeLookupService_rch tls = participant_servant->get_type_lookup_service();
2922  if (tls) {
2923  return tls->get_type_info(key);
2924  }
2925  }
2926 
2927  return XTypes::TypeInformation();
2928 }
2929 
2930 #ifndef OPENDDS_SAFETY_PROFILE
2932  DDS::DomainParticipant_ptr participant, const DDS::BuiltinTopicKey_t& key) const
2933 {
2934  return dynamic_cast<DomainParticipantImpl*>(participant)->get_dynamic_type(type, key);
2935 }
2936 #endif
2937 
2939 Service_Participant::get_type_object(DDS::DomainParticipant_ptr participant,
2940  const XTypes::TypeIdentifier& ti) const
2941 {
2942  DomainParticipantImpl* participant_servant = dynamic_cast<DomainParticipantImpl*>(participant);
2943  if (participant_servant) {
2944  XTypes::TypeLookupService_rch tls = participant_servant->get_type_lookup_service();
2945  if (tls) {
2946  return tls->get_type_object(ti);
2947  }
2948  }
2949 
2950  return XTypes::TypeObject();
2951 }
2952 
2953 void
2955 {
2956  struct NameValue {
2957  const char* name;
2959  };
2960  static const NameValue entries[] = {
2961  {"Normal", Encoding_Normal},
2962  {"WriteOldFormat", Encoding_WriteOldFormat},
2963  {"ReadOldFormat", Encoding_ReadOldFormat},
2964  };
2965  for (size_t i = 0; i < sizeof entries / sizeof entries[0]; ++i) {
2966  if (0 == std::strcmp(entries[i].name, encoding)) {
2967  type_object_encoding(entries[i].value);
2968  return;
2969  }
2970  }
2971  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: Service_Participant::type_object_encoding: "
2972  "invalid encoding %C\n", encoding));
2973 }
2974 
2975 } // namespace DCPS
2976 } // namespace OpenDDS
2977 
static const char * DEFAULT_RTPS
Definition: Discovery.h:86
DDS::GroupDataQosPolicy initial_GroupDataQosPolicy_
UserDataQosPolicy user_data
DDS::ResourceLimitsQosPolicy initial_ResourceLimitsQosPolicy_
#define TheTransportRegistry
PartitionQosPolicy partition
static const ACE_TCHAR DEFAULT_REPO_IOR[]
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
int create_transport_template_instance(DDS::DomainId_t domain, const ACE_TString &config_name)
int fclose(FILE *fp)
int bit_transport_port_
The builtin topic transport port number.
static int priority_min(const Policy, const int scope=ACE_SCOPE_THREAD)
#define OPENDDS_VERSION
Definition: Version.h:16
RcHandle< InternalTopic< NetworkInterfaceAddress > > network_interface_address_topic_
void thread_status_interval(const TimeDuration &thread_status_interval)
ReactorInterceptor_rch interceptor() const
void set_from_string(const char *name)
Definition: debug.cpp:73
Send raw data samples in the system.
Definition: Replayer.h:60
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual int set_string_value(const ACE_Configuration_Section_Key &key, const ACE_TCHAR *name, const ACE_TString &value)
ACE_thread_t get_reactor_owner() const
Definition: ReactorTask.inl:29
const char * c_str(void) const
DDS::DataRepresentationQosPolicy initial_DataRepresentationQosPolicy_
static const char * DEFAULT_REPO
Key value for the default repository IOR.
Definition: Discovery.h:85
#define OPENDDS_POOL_ALLOCATOR
Definition: PoolAllocator.h:8
const LogLevel::Value value
Definition: debug.cpp:61
int load_discovery_templates(ACE_Configuration_Heap &cf)
TypeObjectEncoding type_object_encoding() const
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
long scheduler_
Scheduling policy value used for setting thread priorities.
DDS::LatencyBudgetQosPolicy initial_LatencyBudgetQosPolicy_
const TimeSource & time_source() const
int & federation_recovery_duration()
Accessors for FederationRecoveryDuration in seconds.
std::string String
static bool got_global_transport_config
static bool got_liveliness_factor
EntityFactoryQosPolicy entity_factory
void initializeScheduling()
Initialize the thread scheduling and initial priority.
static int priority_max(const Policy, const int scope=ACE_SCOPE_THREAD)
RcHandle< DomainParticipantFactoryImpl > dp_factory_servant_
if(!(yy_init))
GroupDataQosPolicy group_data
OpenDDS::DCPS::Discovery::RepoKey get_discovery_template_instance_name(DDS::DomainId_t id)
DomainRepoMap domainRepoMap_
The DomainId to RepoKey mapping.
LM_INFO
virtual int get_string_value(const ACE_Configuration_Section_Key &key, const ACE_TCHAR *name, ACE_TString &value)
DDS::ReturnCode_t delete_recorder(Recorder_ptr recorder)
int priority_min_
Minimum priority value for the current scheduling policy.
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
void set_repo_domain(const DDS::DomainId_t domain, Discovery::RepoKey repo, bool attach_participant=true)
Bind DCPSInfoRepo IORs to domains.
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
const int DEFAULT_FEDERATION_INITIAL_BACKOFF_SECONDS
bool monitor_enabled_
Enable Monitor functionality.
DDS::TypeConsistencyEnforcementQosPolicy initial_TypeConsistencyEnforcementQosPolicy_
ACE_TCHAR ** get_TCHAR_argv(void)
static const ACE_TCHAR CUSTOMIZATION_SECTION_NAME[]
DDS::OwnershipQosPolicy initial_OwnershipQosPolicy_
sequence< octet > key
DDS::LifespanQosPolicy initial_LifespanQosPolicy_
static int process_directive(const ACE_TCHAR directive[])
String to_dds_string(unsigned short to_convert)
const ReturnCode_t RETCODE_ALREADY_DELETED
static bool got_publisher_content_filter
#define GET_CONFIG_STRING_VALUE(CF, SECT, KEY, VALUE)
Definition: TransportDefs.h:76
static const ACE_TCHAR REPO_SECTION_NAME[]
int & federation_initial_backoff_seconds()
Accessors for FederationInitialBackoffSeconds.
DDS::WriterDataLifecycleQosPolicy initial_WriterDataLifecycleQosPolicy_
PresentationQosPolicyAccessScopeKind access_scope
NetworkConfigMonitor_rch network_config_monitor_
#define ACE_TEXT_ALWAYS_CHAR(STRING)
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
#define ACE_SCOPE_THREAD
const char * c_str() const
#define DDS_DEFAULT_DISCOVERY_METHOD
Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations..
Implementation of Recorder functionality.
Definition: RecorderImpl.h:45
virtual int import_config(const ACE_TCHAR *filename)
static TYPE * instance(const ACE_TCHAR *name)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
DDS::ReaderDataLifecycleQosPolicy initial_ReaderDataLifecycleQosPolicy_
const size_t DEFAULT_CHUNK_MULTIPLIER
virtual const ACE_Configuration_Section_Key & root_section(void) const
void connect(RcHandle< InternalTopic< NetworkInterfaceAddress > > topic)
static bool got_log_verbose
int sleep(u_int seconds)
const ACE_Time_Value & value() const
void domain_default_config(DDS::DomainId_t domain, const TransportConfig_rch &cfg)
static bool got_security_flag
int liveliness_factor_
The propagation delay factor.
int consume_arg(int number=1)
EntityFactoryQosPolicy entity_factory
static bool got_default_discovery
int priority_max_
Maximum priority value for the current scheduling policy.
bool get_domain_range_info(DDS::DomainId_t id, DomainRange &inst)
DomainParticipantImpl * participant()
Definition: RecorderImpl.h:129
virtual int find_value(const ACE_Configuration_Section_Key &key, const ACE_TCHAR *name, VALUETYPE &type)
FILE * fopen(const char *filename, const char *mode)
ACE_Recursive_Thread_Mutex maps_lock_
Guard access to the internal maps.
EntityFactoryQosPolicy entity_factory
OPENDDS_STRING get_config_instance_name(DDS::DomainId_t id)
int load_domain_ranges(ACE_Configuration_Heap &cf)
static bool got_persistent_data_dir
Implementation of Replayer functionality.
Definition: ReplayerImpl.h:61
Replayer_ptr create_replayer(DDS::DomainParticipant_ptr participant, DDS::Topic_ptr a_topic, const DDS::PublisherQos &publisher_qos, const DDS::DataWriterQos &datawriter_qos, const ReplayerListener_rch &a_listener)
DDS::TransportPriorityQosPolicy initial_TransportPriorityQosPolicy_
static const ACE_TCHAR DOMAIN_SECTION_NAME[]
EntityFactoryQosPolicy entity_factory
const int BIT_LOOKUP_DURATION_MSEC
virtual Monitor * create_sp_monitor(Service_Participant *sp)
Factory function to create a service participant monitor object.
TransportConfig_rch global_config() const
#define EPERM
bool process_customizations(DDS::DomainId_t id, const OPENDDS_STRING &discovery_name, ValueMap &customs)
DDS::DurabilityQosPolicy initial_DurabilityQosPolicy_
DDS::LivelinessQosPolicy initial_LivelinessQosPolicy_
static const char DEFAULT_PERSISTENT_DATA_DIR[]
DomainParticipantImpl * participant()
Definition: ReplayerImpl.h:161
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
static bool got_chunk_association_multiplier
DDS::DestinationOrderQosPolicy initial_DestinationOrderQosPolicy_
#define OPENDDS_STRING
struct utsname ACE_utsname
const int DEFAULT_FEDERATION_LIVELINESS
static bool got_type_object_encoding
bool bidir_giop_
Enable TAO&#39;s Bidirectional GIOP?
static bool got_debug_level
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
int is_parameter_next(void) const
NetworkAddress default_address_
The default network address to use.
OpenDDS_Dcps_Export void set_DCPS_debug_level(unsigned int lvl)
Definition: debug.cpp:98
DOMAINID_TYPE_NATIVE DomainId_t
int is_anything_left(void) const
#define ACE_SCHED_FIFO
PresentationQosPolicy presentation
LM_DEBUG
static bool got_log_fname
#define ACE_VERSION
bool get_transport_base_config_name(DDS::DomainId_t domainId, ACE_TString &name) const
const CHAR_TYPE * get_the_parameter(const CHAR_TYPE *flag)
typedef OPENDDS_MAP(Discovery::RepoKey, Discovery_rch) RepoKeyDiscoveryMap
For internal OpenDDS Use (needed for monitor code)
static const char * DEFAULT_STATIC
Definition: Discovery.h:87
TimeDuration schedulerQuantum_
Scheduler time slice from configuration file.
DurabilityQosPolicyKind kind
int federation_recovery_duration_
The FederationRecoveryDuration value in seconds.
DDS::DomainParticipantFactory_ptr get_domain_participant_factory(int &argc=zero_argc, ACE_TCHAR *argv[]=0)
unique_ptr< DataDurabilityCache > transient_data_cache_
The TRANSIENT data durability cache.
ACE_TString schedulerString_
Scheduling policy value from configuration file.
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
bool is_empty(void) const
char ACE_TCHAR
void remap_domains(Discovery::RepoKey oldKey, Discovery::RepoKey newKey, bool attach_participant=true)
Rebind a domain from one repository to another.
const size_t DEFAULT_NUM_CHUNKS
int parse_domain_range(const OPENDDS_STRING &range, int &start, int &end)
TransportConfig_rch get_config(const OPENDDS_STRING &name) const
DDS::ReturnCode_t cleanup()
DDS::ReturnCode_t delete_replayer(Replayer_ptr replayer)
DWORD ACE_thread_t
DDS::Duration_t bit_autopurge_disposed_samples_delay() const
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
int federation_liveliness_
This FederationLivelinessDuration.
OPENDDS_VECTOR(DomainRange) domain_ranges_
#define GET_CONFIG_VALUE(CF, SECT, KEY, VALUE, TYPE)
Definition: TransportDefs.h:45
void clr_flags(u_long f)
XTypes::TypeInformation get_type_information(DDS::DomainParticipant_ptr participant, const DDS::BuiltinTopicKey_t &key) const
static bool got_security_debug
static ACE_Allocator * instance(void)
#define GET_CONFIG_TSTRING_VALUE(CF, SECT, KEY, VALUE)
LM_NOTICE
ACE_CString persistent_data_dir_
The PERSISTENT data durability directory.
virtual void initialize()
Initialize the monitor (required to report data)
DDS::DomainParticipantFactoryQos initial_DomainParticipantFactoryQos_
static bool got_thread_status_interval
DDS::ReliabilityQosPolicy initial_ReliabilityQosPolicy_
void parse_flags(const ACE_TCHAR *flags)
Definition: debug.cpp:134
bool log_bits
Definition: Logging.cpp:18
Replayer_ptr create_replayer(DDS::Topic_ptr a_topic, const DDS::PublisherQos &publisher_qos, const DDS::DataWriterQos &datawriter_qos, const ReplayerListener_rch &a_listener, DDS::StatusMask mask)
const ACE_TCHAR * compiler_name(void)
bool set_repo_ior(const char *ior, Discovery::RepoKey key=Discovery::DEFAULT_REPO, bool attach_participant=true)
DurabilityQosPolicyKind
int load_transport_configuration(const OPENDDS_STRING &file_name, ACE_Configuration_Heap &cf)
static bool got_transport_debug_level
int configure_domain_range_instance(DDS::DomainId_t domainId)
DDS::Topic_ptr create_typeless_topic(const char *topic_name, const char *type_name, bool type_has_keys, const DDS::TopicQos &qos, DDS::TopicListener_ptr a_listener, DDS::StatusMask mask)
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
ThreadStatusManager & get_thread_status_manager()
Get the service participant&#39;s thread status manager.
void disconnect(RcHandle< InternalTopic< NetworkInterfaceAddress > > topic)
const int DEFAULT_FEDERATION_BACKOFF_MULTIPLIER
LM_WARNING
static void close()
Close the singleton instance of this class.
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
static bool got_bit_transport_ip
DDS::PartitionQosPolicy initial_PartitionQosPolicy_
static const ACE_TCHAR RTPS_SECTION_NAME[]
bool empty(void) const
static const ACE_TCHAR DOMAIN_RANGE_SECTION_NAME[]
DDS::HistoryQosPolicy initial_HistoryQosPolicy_
const char *const name
Definition: debug.cpp:60
DDS::ReturnCode_t get_dynamic_type(DDS::DynamicType_var &type, DDS::DomainParticipant_ptr participant, const DDS::BuiltinTopicKey_t &key) const
u_int compiler_minor_version(void)
int set(const ACE_INET_Addr &)
ACE_Thread_Mutex & get_static_xtypes_lock()
getter for lock that protects the static initialization of XTypes related data structures ...
void repository_lost(Discovery::RepoKey key)
Failover to a new repository.
DDS::TimeBasedFilterQosPolicy initial_TimeBasedFilterQosPolicy_
ACE_TEXT("TCP_Factory")
int atoi(const char *s)
DDS::Topic_ptr create_typeless_topic(DDS::DomainParticipant_ptr participant, const char *topic_name, const char *type_name, bool type_has_keys, const DDS::TopicQos &qos, DDS::TopicListener_ptr a_listener=0, DDS::StatusMask mask=0)
unique_ptr< Monitor > monitor_
Pointer to the monitor object for this object.
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
int load_transport_templates(ACE_Configuration_Heap &cf)
unsigned long StatusMask
ACE_TString bit_transport_ip_
The builtin topic transport address.
Recorder_ptr create_recorder(DDS::DomainParticipant_ptr participant, DDS::Topic_ptr a_topic, const DDS::SubscriberQos &subscriber_qos, const DDS::DataReaderQos &datareader_qos, const RecorderListener_rch &a_listener)
static TYPE * instance(void)
u_int compiler_major_version(void)
static bool got_log_level
void set_shutdown_listener(RcHandle< ShutdownListener > listener)
#define ACE_LOG_MSG
void add_discovery(Discovery_rch discovery)
int last_error(void)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DDS::DomainParticipantQos initial_DomainParticipantQos_
XTypes::TypeLookupService_rch get_type_lookup_service()
OpenDDS_Dcps_Export LogLevel log_level
int load_common_configuration(ACE_Configuration_Heap &cf, const ACE_TCHAR *filename)
bool belongs_to_domain_range(DDS::DomainId_t domainId) const
DDS::ReturnCode_t cleanup()
DDS::DeadlineQosPolicy initial_DeadlineQosPolicy_
static bool got_default_address
DDS::EntityFactoryQosPolicy initial_EntityFactoryQosPolicy_
static bool got_bit_flag
static bool got_bit_lookup_duration_msec
virtual int open_section(const ACE_Configuration_Section_Key &base, const ACE_TCHAR *sub_section, bool create, ACE_Configuration_Section_Key &result)
ReactorInterceptor_rch interceptor() const
Definition: ReactorTask.inl:65
ACE_Reactor_Timer_Interface * timer()
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
int federation_initial_backoff_seconds_
The FederationInitialBackoffSeconds value.
XTypes::TypeObject get_type_object(DDS::DomainParticipant_ptr participant, const XTypes::TypeIdentifier &ti) const
void default_configuration_file(const ACE_TCHAR *path)
void set_debug_level(unsigned level)
Definition: debug.cpp:182
Discovery_rch get_discovery(const DDS::DomainId_t domain)
Accessor of the Discovery object for a given domain.
static bool got_use_rti_serialization
const ReturnCode_t RETCODE_ERROR
AtomicBool shut_down_
Used to track state of service participant.
int load_domain_configuration(ACE_Configuration_Heap &cf, const ACE_TCHAR *filename)
int configure_discovery_template(DDS::DomainId_t domainId, const OPENDDS_STRING &discovery_name)
static bool got_bit_transport_port
int federation_backoff_multiplier_
This FederationBackoffMultiplier.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
#define ACE_SCHED_OTHER
static TransportRegistry * instance()
Return a singleton instance of this class.
int processSections(ACE_Configuration_Heap &cf, const ACE_Configuration_Section_Key &key, KeyList &subsections)
Definition: ConfigUtils.cpp:41
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14
DDS::DurabilityServiceQosPolicy initial_DurabilityServiceQosPolicy_
const ReturnCode_t RETCODE_OK
bool is_discovery_template(const OPENDDS_STRING &name)
int cur_arg_strncasecmp(const CHAR_TYPE *flag)
const int DEFAULT_FEDERATION_RECOVERY_DURATION
static bool got_bidir_giop
DDS::Duration_t bit_autopurge_nowriter_samples_delay() const
int open(const ACE_TCHAR *file_name, void *base_address=ACE_DEFAULT_BASE_ADDR, size_t default_map_size=ACE_DEFAULT_CONFIG_SECTION_SIZE)
Recorder_ptr create_recorder(DDS::Topic_ptr a_topic, const DDS::SubscriberQos &subscriber_qos, const DDS::DataReaderQos &datareader_qos, const RecorderListener_rch &a_listener, DDS::StatusMask mask)
const CHAR_TYPE * get_current(void) const
const DomainRepoMap & domainRepoMap() const
#define ACE_ERROR_RETURN(X, Y)
unique_ptr< DataDurabilityCache > persistent_data_cache_
The PERSISTENT data durability cache.
size_type length(void) const
static ACE_TString config_fname(ACE_TEXT(""))
void set_flags(u_long f)
DataRepresentationIdSeq value
int pullValues(ACE_Configuration_Heap &cf, const ACE_Configuration_Section_Key &key, ValueMap &values)
Definition: ConfigUtils.cpp:17
DDS::UserDataQosPolicy initial_UserDataQosPolicy_
The initial values of qos policies.
DDS::OwnershipStrengthQosPolicy initial_OwnershipStrengthQosPolicy_
void initialize()
Initialize default qos.
GroupDataQosPolicy group_data
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
DDS::PresentationQosPolicy initial_PresentationQosPolicy_
ACE_Thread_Mutex xtypes_lock_
Thread mutex used to protect the static initialization of XTypes data structures. ...
int load_discovery_configuration(ACE_Configuration_Heap &cf, const ACE_TCHAR *section_name)
int open_reactor_task(void *, ThreadStatusManager *thread_status_manager=0, const String &name="")
Definition: ReactorTask.cpp:79
OPENDDS_STRING RepoKey
Definition: Discovery.h:80
const char * get_as_string() const
Definition: debug.cpp:87
LM_ERROR
PartitionQosPolicy partition
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
const RepoKeyDiscoveryMap & discoveryMap() const
DataDurabilityCache * get_data_durability_cache(DDS::DurabilityQosPolicy const &durability)
static void close()
Close the singleton instance of this class.
static bool got_pending_timeout
void _tao_print_exception(const char *info, FILE *f=stdout) const
Definition: Exception.cpp:82
int ignore_arg(int number=1)
bool publisher_content_filter_
Allow the publishing side to do content filtering?
void register_discovery_type(const char *section_name, Discovery::Config *cfg)
PresentationQosPolicy presentation
static Service_Participant * instance()
Return a singleton instance of this class.
bool fake_encryption
Disable all encryption for security, even the required builtin encryption.
Definition: debug.h:146
int uname(ACE_utsname *name)
int & federation_backoff_multiplier()
Accessors for FederationBackoffMultiplier.
RcHandle< ShutdownListener > shutdown_listener_
void set_default_discovery(const Discovery::RepoKey &defaultDiscovery)
OpenDDS_Dcps_Export SecurityDebug security_debug
Definition: debug.cpp:32
static const ACE_TCHAR COMMON_SECTION_NAME[]
static bool got_security_fake_encryption
int add(const CHAR_TYPE *next_arg, bool quote_arg=false)
int parse_args(int &argc, ACE_TCHAR *argv[])
#define ACE_SCHED_RR
RepoKeyDiscoveryMap discoveryMap_
The RepoKey to Discovery object mapping.
bool convertToInteger(const String &s, T &value)
u_int compiler_beta_version(void)
int sched_params(const ACE_Sched_Params &, ACE_id_t id=ACE_SELF)
static StaticDiscovery_rch instance()