OpenDDS  Snapshot(2023/04/28-20:55)
TransportRegistry.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 #include "TransportRegistry.h"
10 #include "TransportDebug.h"
11 #include "TransportInst.h"
12 #include "TransportExceptions.h"
13 #include "TransportType.h"
14 #include "dds/DCPS/GuidConverter.h"
15 #include "dds/DCPS/Util.h"
17 #include "dds/DCPS/EntityImpl.h"
19 #include <dds/DdsDcpsInfrastructureC.h>
20 
21 
22 #include "ace/Singleton.h"
23 #include "ace/OS_NS_strings.h"
24 #include "ace/Service_Config.h"
25 
26 #if !defined (__ACE_INLINE__)
27 #include "TransportRegistry.inl"
28 #endif /* __ACE_INLINE__ */
29 
30 
31 namespace {
32  /// Comparison used for sorting transport instances: true when lhs's name()
 /// compares less than rhs's. Passed to instances.sort() when the per-file
 /// default configuration is populated.
33  bool predicate(const OpenDDS::DCPS::TransportInst_rch& lhs,
35  {
36  return lhs->name() < rhs->name();
37  }
38 
39  // Transport type handed to load_transport_lib_i() by fix_empty_default():
 // "rtps_udp" when OPENDDS_SAFETY_PROFILE is defined, "tcp" otherwise.
40 #ifdef OPENDDS_SAFETY_PROFILE
41  const char FALLBACK_TYPE[] = "rtps_udp";
42 #else
43  const char FALLBACK_TYPE[] = "tcp";
44 #endif
45 }
46 
48 
49 namespace OpenDDS {
50 namespace DCPS {
51 
52 TransportRegistry*
54 {
56 }
57 
58 void
60 {
62 }
63 
64 const char TransportRegistry::DEFAULT_CONFIG_NAME[] = "_OPENDDS_DEFAULT_CONFIG";
65 const char TransportRegistry::DEFAULT_INST_PREFIX[] = "_OPENDDS_";
66 
67 // Transport template customization keywords. process_customizations()
// compares each space-stripped customization value against these two strings
// and rejects anything else.
68 const OPENDDS_STRING TransportRegistry::CUSTOM_ADD_DOMAIN_TO_IP = "add_domain_id_to_ip_addr";
69 const OPENDDS_STRING TransportRegistry::CUSTOM_ADD_DOMAIN_TO_PORT = "add_domain_id_to_port";
70 
73  , released_(false)
74 {
75  DBG_ENTRY_LVL("TransportRegistry", "TransportRegistry", 6);
77 
 // Service-configurator directives keyed by transport type name.
 // load_transport_lib_i() looks a type up here and passes the directive to
 // ACE_Service_Config::process_directive() when the type is not yet in
 // type_map_.
78  lib_directive_map_["tcp"] = "dynamic OpenDDS_Tcp Service_Object * OpenDDS_Tcp:_make_TcpLoader()";
79  lib_directive_map_["udp"] = "dynamic OpenDDS_Udp Service_Object * OpenDDS_Udp:_make_UdpLoader()";
80  lib_directive_map_["multicast"] = "dynamic OpenDDS_Multicast Service_Object * OpenDDS_Multicast:_make_MulticastLoader()";
81  lib_directive_map_["rtps_udp"] = "dynamic OpenDDS_Rtps_Udp Service_Object * OpenDDS_Rtps_Udp:_make_RtpsUdpLoader()";
82  lib_directive_map_["shmem"] = "dynamic OpenDDS_Shmem Service_Object * OpenDDS_Shmem:_make_ShmemLoader()";
83 
84  // load_transport_lib() is used for discovery as well:
 // "rtps_discovery" reuses the rtps_udp directive; "repository" has its own.
85  lib_directive_map_["rtps_discovery"] = lib_directive_map_["rtps_udp"];
86  lib_directive_map_["repository"] = "dynamic OpenDDS_InfoRepoDiscovery Service_Object * OpenDDS_InfoRepoDiscovery:_make_IRDiscoveryLoader()";
87 }
88 
// Walks the top-level sections of cf.
//  - For every subsection of the transport section(s) a TransportInst is
//    created with create_inst(), loaded from that subsection, and remembered
//    in transports_.
//  - For every subsection of the config section a TransportConfig is created
//    with create_config(); the instance names listed under "transports" are
//    collected and resolved with get_inst() once all sections have been read.
//  - If this file defined any instances, a config named file_name is created
//    that holds all of them, sorted by name.
// Returns 0 on success; every error path visible here ends in -1.
89 int
92 {
94 
95  // Create a vector to hold configuration information so we can populate
96  // them after the transport instances are created.
97  typedef std::pair<TransportConfig_rch, OPENDDS_VECTOR(OPENDDS_STRING) > ConfigInfo;
98  OPENDDS_VECTOR(ConfigInfo) configInfoVec;
99 
100  // Record the transport instances created, so we can place them
101  // in the implicit transport configuration for this file.
102  OPENDDS_LIST(TransportInst_rch) instances;
103 
104  ACE_TString sect_name;
105 
106  for (int index = 0;
107  cf.enumerate_sections(root, index, sect_name) == 0;
108  ++index) {
109  if (ACE_OS::strcmp(sect_name.c_str(), TRANSPORT_SECTION_NAME) == 0 ||
111  // found the [transport/*] or [transport_template/*] section,
112  // now iterate through subsections...
114  if (cf.open_section(root, sect_name.c_str(), false, sect) != 0) {
116  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
117  ACE_TEXT("failed to open section %C\n"),
118  sect_name.c_str()),
119  -1);
120  } else {
121  // Ensure there are no properties in this section
122  ValueMap vm;
123  if (pullValues(cf, sect, vm) > 0) {
124  // There are values inside [transport]
126  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
127  ACE_TEXT("transport sections must have a section name\n"),
128  sect_name.c_str()),
129  -1);
130  }
131  // Process the subsections of this section (the individual transport
132  // impls).
133  KeyList keys;
134  if (processSections(cf, sect, keys) != 0) {
136  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
137  ACE_TEXT("too many nesting layers in [%C] section.\n"),
138  sect_name.c_str()),
139  -1);
140  }
141  for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
142  ACE_TString transport_id = ACE_TEXT_CHAR_TO_TCHAR(it->first.c_str());
143  ACE_Configuration_Section_Key inst_sect = it->second;
144 
145  ValueMap values;
146  if (pullValues(cf, it->second, values) != 0) {
147  // Get the factory_id for the transport.
148  OPENDDS_STRING transport_type;
149  ValueMap::const_iterator vm_it = values.find("transport_type");
150  if (vm_it != values.end()) {
151  transport_type = vm_it->second;
152  } else {
154  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
155  ACE_TEXT("missing transport_type in [transport/%C] section.\n"),
156  transport_id.c_str()),
157  -1);
158  }
159 
160  // Create the TransportInst object and load the transport
161  // configuration in ACE_Configuration_Heap to the TransportInst
162  // object.
163  const OPENDDS_STRING tid_str = transport_id.c_str() ? ACE_TEXT_ALWAYS_CHAR(transport_id.c_str()) : "";
164  TransportInst_rch inst = create_inst(tid_str, transport_type);
165  if (!inst) {
167  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
168  ACE_TEXT("Unable to create transport instance in [transport/%C] section.\n"),
169  transport_id.c_str()),
170  -1);
171  }
172 
173  instances.push_back(inst);
174  inst->load(cf, inst_sect);
175 
176  // store the transport info
177  TransportEntry entry;
178  entry.transport_name = transport_id;
179  entry.transport_info = values;
180  transports_.push_back(entry);
181  } else {
 // NOTE(review): this branch is taken when pullValues() returned 0 for
 // the subsection, yet the message reports a missing transport_type.
183  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
184  ACE_TEXT("missing transport_type in [transport/%C] section.\n"),
185  transport_id.c_str()),
186  -1);
187  }
188  }
189  }
190  } else if (ACE_OS::strcmp(sect_name.c_str(), CONFIG_SECTION_NAME) == 0) {
191  // found the [config/*] section, now iterate through subsections...
193  if (cf.open_section(root, sect_name.c_str(), false, sect) != 0) {
195  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
196  ACE_TEXT("failed to open section [%C]\n"),
197  sect_name.c_str()),
198  -1);
199  } else {
200  // Ensure there are no properties in this section
201  ValueMap vm;
202  if (pullValues(cf, sect, vm) > 0) {
203  // There are values inside [config]
205  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
206  ACE_TEXT("config sections must have a section name\n"),
207  sect_name.c_str()),
208  -1);
209  }
210  // Process the subsections of this section (the individual config
211  // impls).
212  KeyList keys;
213  if (processSections(cf, sect, keys) != 0) {
214  // Don't allow multiple layers of nesting ([config/x/y]).
216  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
217  ACE_TEXT("too many nesting layers in [%C] section.\n"),
218  sect_name.c_str()),
219  -1);
220  }
221  for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
222  OPENDDS_STRING config_id = it->first;
223 
224  // Create a TransportConfig object.
225  TransportConfig_rch config = create_config(config_id);
226  if (!config) {
228  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
229  ACE_TEXT("Unable to create transport config in [config/%C] section.\n"),
230  config_id.c_str()),
231  -1);
232  }
233 
234  ValueMap values;
235  pullValues(cf, it->second, values);
236 
237  ConfigInfo configInfo;
238  configInfo.first = config;
 // NOTE: this inner `it` (and the two further down) shadows the KeyList
 // iterator of the enclosing loop.
239  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
240  OPENDDS_STRING name = it->first;
241  OPENDDS_STRING value = it->second;
242  if (name == "transports") {
 // Split the comma-separated list. Tokens are stored exactly as
 // written (nothing here trims them). When the loop ends, `value`
 // holds the last (or only) entry.
243  char delim = ',';
244  size_t pos = 0;
245  OPENDDS_STRING token;
246  while ((pos = value.find(delim)) != OPENDDS_STRING::npos) {
247  token = value.substr(0, pos);
248  configInfo.second.push_back(token);
249  value.erase(0, pos + 1);
250  }
251 
252  // does this config specify a transport_template?
 // NOTE(review): only the last list entry (the remaining `value`) is
 // compared here and in the transports_ loop below.
253  for (OPENDDS_VECTOR(TransportTemplate)::iterator it = transport_templates_.begin(); it != transport_templates_.end(); ++it) {
254  if (it->transport_template_name == value) {
255  it->config_name = config_id;
256  break;
257  }
258  }
259 
260  // store the config name for the transport entry
261  for (OPENDDS_VECTOR(TransportEntry)::iterator it = transports_.begin(); it != transports_.end(); ++it) {
262  if (!ACE_OS::strcmp(ACE_TEXT_ALWAYS_CHAR(it->transport_name.c_str()), value.c_str())) {
263  it->config_name = ACE_TEXT_CHAR_TO_TCHAR(config_id.c_str());
264  break;
265  }
266  }
267 
268  configInfo.second.push_back(value);
269 
 // The config is queued for instance resolution only once its
 // "transports" entry has been seen.
270  configInfoVec.push_back(configInfo);
271  } else if (name == "swap_bytes") {
 // Accepts "1"/"true" and "0"/"false"; anything else is an error.
272  if ((value == "1") || (value == "true")) {
273  config->swap_bytes_ = true;
274  } else if ((value != "0") && (value != "false")) {
276  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
277  ACE_TEXT("Illegal value for swap_bytes (%C) in [config/%C] section.\n"),
278  value.c_str(), config_id.c_str()),
279  -1);
280  }
281  } else if (name == "passive_connect_duration") {
282  if (!convertToInteger(value,
283  config->passive_connect_duration_)) {
285  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
286  ACE_TEXT("Illegal integer value for passive_connect_duration (%C) in [config/%C] section.\n"),
287  value.c_str(), config_id.c_str()),
288  -1);
289  }
290  } else {
292  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
293  ACE_TEXT("Unexpected entry (%C) in [config/%C] section.\n"),
294  name.c_str(), config_id.c_str()),
295  -1);
296  }
297  }
298  if (configInfo.second.empty()) {
300  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
301  ACE_TEXT("No transport instances listed in [config/%C] section.\n"),
302  config_id.c_str()),
303  -1);
304  }
305  }
306  }
307  }
308  }
309 
310  // Populate the configurations with instances
311  for (unsigned int i = 0; i < configInfoVec.size(); ++i) {
312  TransportConfig_rch config = configInfoVec[i].first;
313  OPENDDS_VECTOR(OPENDDS_STRING)& insts = configInfoVec[i].second;
314  for (unsigned int j = 0; j < insts.size(); ++j) {
315  TransportInst_rch inst = get_inst(insts[j]);
316  if (!inst) {
318  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
319  ACE_TEXT("The inst (%C) in [config/%C] section is undefined.\n"),
320  insts[j].c_str(), config->name().c_str()),
321  -1);
322  }
323  config->instances_.push_back(inst);
324  }
325  }
326 
327  // Create and populate the default configuration for this
328  // file with all the instances from this file.
329  if (!instances.empty()) {
330  TransportConfig_rch config = create_config(file_name);
331  if (!config) {
333  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
334  ACE_TEXT("Unable to create default transport config.\n"),
335  file_name.c_str()),
336  -1);
337  }
 // Sort by instance name (see predicate) so the order inside the default
 // config does not depend on the order the sections were enumerated in.
338  instances.sort(predicate);
339  for (OPENDDS_LIST(TransportInst_rch)::const_iterator it = instances.begin();
340  it != instances.end(); ++it) {
341  config->instances_.push_back(*it);
342  }
343  }
344 
345  return 0;
346 }
347 
// Reads the subsections of the TRANSPORT_TEMPLATE_SECTION_NAME section of cf
// and appends one TransportTemplate per subsection to transport_templates_.
// Within a subsection:
//  - "instantiation_rule" set to "per_participant" turns on
//    instantiate_per_participant;
//  - the key equal to CUSTOMIZATION_SECTION_NAME names a subsection of the
//    customization section whose entries are copied into
//    element.customizations;
//  - every other key/value is stored in element.transport_info.
// Returns 0 when the section is absent or was read; the error paths visible
// here end in -1.
348 int
350 {
351  const ACE_Configuration_Section_Key& root = cf.root_section();
352  ACE_Configuration_Section_Key transport_sect;
353 
354  if (cf.open_section(root, TRANSPORT_TEMPLATE_SECTION_NAME, false, transport_sect) != 0) {
355  if (DCPS_debug_level > 0) {
356  // This is not an error if the configuration file does not have
357  // any transport template (sub)section.
359  ACE_TEXT("(%P|%t) NOTICE: TransportRegistry::load_transport_templates(): ")
360  ACE_TEXT("config does not have a [%s] section.\n"),
362  }
363 
364  return 0;
365 
366  } else {
367  if (DCPS_debug_level > 0) {
369  ACE_TEXT("(%P|%t) NOTICE: TransportRegistry::load_transport_templates(): ")
370  ACE_TEXT("config has %s sections.\n"),
372  }
373 
374  // Ensure there are no properties in this section
375  ValueMap vm;
376  if (pullValues(cf, transport_sect, vm) > 0) {
377  // There are values inside [transport_template]
379  ACE_TEXT("(%P|%t) ERROR: TransportRegistry::load_transport_templates(): ")
380  ACE_TEXT("%s sections must have a subsection name\n"),
382  -1);
383  }
384  // Process the subsections of this section (the individual transport
 // templates)
385  KeyList keys;
386  if (processSections(cf, transport_sect, keys) != 0) {
388  ACE_TEXT("(%P|%t) ERROR: TransportRegistry::load_transport_templates(): ")
389  ACE_TEXT("too many nesting layers in the [%s] section.\n"),
391  -1);
392  }
393 
394  // Loop through the [transport_template/*] sections
395  for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
396  TransportTemplate element;
397  element.instantiate_per_participant = false;
398  element.transport_template_name = it->first;
399 
400  if (DCPS_debug_level > 0) {
402  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_templates(): ")
403  ACE_TEXT("processing [%s/%C]\n"),
405  }
406 
407  ValueMap values;
408  pullValues(cf, it->second, values);
409  OPENDDS_STRING rule;
410  OPENDDS_STRING customization;
411 
 // NOTE: this inner `it` shadows the KeyList iterator of the enclosing loop.
412  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
413  OPENDDS_STRING name = it->first;
414  if (name == "instantiation_rule") {
415  rule = it->second;
 // Any value other than "per_participant" leaves the flag at false.
416  if (rule == "per_participant") {
417  element.instantiate_per_participant = true;
418  }
419  if (DCPS_debug_level > 0) {
420  OPENDDS_STRING flag = element.instantiate_per_participant ? "true" : "false";
 // NOTE(review): "instantiantion" in the message below is a typo for
 // "instantiation"; left as is because it is runtime output.
422  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_templates(): ")
423  ACE_TEXT("[%s/%C]: instantiantion rule == %C\n"),
424  TRANSPORT_TEMPLATE_SECTION_NAME, element.transport_template_name.c_str(), flag.c_str()));
425  }
426  } else if (name == ACE_TEXT_ALWAYS_CHAR(CUSTOMIZATION_SECTION_NAME)) {
427  customization = it->second;
428  if (DCPS_debug_level > 0) {
430  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_templates(): ")
431  ACE_TEXT("[%s/%C]: customization == %C\n"),
432  TRANSPORT_TEMPLATE_SECTION_NAME, element.transport_template_name.c_str(), customization.c_str()));
433  }
434 
 // If the customization section cannot be opened the template simply
 // gets no customizations; no error is reported on that path.
435  ACE_Configuration_Section_Key custom_sect;
436  if (cf.open_section(root, CUSTOMIZATION_SECTION_NAME, false, custom_sect) == 0) {
437  ValueMap vcm;
438 
439  if (pullValues(cf, custom_sect, vcm) > 0) {
441  ACE_TEXT("(%P|%t) ERROR: TransportRegistry::load_transport_templates(): ")
442  ACE_TEXT("%s sections must have a subsection name\n"),
444  -1);
445  }
446 
447  // Process the subsections of the custom section
448  KeyList keys;
449  if (processSections(cf, custom_sect, keys) != 0) {
451  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_templates(): ")
452  ACE_TEXT("too many nesting layers in the [%s] section.\n"),
454  -1);
455  }
456 
457  // copy the entries of the customization subsection named by this
 // template into element.customizations
458  for (KeyList::const_iterator iter = keys.begin(); iter != keys.end(); ++iter) {
459  if (customization == iter->first) {
460  ValueMap values;
461  pullValues(cf, iter->second, values);
462 
463  for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
464  element.customizations[it->first] = it->second;
465  }
466  }
467  }
468  }
469  } else {
 // Everything else is an ordinary transport setting of the template.
470  element.transport_info[it->first] = it->second;
471  }
472  }
473 
474  transport_templates_.push_back(element);
475  }
476  }
477 
478  return 0;
479 }
480 
// Takes lock_ and asks load_transport_lib_i() for transport_type. A failure
// is only reported through the message below; nothing is returned.
481 void
483 {
484  GuardType guard(lock_);
485  if (!load_transport_lib_i(transport_type)) {
487  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_lib: ")
488  ACE_TEXT("could not load transport_type=%C.\n"),
489  transport_type.c_str()));
490  }
491 }
492 
495 {
 // Returns the TransportType found in type_map_ under transport_type. If it
 // is not there yet (and ACE_AS_STATIC_LIBS is not defined) the matching
 // directive from lib_directive_map_ is processed first and type_map_ is
 // consulted again. The returned handle is left default-constructed when the
 // type is still unknown.
 // Callers in this file take lock_ before calling (see load_transport_lib()
 // and create_inst()).
496  TransportType_rch type;
497  if (find(type_map_, transport_type, type) == 0) {
498  return type;
499  }
500 
501 #if !defined(ACE_AS_STATIC_LIBS)
502  // Attempt to load it.
503  LibDirectiveMap::iterator lib_iter = lib_directive_map_.find(transport_type);
504  if (lib_iter == lib_directive_map_.end()) {
506  ACE_TEXT("(%P|%t) TransportRegistry::load_transport_lib_i: ")
507  ACE_TEXT("no directive for transport_type=%C.\n"),
508  transport_type.c_str()));
509  return type;
510  }
511 
 // Copy the directive before the lock is reversed below.
512  ACE_TString directive = ACE_TEXT_CHAR_TO_TCHAR(lib_iter->second.c_str());
513  // Release the lock because the transport will call back into the registry.
515  {
516  ACE_Guard<ACE_Reverse_Lock<LockType> > guard(rev_lock);
517  if (0 != ACE_Service_Config::process_directive(directive.c_str())) {
518  return TransportType_rch();
519  }
520  }
521 #endif
522 
 // Look again: processing the directive is expected to have led to a
 // register_type() call for this type — presumably from the loaded library.
523  find(type_map_, transport_type, type);
524  return type;
525 }
526 
529  const OPENDDS_STRING& transport_type)
530 {
 // Creates a new transport instance called name from the transport type
 // transport_type and stores it in inst_map_. Returns a default-constructed
 // TransportInst_rch when the type cannot be loaded or name is already taken.
531  GuardType guard(lock_);
532 
533  TransportType_rch type = load_transport_lib_i(transport_type);
534  if (!type) {
536  ACE_TEXT("(%P|%t) TransportRegistry::create_inst: ")
537  ACE_TEXT("transport_type=%C is not registered.\n"),
538  transport_type.c_str()));
539  return TransportInst_rch();
540  }
541 
 // Instance names must be unique within the registry.
542  if (inst_map_.count(name)) {
544  ACE_TEXT("(%P|%t) TransportRegistry::create_inst: ")
545  ACE_TEXT("name=%C is already in use.\n"),
546  name.c_str()));
547  return TransportInst_rch();
548  }
549  TransportInst_rch inst (type->new_inst(name));
550  inst_map_[name] = inst;
551  return inst;
552 }
553 
554 
557 {
 // Looks name up in inst_map_ while holding lock_. Returns the stored handle,
 // or a default-constructed TransportInst_rch when there is no such instance.
558  GuardType guard(lock_);
559  InstMap::const_iterator found = inst_map_.find(name);
560  if (found != inst_map_.end()) {
561  return found->second;
562  }
563  return TransportInst_rch();
564 }
565 
566 
569 {
 // Creates a TransportConfig called name and stores it in config_map_.
 // Returns a default-constructed TransportConfig_rch when name is already
 // present.
570  GuardType guard(lock_);
571 
572  if (config_map_.count(name)) {
574  ACE_TEXT("(%P|%t) TransportRegistry::create_config: ")
575  ACE_TEXT("name=%C is already in use.\n"),
576  name.c_str()));
577  return TransportConfig_rch();
578  }
579 
580  TransportConfig_rch inst (make_rch<TransportConfig>(name));
581  config_map_[name] = inst;
582  return inst;
583 }
584 
585 
588 {
 // Looks name up in config_map_ while holding lock_. Returns the stored
 // handle, or a default-constructed TransportConfig_rch when there is no such
 // configuration.
589  GuardType guard(lock_);
590  ConfigMap::const_iterator found = config_map_.find(name);
591  if (found != config_map_.end()) {
592  return found->second;
593  }
594  return TransportConfig_rch();
595 }
596 
597 
// Associates the transport configuration cfg with entity.
// Throws Transport::NotFound when cfg is nil and Transport::MiscProblem when
// entity is not an EntityImpl.
// When the entity's domain belongs to a domain range and cfg is the config of
// a transport template, a per-participant config/instance pair is created and
// bound instead of cfg; otherwise cfg itself is set on the entity.
598 void
600  DDS::Entity_ptr entity)
601 {
602  if (cfg.is_nil()) {
603  throw Transport::NotFound();
604  }
605  EntityImpl* ei = dynamic_cast<EntityImpl*>(entity);
606  if (!ei) {
607  throw Transport::MiscProblem();
608  }
609 
610  const DDS::DomainId_t domain_id = ei->get_domain_id();
611 
612  // if domain is in a domain range and config is a transport template,
613  // get the correct config.
614  if (TheServiceParticipant->belongs_to_domain_range(domain_id)) {
 // Is cfg the configuration some transport template was attached to?
615  bool found = false;
616  for (OPENDDS_VECTOR(TransportTemplate)::const_iterator i = transport_templates_.begin(); i != transport_templates_.end(); ++i) {
617  if (cfg->name() == i->config_name) {
618  found = true;
619  break;
620  }
621  }
622 
623  if (found)
624  {
625  ACE_TString cfg_name = ACE_TEXT_CHAR_TO_TCHAR(cfg->name().c_str());
626  // create if not already created
627  int ret = create_transport_template_instance(domain_id, cfg_name);
628 
629  if (ret == 0) {
630  // get guid and create unique name for transport instance
631  GUID_t guid = ei->get_id();
632  if (guid == GUID_UNKNOWN) {
634  ACE_TEXT("(%P|%t) TransportRegistry::bind_config: ")
635  ACE_TEXT("GUID_UNKNOWN. Can not bind entity to a domain template instance.\n")));
637  }
638  OPENDDS_STRING transport_inst_name = GuidConverter(guid).uniqueParticipantId();
639  OPENDDS_STRING transport_config_name;
640 
641  if (cfg_name.c_str() != 0) {
642  transport_config_name = ACE_TEXT_ALWAYS_CHAR(cfg_name.c_str());
643  } else {
645  ACE_TEXT("(%P|%t) TransportRegistry::bind_config: ")
646  ACE_TEXT("Config name is null.\n")));
648  }
649 
 // Both name arguments are in/out: on return they hold the names of the
 // newly created config and instance.
650  bool success = create_new_transport_instance_for_participant(domain_id, transport_config_name, transport_inst_name);
651 
652  if (success) {
653  // update config
654  TransportConfig_rch new_cfg = get_config(transport_config_name);
655  update_config_template_instance_info(new_cfg->name(), transport_inst_name);
656  ei->transport_config(new_cfg);
657  return;
658  } else {
660  ACE_TEXT("(%P|%t) TransportRegistry::bind_config: ")
661  ACE_TEXT("Failed to create new transport template instance.\n")));
663  }
664  }
665  }
666  }
667 
 // Default path: bind the configuration that was passed in.
668  ei->transport_config(cfg);
669 }
670 
671 
// Removes the config config_name and the transport instance recorded for it
// in config_template_to_instance_map_. Does nothing (apart from an optional
// debug message) when config_name has no entry in that map.
672 void
674 {
675  ConfigTemplateToInstanceMap::iterator i = config_template_to_instance_map_.find(config_name);
676  if (i == config_template_to_instance_map_.end()) {
677  if (DCPS_debug_level > 4) {
679  ACE_TEXT("(%P|%t) TransportRegistry::remove_transport_template_instance: ")
680  ACE_TEXT("%C is not a transport template instance.\n"),
681  config_name.c_str()));
682  }
683  return;
684  }
685 
 // Copy the instance name out of the map entry before removing anything.
686  const OPENDDS_STRING inst_name = i->second;
687  remove_config(config_name);
688  remove_inst(inst_name);
689 
690  if (DCPS_debug_level > 0) {
 // NOTE(review): the message prefix below names
 // DomainParticipantFactoryImpl::delete_participant, not this function —
 // confirm whether that is intended.
692  ACE_TEXT("(%P|%t) DomainParticipantFactoryImpl::delete_participant ")
693  ACE_TEXT("deleted TransportRegistry's dynamically created config %C and instance %C\n"),
694  config_name.c_str(), inst_name.c_str()));
695  }
696 
698 }
699 
700 
703 {
 // Returns global_config_ unchanged when it is nil or already has instances
 // (a further condition sits on a line missing from this listing).
 // Otherwise the fallback transport library (FALLBACK_TYPE) is loaded first.
704  DBG_ENTRY_LVL("TransportRegistry", "fix_empty_default", 6);
705  GuardType guard(lock_);
706  if (global_config_.is_nil()
707  || !global_config_->instances_.empty()
709  return global_config_;
710  }
 // NOTE(review): global_config (no trailing underscore) is presumably a local
 // copy declared on the missing line — confirm against the full source.
712  load_transport_lib_i(FALLBACK_TYPE);
713  return global_config;
714 }
715 
716 
// Adds type to type_map_ under type->name(). Returns false, leaving the map
// untouched, when that name is already registered; true otherwise.
717 bool
719 {
720  DBG_ENTRY_LVL("TransportRegistry", "register_type", 6);
721  const OPENDDS_STRING name = type->name();
722 
723  GuardType guard(this->lock_);
724  if (type_map_.count(name)) {
725  return false;
726  }
727 
728  type_map_[name] = type;
729 
 // The rtps_udp type is also made available under "rtps_discovery", matching
 // the alias set up in lib_directive_map_ by the constructor.
730  if (name == "rtps_udp") {
731  type_map_["rtps_discovery"] = type;
732  }
733 
734  return true;
735 }
736 
// Creates a participant-specific transport instance (and, on lines missing
// from this listing, presumably the matching config) derived from the config
// named transport_config_name.
// transport_config_name and transport_instance_name are in/out: on entry they
// hold the base config name and a participant-unique suffix, and they are
// overwritten with the derived config and instance names.
// Returns true on success; the error paths visible here return false.
737 bool
739 {
740  // check per_participant
741  TransportTemplate templ;
742  if (get_transport_template_info(ACE_TEXT_CHAR_TO_TCHAR(transport_config_name.c_str()), templ)) {
743  if (!templ.instantiate_per_participant) {
745  ACE_TEXT("(%P|%t) ERROR: TransportRegistry::")
746  ACE_TEXT("create_new_transport_instance_for_participant: ")
747  ACE_TEXT("transport_template missing instantiation_rule=per_participant\n")),
748  false);
749  }
750  }
751 
 // NOTE(review): cfg is dereferenced, and instances_[0] accessed, without a
 // nil/empty check.
752  TransportConfig_rch cfg = get_config(transport_config_name);
753 
 // Derived names: <base>_<domain id>_<participant suffix>.
754  OPENDDS_STRING inst_name = cfg->instances_[0]->name() + "_" + to_dds_string(id) + "_" + transport_instance_name;
755  OPENDDS_STRING config_name = cfg->name() + "_" + to_dds_string(id) + "_" + transport_instance_name;
756 
757  // assign new config and inst names
758  transport_config_name = config_name;
759  transport_instance_name = inst_name;
760 
 // NOTE(review): the transport type is hard-coded to "rtps_udp", and inst is
 // used further down without checking that create_inst() succeeded.
762  OpenDDS::DCPS::TransportInst_rch inst = create_inst(inst_name, "rtps_udp");
763 
766 
 // The instance settings are written into a scratch configuration section,
 // which inst->load() then reads back.
767  ach.open();
768  ach.open_section(ach.root_section(), ACE_TEXT("the_transport_setup"), true, sect_key);
769 
 // NOTE(review): transport_config_name was reassigned above, so this lookup
 // uses the derived name and not the original config name — confirm that
 // this is intended.
770  if (TheServiceParticipant->belongs_to_domain_range(id) ||
771  config_has_transport_template(ACE_TEXT_CHAR_TO_TCHAR(transport_config_name.c_str()))) {
772  TransportTemplate tr_inst;
773 
774  if (get_transport_template_info(ACE_TEXT_CHAR_TO_TCHAR(cfg->name().c_str()), tr_inst)) {
775  ValueMap customs;
776 
777  if (!process_customizations(id, tr_inst, customs)) {
779  ACE_TEXT("(%P|%t) ERROR: TransportRegistry::")
780  ACE_TEXT("create_new_transport_instance_for_participant ")
781  ACE_TEXT("could not process_customizations\n")),
782  false);
783  }
784 
785  // write
786  for (ValueMap::const_iterator it = customs.begin(); it != customs.end(); ++it) {
787  ach.set_string_value(sect_key, ACE_TEXT_CHAR_TO_TCHAR(it->first.c_str()), ACE_TEXT_CHAR_TO_TCHAR(it->second.c_str()));
788  }
789  } else {
791  ACE_TEXT("(%P|%t) ERROR: TransportRegistry::")
792  ACE_TEXT("create_new_transport_instance_for_participant ")
793  ACE_TEXT("could not find transport_template for config %C\n"),
794  cfg->name().c_str()),
795  false);
796  }
797  } else {
 // Not template based: copy the settings recorded for the plain transport
 // entry. The result of get_transport_info() is not checked, so tr_inst may
 // be empty here.
798  TransportEntry tr_inst;
799  get_transport_info(ACE_TEXT_CHAR_TO_TCHAR(cfg->name().c_str()), tr_inst);
800 
801  for (ValueMap::const_iterator it = tr_inst.transport_info.begin();
802  it != tr_inst.transport_info.end(); ++it) {
803  ach.set_string_value(sect_key, ACE_TEXT_CHAR_TO_TCHAR(it->first.c_str()), ACE_TEXT_CHAR_TO_TCHAR(it->second.c_str()));
804  if (DCPS_debug_level > 0) {
806  ACE_TEXT("(%P|%t) TransportRegistry::")
807  ACE_TEXT("create_new_transport_entry_for_participant adding %C=%C\n"),
808  it->first.c_str(), it->second.c_str()));
809  }
810  }
811  }
812 
813  inst->load(ach, sect_key);
814  config->instances_.push_back(inst);
815 
816  return true;
817 }
818 
// Records (or overwrites) which transport instance belongs to the dynamically
// created config config_name; remove_transport_template_instance() reads this
// map.
819 void
821 {
822  config_template_to_instance_map_[config_name] = inst_name;
823 }
824 
// Marks the registry as released, shuts down every transport instance in
// inst_map_, and then empties the containers cleared below.
825 void
827 {
828  DBG_ENTRY_LVL("TransportRegistry", "release", 6);
829  GuardType guard(lock_);
830  released_ = true;
831 
 // Shut the instances down while they are still held by the map.
832  for (InstMap::iterator iter = inst_map_.begin(); iter != inst_map_.end(); ++iter) {
833  iter->second->shutdown();
834  }
835 
837  transport_templates_.clear();
838  transports_.clear();
839  type_map_.clear();
840  inst_map_.clear();
841  config_map_.clear();
844 }
845 
// Returns the released_ flag (set by release()), read while holding lock_.
846 bool
848 {
849  GuardType guard(lock_);
850  return released_;
851 }
852 
855 {
 // Builds "transport_template_instance_<id>", the transport instance name
 // used by create_transport_template_instance().
856  OpenDDS::DCPS::Discovery::RepoKey configured_name = "transport_template_instance_";
857  configured_name += to_dds_string(id);
858  return configured_name;
859 }
860 
863 {
 // Builds "templ_config_<id>", the config name used by
 // create_transport_template_instance().
864  OpenDDS::DCPS::Discovery::RepoKey configured_name = "templ_config_";
865  configured_name += to_dds_string(id);
866  return configured_name;
867 }
868 
// Creates, once per domain, the config "templ_config_<domain>" and the
// transport instance "transport_template_instance_<domain>" from the
// transport template attached to config_name. The settings are written into a
// scratch configuration and loaded through load_transport_configuration().
// Returns 0 when the config already exists, when no matching template is
// found, or after a successful load; -1 when load_transport_configuration()
// fails.
869 int
871 {
872  OPENDDS_STRING transport_inst_name = get_transport_template_instance_name(domain);
873  OPENDDS_STRING config_inst_name = get_config_instance_name(domain);
874 
875  // has it already been created?
876  ConfigMap::const_iterator i = config_map_.find(config_inst_name);
877  if (i != config_map_.end()) {
878  return 0; // already created
879  }
880 
881  if (has_transport_templates()) {
882  TransportTemplate tr_inst;
883 
884  if (get_transport_template_info(config_name, tr_inst)) {
886  tcf.open();
887  const ACE_Configuration_Section_Key& root = tcf.root_section();
888 
889  // create config
891  tcf.open_section(root, ACE_TEXT("config"), true /* create */, csect);
893  tcf.open_section(csect, ACE_TEXT_CHAR_TO_TCHAR(config_inst_name.c_str()), true /* create */, csub_sect);
894  tcf.set_string_value(csub_sect, ACE_TEXT("transports"), ACE_TEXT_CHAR_TO_TCHAR(transport_inst_name.c_str()));
895 
896  // create matching transport section
898  tcf.open_section(root, ACE_TEXT("transport"), true /* create */, tsect);
900  tcf.open_section(tsect, ACE_TEXT_CHAR_TO_TCHAR(transport_inst_name.c_str()), true /* create */, tsub_sect);
901 
902  ValueMap customs;
903 
904  if (!process_customizations(domain, tr_inst, customs)) {
 // NOTE(review): this function returns int, so the `false` below is
 // returned as 0 — the same value as success. bind_config() tests
 // `ret == 0` and would carry on after this failure. -1 looks intended;
 // confirm and fix.
906  ACE_TEXT("(%P|%t) ERROR: TransportRegistry::")
907  ACE_TEXT("create_transport_template_instance ")
908  ACE_TEXT("could not process_customizations\n")),
909  false);
910  }
911 
912  // write
913  for (ValueMap::const_iterator it = customs.begin(); it != customs.end(); ++it) {
914  tcf.set_string_value(tsub_sect, ACE_TEXT_CHAR_TO_TCHAR(it->first.c_str()), ACE_TEXT_CHAR_TO_TCHAR(it->second.c_str()));
915 
916  if (DCPS_debug_level > 0) {
918  ACE_TEXT("(%P|%t) TransportRegistry::")
919  ACE_TEXT("create_transport_template_instance adding %C=%C\n"),
920  it->first.c_str(), it->second.c_str()));
921  }
922  }
923 
924  // load transport
 // The first argument becomes the name of the per-file default config
 // that load_transport_configuration() creates.
925  int status = this->load_transport_configuration("transport_config_" + to_dds_string(domain), tcf);
926 
927  if (status != 0) {
929  ACE_TEXT("(%P|%t) ERROR: TransportRegistry::create_transport_template_instance ")
930  ACE_TEXT("load_transport_configuration() returned %d\n"),
931  status),
932  -1);
933  }
934 
935  }
936 
937  }
938 
939  return 0;
940 }
941 
// Returns true when some entry of transport_templates_ has a config_name
// equal to config_name.
942 bool
944 {
945  for (OPENDDS_VECTOR(TransportTemplate)::const_iterator i = transport_templates_.begin(); i != transport_templates_.end(); ++i) {
946  if (!ACE_OS::strcmp(ACE_TEXT_ALWAYS_CHAR(config_name.c_str()), i->config_name.c_str())) {
947  return true;
948  }
949  }
950 
951  return false;
952 }
953 
// Copies the first transport template whose config_name equals config_name
// into inst. Returns true when one was found; inst is left untouched
// otherwise.
954 bool
956 {
957  bool ret = false;
958  if (has_transport_templates()) {
959  for (OPENDDS_VECTOR(TransportTemplate)::const_iterator i = transport_templates_.begin(); i != transport_templates_.end(); ++i) {
960  if (!ACE_OS::strcmp(ACE_TEXT_ALWAYS_CHAR(config_name.c_str()), i->config_name.c_str())) {
 // Field-by-field copy of the matching template.
961  inst.transport_template_name = i->transport_template_name;
962  inst.config_name = i->config_name;
963  inst.instantiate_per_participant = i->instantiate_per_participant;
964  inst.customizations = i->customizations;
965  inst.transport_info = i->transport_info;
966 
967  ret = true;
968  break;
969  }
970  }
971  }
972 
973  if (DCPS_debug_level > 0) {
975  ACE_TEXT("(%P|%t) TransportRegistry::get_transport_template_info: ")
976  ACE_TEXT("%C config %s\n"),
977  ret ? "found" : "did not find", config_name.c_str()));
978  }
979 
980  return ret;
981 }
982 
983 bool TransportRegistry::process_customizations(const DDS::DomainId_t id, const TransportTemplate& tr_inst, ValueMap& customs)
984 {
985  for (ValueMap::const_iterator it = tr_inst.transport_info.begin();
986  it != tr_inst.transport_info.end(); ++it) {
987  // customization.
988  ValueMap::const_iterator idx = tr_inst.customizations.find(it->first);
989  if (idx != tr_inst.customizations.end()) {
990  OPENDDS_STRING addr = it->second;
991  OPENDDS_STRING custom = idx->second;
992 
993  // check for 'add_domain_id_to_ip_addr' and 'add_domain_id_to_port'
994  bool add_to_ip = false;
995  bool add_to_port = false;
996 
997  OPENDDS_STRING val1;
998  OPENDDS_STRING val2;
999 
1000  size_t comma_pos = custom.find(',');
1001  if (comma_pos != OPENDDS_STRING::npos) {
1002  val1 = custom.substr(0, comma_pos);
1003  // remove spaces
1004  val1.erase(std::remove(val1.begin(), val1.end(), ' '), val1.end());
1005 
1006  val2 = custom.substr(comma_pos + 1);
1007  val2.erase(std::remove(val2.begin(), val2.end(), ' '), val2.end());
1008 
1009  add_to_ip = (val1 == CUSTOM_ADD_DOMAIN_TO_IP || val2 == CUSTOM_ADD_DOMAIN_TO_IP);
1010  add_to_port = (val1 == CUSTOM_ADD_DOMAIN_TO_PORT || val2 == CUSTOM_ADD_DOMAIN_TO_PORT);
1011 
1012  } else {
1013  custom.erase(std::remove(custom.begin(), custom.end(), ' '), custom.end());
1014 
1015  add_to_ip = (custom == CUSTOM_ADD_DOMAIN_TO_IP);
1016  add_to_port = (custom == CUSTOM_ADD_DOMAIN_TO_PORT);
1017  }
1018 
1019  if (!add_to_ip && !add_to_port) {
1021  ACE_TEXT("(%P|%t) TransportRegistry::process_customizations: ")
1022  ACE_TEXT("%C customization is not supported. Supported values are %C and %C\n"),
1023  custom.c_str(), CUSTOM_ADD_DOMAIN_TO_IP.c_str(), CUSTOM_ADD_DOMAIN_TO_PORT.c_str()),
1024  false);
1025  }
1026 
1027  // only add_domain_id_to_ip_addr and add_domain_id_to_port are supported at this time.
1028  if (add_to_ip) {
1029  size_t pos = addr.find_last_of(".");
1030  if (pos != OPENDDS_STRING::npos) {
1031  OPENDDS_STRING custom = addr.substr(pos + 1);
1032 
1033  OPENDDS_STRING port = "";
1034  OPENDDS_STRING last_octet = "";
1035 
1036  size_t cpos = custom.find(":");
1037  if (cpos != OPENDDS_STRING::npos) {
1038  port = custom.substr(cpos);
1039  last_octet = custom.substr(0, cpos);
1040  } else {
1041  last_octet = custom;
1042  }
1043  int val = 0;
1044 
1045  if (!convertToInteger(last_octet, val)) {
1047  ACE_TEXT("(%P|%t) ERROR: TransportRegistry::")
1048  ACE_TEXT("process_customizations ")
1049  ACE_TEXT("could not convert %C to integer\n"),
1050  custom.c_str()),
1051  false);
1052  }
1053 
1054  val += id;
1055  addr = addr.substr(0, pos);
1056  addr += "." + to_dds_string(val);
1057  if (port != "") {
1058  addr += port;
1059  }
1060  } else {
1062  ACE_TEXT("(%P|%t) ERROR: Service_Participant::")
1063  ACE_TEXT("process_customizations ")
1064  ACE_TEXT("could not add_domain_id_to_ip_addr for address %C\n"),
1065  idx->second.c_str()),
1066  false);
1067  }
1068 
1069  if (DCPS_debug_level > 0) {
1071  ACE_TEXT("(%P|%t) TransportRegistry::")
1072  ACE_TEXT("process_customizations processing add_domain_id_to_ip_addr: %C=%C\n"),
1073  it->first.c_str(), addr.c_str()));
1074  }
1075  }
1076 
1077  if (add_to_port) {
1078  size_t pos = addr.find_last_of(":");
1079  if (pos == OPENDDS_STRING::npos) {
1080  // use default port + domainId. See 9.6.1.3 in the RTPS 2.2 protocol specification.
1081  const int PB = 7400;
1082  const int DG = 250;
1083  const int D2 = 1;
1084  int rtpsPort = PB + DG * id + D2;
1085  rtpsPort += id;
1086  addr += ":" + to_dds_string(rtpsPort);
1087  } else {
1088  // address has a port supplied
1089  int rtpsPort = -1;
1090  if (convertToInteger(addr.substr(pos + 1), rtpsPort)) {
1091  addr = addr.substr(0, pos);
1092  rtpsPort += id;
1093  addr += ":" + to_dds_string(rtpsPort);
1094  } else {
1096  ACE_TEXT("(%P|%t) ERROR: Service_Participant::")
1097  ACE_TEXT("process_customizations ")
1098  ACE_TEXT("could not add_domain_id_to_port for %C.\n"),
1099  idx->second.c_str()),
1100  false);
1101  }
1102 
1103  if (DCPS_debug_level > 0) {
1105  ACE_TEXT("(%P|%t) TransportRegistry::")
1106  ACE_TEXT("process_customizations processing add_domain_id_to_port: %C ==> %C\n"),
1107  it->first.c_str(), addr.c_str()));
1108  }
1109 
1110  }
1111 
1112  if (DCPS_debug_level > 0) {
1114  ACE_TEXT("(%P|%t) TransportRegistry::")
1115  ACE_TEXT("process_customizations processing add_domain_id_to_port: %C=%C\n"),
1116  it->first.c_str(), addr.c_str()));
1117  }
1118  }
1119 
1120  customs[idx->first] = addr.c_str();
1121  } else {
1122  customs[it->first] = it->second.c_str();
1123  if (DCPS_debug_level > 0) {
1125  ACE_TEXT("(%P|%t) TransportRegistry::")
1126  ACE_TEXT("process_customizations adding %C=%C\n"),
1127  it->first.c_str(), it->second.c_str()));
1128  }
1129  }
1130  }
1131 
1132  return true;
1133 }
1134 
1136 {
 // True when at least one transport template is stored in
 // transport_templates_.
1137  return !transport_templates_.empty();
1138 }
1139 
// Copies the first entry of transports_ whose config_name equals config_name
// into inst. Returns true when one was found; inst is left untouched
// otherwise.
1140 bool
1142 {
1143  bool ret = false;
1144  if (has_transports()) {
1145  for (OPENDDS_VECTOR(TransportEntry)::const_iterator i = transports_.begin(); i != transports_.end(); ++i) {
1146  if (!ACE_OS::strcmp(ACE_TEXT_ALWAYS_CHAR(config_name.c_str()), ACE_TEXT_ALWAYS_CHAR(i->config_name.c_str()))) {
1147  inst.transport_name = i->transport_name;
1148  inst.config_name = i->config_name;
1149  inst.transport_info = i->transport_info;
1150 
1151  ret = true;
1152  break;
1153  }
1154  }
1155  }
1156 
1157  if (DCPS_debug_level > 0) {
1159  ACE_TEXT("(%P|%t) TransportRegistry::get_transport_info: ")
1160  ACE_TEXT("%C config %s\n"),
1161  ret ? "found" : "did not find", config_name.c_str()));
1162  }
1163 
1164  return ret;
1165 }
1166 
1168 {
 // True when at least one transport entry is stored in transports_.
1169  return !transports_.empty();
1170 }
1171 
1172 }
1173 }
1174 
TransportInst_rch get_inst(const OPENDDS_STRING &name) const
#define ACE_DEBUG(X)
int create_transport_template_instance(DDS::DomainId_t domain, const ACE_TString &config_name)
Implements the OpenDDS::DCPS::Entity interfaces.
Definition: EntityImpl.h:37
bool create_new_transport_instance_for_participant(DDS::DomainId_t id, OPENDDS_STRING &transport_config_name, OPENDDS_STRING &transport_instance_name)
#define ACE_ERROR(X)
OPENDDS_STRING get_transport_template_instance_name(DDS::DomainId_t id)
virtual int set_string_value(const ACE_Configuration_Section_Key &key, const ACE_TCHAR *name, const ACE_TString &value)
const char * c_str(void) const
const LogLevel::Value value
Definition: debug.cpp:61
void remove_inst(const TransportInst_rch &inst)
void load_transport_lib(const OPENDDS_STRING &transport_type)
void update_config_template_instance_info(const OPENDDS_STRING &config_name, const OPENDDS_STRING &inst_name)
void remove_transport_template_instance(const OPENDDS_STRING &config_name)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
static const ACE_TCHAR CUSTOMIZATION_SECTION_NAME[]
static int process_directive(const ACE_TCHAR directive[])
String to_dds_string(unsigned short to_convert)
static const char DEFAULT_CONFIG_NAME[]
#define ACE_TEXT_ALWAYS_CHAR(STRING)
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
const OPENDDS_STRING & name() const
Definition: TransportInst.h:70
virtual const ACE_Configuration_Section_Key & root_section(void) const
RcHandle< TransportConfig > TransportConfig_rch
OPENDDS_STRING get_config_instance_name(DDS::DomainId_t id)
RcHandle< TransportInst > TransportInst_rch
The type definition for the smart-pointer to the underlying type.
bool config_has_transport_template(const ACE_TString &config_name) const
void bind_config(const OPENDDS_STRING &name, DDS::Entity_ptr entity)
Conversion processing and value testing utilities for RTPS GUID_t types.
Definition: GuidConverter.h:62
bool get_transport_template_info(const ACE_TString &config_name, TransportTemplate &inst)
static TYPE * instance(void)
TransportConfig_rch global_config() const
ConfigTemplateToInstanceMap config_template_to_instance_map_
#define OPENDDS_STRING
virtual void transport_config(const TransportConfig_rch &cfg)
Definition: EntityImpl.cpp:122
bool get_transport_info(const ACE_TString &config_name, TransportEntry &inst)
DOMAINID_TYPE_NATIVE DomainId_t
LM_DEBUG
TransportType_rch load_transport_lib_i(const OPENDDS_STRING &transport_type)
virtual GUID_t get_id() const
Definition: EntityImpl.h:54
TransportConfig_rch get_config(const OPENDDS_STRING &name) const
static const ACE_TCHAR CONFIG_SECTION_NAME[]
static const ACE_TCHAR TRANSPORT_SECTION_NAME[]
RcHandle< TransportType > TransportType_rch
LM_NOTICE
int load_transport_configuration(const OPENDDS_STRING &file_name, ACE_Configuration_Heap &cf)
void remove_config(const TransportConfig_rch &cfg)
static void close()
Close the singleton instance of this class.
TransportInst_rch create_inst(const OPENDDS_STRING &name, const OPENDDS_STRING &transport_type)
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
TransportConfig_rch create_config(const OPENDDS_STRING &name)
OPENDDS_STRING uniqueParticipantId() const
OPENDDS_STRING name() const
int strcmp(const char *s, const char *t)
const char *const name
Definition: debug.cpp:60
ACE_TEXT("TCP_Factory")
int load_transport_templates(ACE_Configuration_Heap &cf)
TransportConfig_rch fix_empty_default()
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual int load(ACE_Configuration_Heap &cf, ACE_Configuration_Section_Key &sect)
virtual DDS::DomainId_t get_domain_id()
Definition: EntityImpl.h:52
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
virtual int open_section(const ACE_Configuration_Section_Key &base, const ACE_TCHAR *sub_section, bool create, ACE_Configuration_Section_Key &result)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
static const char DEFAULT_INST_PREFIX[]
static TransportRegistry * instance()
Return a singleton instance of this class.
int processSections(ACE_Configuration_Heap &cf, const ACE_Configuration_Section_Key &key, KeyList &subsections)
Definition: ConfigUtils.cpp:41
int remove(Container &c, const ValueType &v)
Definition: Util.h:121
int open(const ACE_TCHAR *file_name, void *base_address=ACE_DEFAULT_BASE_ADDR, size_t default_map_size=ACE_DEFAULT_CONFIG_SECTION_SIZE)
OPENDDS_VECTOR(TransportTemplate) transport_templates_
virtual int enumerate_sections(const ACE_Configuration_Section_Key &key, int index, ACE_TString &name)
static const ACE_TCHAR TRANSPORT_TEMPLATE_SECTION_NAME[]
#define ACE_ERROR_RETURN(X, Y)
static void close(void)
int pullValues(ACE_Configuration_Heap &cf, const ACE_Configuration_Section_Key &key, ValueMap &values)
Definition: ConfigUtils.cpp:17
#define TheServiceParticipant
OPENDDS_STRING RepoKey
Definition: Discovery.h:80
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
static const OPENDDS_STRING CUSTOM_ADD_DOMAIN_TO_PORT
typedef OPENDDS_LIST(SubsectionPair) KeyList
bool register_type(const TransportType_rch &type)
bool convertToInteger(const String &s, T &value)
bool process_customizations(const DDS::DomainId_t id, const TransportTemplate &tr_inst, ValueMap &customs)
static const OPENDDS_STRING CUSTOM_ADD_DOMAIN_TO_IP
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71