TransportRegistry.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "TransportRegistry.h"
00010 #include "TransportDebug.h"
00011 #include "TransportInst.h"
00012 #include "TransportExceptions.h"
00013 #include "TransportType.h"
00014 #include "dds/DCPS/Util.h"
00015 #include "dds/DCPS/Service_Participant.h"
00016 #include "dds/DCPS/EntityImpl.h"
00017 #include "dds/DCPS/ConfigUtils.h"
00018 #include "dds/DCPS/SafetyProfileStreams.h"
00019 
00020 #include "ace/Singleton.h"
00021 #include "ace/OS_NS_strings.h"
00022 #include "ace/Service_Config.h"
00023 
00024 #if !defined (__ACE_INLINE__)
00025 #include "TransportRegistry.inl"
00026 #endif /* __ACE_INLINE__ */
00027 
00028 
00029 namespace {
00030   const ACE_TString OLD_TRANSPORT_PREFIX = ACE_TEXT("transport_");
00031 
00032   /// Used for sorting
00033   bool predicate(const OpenDDS::DCPS::TransportInst_rch& lhs,
00034                  const OpenDDS::DCPS::TransportInst_rch& rhs)
00035   {
00036     return lhs->name() < rhs->name();
00037   }
00038 
00039   // transport type to try loading if none are loaded when DCPS attempts to use
00040 #ifdef OPENDDS_SAFETY_PROFILE
00041   const char FALLBACK_TYPE[] = "rtps_udp";
00042 #else
00043   const char FALLBACK_TYPE[] = "tcp";
00044 #endif
00045 }
00046 
00047 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00048 
00049 namespace OpenDDS {
00050 namespace DCPS {
00051 
00052 TransportRegistry*
00053 TransportRegistry::instance()
00054 {
00055   return ACE_Unmanaged_Singleton<TransportRegistry, ACE_Recursive_Thread_Mutex>::instance();
00056 }
00057 
00058 void
00059 TransportRegistry::close()
00060 {
00061   ACE_Unmanaged_Singleton<TransportRegistry, ACE_Recursive_Thread_Mutex>::close();
00062 }
00063 
00064 const char TransportRegistry::DEFAULT_CONFIG_NAME[] = "_OPENDDS_DEFAULT_CONFIG";
00065 const char TransportRegistry::DEFAULT_INST_PREFIX[] = "_OPENDDS_";
00066 
00067 TransportRegistry::TransportRegistry()
00068   : global_config_(make_rch<TransportConfig>(DEFAULT_CONFIG_NAME))
00069   , released_(false)
00070 {
00071   DBG_ENTRY_LVL("TransportRegistry", "TransportRegistry", 6);
00072   config_map_[DEFAULT_CONFIG_NAME] = global_config_;
00073 
00074   lib_directive_map_["tcp"]       = "dynamic OpenDDS_Tcp Service_Object * OpenDDS_Tcp:_make_TcpLoader()";
00075   lib_directive_map_["udp"]       = "dynamic OpenDDS_Udp Service_Object * OpenDDS_Udp:_make_UdpLoader()";
00076   lib_directive_map_["multicast"] = "dynamic OpenDDS_Multicast Service_Object * OpenDDS_Multicast:_make_MulticastLoader()";
00077   lib_directive_map_["rtps_udp"]  = "dynamic OpenDDS_Rtps_Udp Service_Object * OpenDDS_Rtps_Udp:_make_RtpsUdpLoader()";
00078   lib_directive_map_["shmem"]     = "dynamic OpenDDS_Shmem Service_Object * OpenDDS_Shmem:_make_ShmemLoader()";
00079 
00080   // load_transport_lib() is used for discovery as well:
00081   lib_directive_map_["rtps_discovery"] = lib_directive_map_["rtps_udp"];
00082   lib_directive_map_["repository"] = "dynamic OpenDDS_InfoRepoDiscovery Service_Object * OpenDDS_InfoRepoDiscovery:_make_IRDiscoveryLoader()";
00083 }
00084 
00085 int
00086 TransportRegistry::load_transport_configuration(const OPENDDS_STRING& file_name,
00087                                                 ACE_Configuration_Heap& cf)
00088 {
00089   const ACE_Configuration_Section_Key& root = cf.root_section();
00090 
00091   // Create a vector to hold configuration information so we can populate
00092   // them after the transports instances are created.
00093   typedef std::pair<TransportConfig_rch, OPENDDS_VECTOR(OPENDDS_STRING) > ConfigInfo;
00094   OPENDDS_VECTOR(ConfigInfo) configInfoVec;
00095 
00096   // Record the transport instances created, so we can place them
00097   // in the implicit transport configuration for this file.
00098   OPENDDS_LIST(TransportInst_rch) instances;
00099 
00100   ACE_TString sect_name;
00101 
00102   for (int index = 0;
00103        cf.enumerate_sections(root, index, sect_name) == 0;
00104        ++index) {
00105     if (ACE_OS::strcmp(sect_name.c_str(), TRANSPORT_SECTION_NAME) == 0) {
00106       // found the [transport/*] section, now iterate through subsections...
00107       ACE_Configuration_Section_Key sect;
00108       if (cf.open_section(root, sect_name.c_str(), 0, sect) != 0) {
00109         ACE_ERROR_RETURN((LM_ERROR,
00110                           ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00111                           ACE_TEXT("failed to open section %s\n"),
00112                           sect_name.c_str()),
00113                          -1);
00114       } else {
00115         // Ensure there are no properties in this section
00116         ValueMap vm;
00117         if (pullValues(cf, sect, vm) > 0) {
00118           // There are values inside [transport]
00119           ACE_ERROR_RETURN((LM_ERROR,
00120                             ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00121                             ACE_TEXT("transport sections must have a section name\n"),
00122                             sect_name.c_str()),
00123                            -1);
00124         }
00125         // Process the subsections of this section (the individual transport
00126         // impls).
00127         KeyList keys;
00128         if (processSections(cf, sect, keys) != 0) {
00129           ACE_ERROR_RETURN((LM_ERROR,
00130                             ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00131                             ACE_TEXT("too many nesting layers in [%s] section.\n"),
00132                             sect_name.c_str()),
00133                            -1);
00134         }
00135         for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00136           OPENDDS_STRING transport_id = it->first;
00137           ACE_Configuration_Section_Key inst_sect = it->second;
00138 
00139           ValueMap values;
00140           if (pullValues(cf, it->second, values) != 0) {
00141             // Get the factory_id for the transport.
00142             OPENDDS_STRING transport_type;
00143             ValueMap::const_iterator vm_it = values.find("transport_type");
00144             if (vm_it != values.end()) {
00145               transport_type = vm_it->second;
00146             } else {
00147               ACE_ERROR_RETURN((LM_ERROR,
00148                                 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00149                                 ACE_TEXT("missing transport_type in [transport/%C] section.\n"),
00150                                 transport_id.c_str()),
00151                                -1);
00152             }
00153             // Create the TransportInst object and load the transport
00154             // configuration in ACE_Configuration_Heap to the TransportInst
00155             // object.
00156             TransportInst_rch inst = create_inst(transport_id, transport_type);
00157             if (!inst) {
00158               ACE_ERROR_RETURN((LM_ERROR,
00159                                 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00160                                 ACE_TEXT("Unable to create transport instance in [transport/%C] section.\n"),
00161                                 transport_id.c_str()),
00162                                -1);
00163             }
00164             instances.push_back(inst);
00165             inst->load(cf, inst_sect);
00166           } else {
00167             ACE_ERROR_RETURN((LM_ERROR,
00168                               ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00169                               ACE_TEXT("missing transport_type in [transport/%C] section.\n"),
00170                               transport_id.c_str()),
00171                              -1);
00172           }
00173         }
00174       }
00175     } else if (ACE_OS::strcmp(sect_name.c_str(), CONFIG_SECTION_NAME) == 0) {
00176       // found the [config/*] section, now iterate through subsections...
00177       ACE_Configuration_Section_Key sect;
00178       if (cf.open_section(root, sect_name.c_str(), 0, sect) != 0) {
00179         ACE_ERROR_RETURN((LM_ERROR,
00180                           ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00181                           ACE_TEXT("failed to open section [%s]\n"),
00182                           sect_name.c_str()),
00183                          -1);
00184       } else {
00185         // Ensure there are no properties in this section
00186         ValueMap vm;
00187         if (pullValues(cf, sect, vm) > 0) {
00188           // There are values inside [config]
00189           ACE_ERROR_RETURN((LM_ERROR,
00190                             ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00191                             ACE_TEXT("config sections must have a section name\n"),
00192                             sect_name.c_str()),
00193                            -1);
00194         }
00195         // Process the subsections of this section (the individual config
00196         // impls).
00197         KeyList keys;
00198         if (processSections(cf, sect, keys) != 0) {
00199           // Don't allow multiple layers of nesting ([config/x/y]).
00200           ACE_ERROR_RETURN((LM_ERROR,
00201                             ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00202                             ACE_TEXT("too many nesting layers in [%s] section.\n"),
00203                             sect_name.c_str()),
00204                            -1);
00205         }
00206         for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00207           OPENDDS_STRING config_id = it->first;
00208 
00209           // Create a TransportConfig object.
00210           TransportConfig_rch config = create_config(config_id);
00211           if (!config) {
00212             ACE_ERROR_RETURN((LM_ERROR,
00213                               ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00214                               ACE_TEXT("Unable to create transport config in [config/%C] section.\n"),
00215                               config_id.c_str()),
00216                              -1);
00217           }
00218 
00219           ValueMap values;
00220           pullValues(cf, it->second, values);
00221 
00222           ConfigInfo configInfo;
00223           configInfo.first = config;
00224           for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
00225             OPENDDS_STRING name = it->first;
00226             OPENDDS_STRING value = it->second;
00227             if (name == "transports") {
00228               char delim = ',';
00229               size_t pos = 0;
00230               OPENDDS_STRING token;
00231               while ((pos = value.find(delim)) != OPENDDS_STRING::npos) {
00232                 token = value.substr(0, pos);
00233                 configInfo.second.push_back(token);
00234                 value.erase(0, pos + 1);
00235               }
00236               configInfo.second.push_back(value);
00237 
00238               configInfoVec.push_back(configInfo);
00239             } else if (name == "swap_bytes") {
00240               if ((value == "1") || (value == "true")) {
00241                 config->swap_bytes_ = true;
00242               } else if ((value != "0") && (value != "false")) {
00243                 ACE_ERROR_RETURN((LM_ERROR,
00244                                   ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00245                                   ACE_TEXT("Illegal value for swap_bytes (%C) in [config/%C] section.\n"),
00246                                   value.c_str(), config_id.c_str()),
00247                                  -1);
00248               }
00249             } else if (name == "passive_connect_duration") {
00250               if (!convertToInteger(value,
00251                                     config->passive_connect_duration_)) {
00252                 ACE_ERROR_RETURN((LM_ERROR,
00253                                   ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00254                                   ACE_TEXT("Illegal integer value for passive_connect_duration (%s) in [config/%C] section.\n"),
00255                                   value.c_str(), config_id.c_str()),
00256                                  -1);
00257               }
00258             } else {
00259               ACE_ERROR_RETURN((LM_ERROR,
00260                                 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00261                                 ACE_TEXT("Unexpected entry (%C) in [config/%C] section.\n"),
00262                                 name.c_str(), config_id.c_str()),
00263                                -1);
00264             }
00265           }
00266           if (configInfo.second.empty()) {
00267             ACE_ERROR_RETURN((LM_ERROR,
00268                               ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00269                               ACE_TEXT("No transport instances listed in [config/%C] section.\n"),
00270                               config_id.c_str()),
00271                              -1);
00272           }
00273         }
00274       }
00275     } else if (ACE_OS::strncmp(sect_name.c_str(), OLD_TRANSPORT_PREFIX.c_str(),
00276                                OLD_TRANSPORT_PREFIX.length()) == 0) {
00277       ACE_ERROR_RETURN((LM_ERROR,
00278                         ACE_TEXT("(%P|%t) ERROR: ")
00279                         ACE_TEXT("Obsolete transport configuration found (%s).\n"),
00280                         sect_name.c_str()),
00281                        -1);
00282     }
00283   }
00284 
00285   // Populate the configurations with instances
00286   for (unsigned int i = 0; i < configInfoVec.size(); ++i) {
00287     TransportConfig_rch config = configInfoVec[i].first;
00288     OPENDDS_VECTOR(OPENDDS_STRING)& insts = configInfoVec[i].second;
00289     for (unsigned int j = 0; j < insts.size(); ++j) {
00290       TransportInst_rch inst = get_inst(insts[j]);
00291       if (!inst) {
00292         ACE_ERROR_RETURN((LM_ERROR,
00293                           ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00294                           ACE_TEXT("The inst (%C) in [config/%C] section is undefined.\n"),
00295                           insts[j].c_str(), config->name().c_str()),
00296                          -1);
00297       }
00298       config->instances_.push_back(inst);
00299     }
00300   }
00301 
00302   // Create and populate the default configuration for this
00303   // file with all the instances from this file.
00304   if (!instances.empty()) {
00305     TransportConfig_rch config = create_config(file_name);
00306     if (!config) {
00307       ACE_ERROR_RETURN((LM_ERROR,
00308                         ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00309                         ACE_TEXT("Unable to create default transport config.\n"),
00310                         file_name.c_str()),
00311                        -1);
00312     }
00313     instances.sort(predicate);
00314     for (OPENDDS_LIST(TransportInst_rch)::const_iterator it = instances.begin();
00315          it != instances.end(); ++it) {
00316       config->instances_.push_back(*it);
00317     }
00318   }
00319 
00320   return 0;
00321 }
00322 
00323 void
00324 TransportRegistry::load_transport_lib(const OPENDDS_STRING& transport_type)
00325 {
00326   ACE_UNUSED_ARG(transport_type);
00327 #if !defined(ACE_AS_STATIC_LIBS)
00328   GuardType guard(lock_);
00329   LibDirectiveMap::iterator lib_iter = lib_directive_map_.find(transport_type);
00330   if (lib_iter != lib_directive_map_.end()) {
00331     ACE_TString directive = ACE_TEXT_CHAR_TO_TCHAR(lib_iter->second.c_str());
00332     // Release the lock, because loading a transport library will
00333     // recursively call this function to add its default inst.
00334     guard.release();
00335     ACE_Service_Config::process_directive(directive.c_str());
00336   }
00337 #endif
00338 }
00339 
00340 TransportInst_rch
00341 TransportRegistry::create_inst(const OPENDDS_STRING& name,
00342                                const OPENDDS_STRING& transport_type)
00343 {
00344   GuardType guard(lock_);
00345   TransportType_rch type;
00346 
00347   if (find(type_map_, transport_type, type) != 0) {
00348 #if !defined(ACE_AS_STATIC_LIBS)
00349     guard.release();
00350     // Not present, try to load library
00351     load_transport_lib(transport_type);
00352     guard.acquire();
00353 
00354     // Try to find it again
00355     if (find(type_map_, transport_type, type) != 0) {
00356 #endif
00357       ACE_ERROR((LM_ERROR,
00358                  ACE_TEXT("(%P|%t) TransportRegistry::create_inst: ")
00359                  ACE_TEXT("transport_type=%C is not registered.\n"),
00360                  transport_type.c_str()));
00361       return TransportInst_rch();
00362 #if !defined(ACE_AS_STATIC_LIBS)
00363     }
00364 #endif
00365   }
00366 
00367   if (inst_map_.count(name)) {
00368     ACE_ERROR((LM_ERROR,
00369                ACE_TEXT("(%P|%t) TransportRegistry::create_inst: ")
00370                ACE_TEXT("name=%C is already in use.\n"),
00371                name.c_str()));
00372     return TransportInst_rch();
00373   }
00374   TransportInst_rch inst (type->new_inst(name));
00375   inst_map_[name] = inst;
00376   return inst;
00377 }
00378 
00379 
00380 TransportInst_rch
00381 TransportRegistry::get_inst(const OPENDDS_STRING& name) const
00382 {
00383   GuardType guard(lock_);
00384   InstMap::const_iterator found = inst_map_.find(name);
00385   if (found != inst_map_.end()) {
00386     return found->second;
00387   }
00388   return TransportInst_rch();
00389 }
00390 
00391 
00392 TransportConfig_rch
00393 TransportRegistry::create_config(const OPENDDS_STRING& name)
00394 {
00395   GuardType guard(lock_);
00396 
00397   if (config_map_.count(name)) {
00398     ACE_ERROR((LM_ERROR,
00399                ACE_TEXT("(%P|%t) TransportRegistry::create_config: ")
00400                ACE_TEXT("name=%C is already in use.\n"),
00401                name.c_str()));
00402     return TransportConfig_rch();
00403   }
00404 
00405   TransportConfig_rch inst  (make_rch<TransportConfig>(name));
00406   config_map_[name] = inst;
00407   return inst;
00408 }
00409 
00410 
00411 TransportConfig_rch
00412 TransportRegistry::get_config(const OPENDDS_STRING& name) const
00413 {
00414   GuardType guard(lock_);
00415   ConfigMap::const_iterator found = config_map_.find(name);
00416   if (found != config_map_.end()) {
00417     return found->second;
00418   }
00419   return TransportConfig_rch();
00420 }
00421 
00422 
00423 void
00424 TransportRegistry::bind_config(const TransportConfig_rch& cfg,
00425                                DDS::Entity_ptr entity)
00426 {
00427   if (cfg.is_nil()) {
00428     throw Transport::NotFound();
00429   }
00430   EntityImpl* ei = dynamic_cast<EntityImpl*>(entity);
00431   if (!ei) {
00432     throw Transport::MiscProblem();
00433   }
00434   ei->transport_config(cfg);
00435 }
00436 
00437 
00438 TransportConfig_rch
00439 TransportRegistry::fix_empty_default()
00440 {
00441   DBG_ENTRY_LVL("TransportRegistry", "fix_empty_default", 6);
00442   GuardType guard(lock_);
00443   if (global_config_.is_nil()
00444       || !global_config_->instances_.empty()
00445       || global_config_->name() != DEFAULT_CONFIG_NAME) {
00446     return global_config_;
00447   }
00448   TransportConfig_rch global_config = global_config_;
00449 #if !defined(ACE_AS_STATIC_LIBS)
00450   guard.release();
00451   load_transport_lib(FALLBACK_TYPE);
00452 #endif
00453   return global_config;
00454 }
00455 
00456 
00457 void
00458 TransportRegistry::register_type(const TransportType_rch& type)
00459 {
00460   DBG_ENTRY_LVL("TransportRegistry", "register_type", 6);
00461   int result;
00462   const OPENDDS_STRING name = type->name();
00463 
00464   {
00465     GuardType guard(this->lock_);
00466     result = OpenDDS::DCPS::bind(type_map_, name, type);
00467   }
00468 
00469   // Check to see if it worked.
00470   //
00471   // 0 means it worked, 1 means it is a duplicate (and didn't work), and
00472   // -1 means something bad happened.
00473   if (result == 1) {
00474     ACE_ERROR((LM_ERROR,
00475                ACE_TEXT("(%P|%t) ERROR: transport type=%C already registered ")
00476                ACE_TEXT("with TransportRegistry.\n"), name.c_str()));
00477     throw Transport::Duplicate();
00478 
00479   } else if (result == -1) {
00480     ACE_ERROR((LM_ERROR,
00481                ACE_TEXT("(%P|%t) ERROR: Failed to bind transport type=%C to ")
00482                ACE_TEXT("type_map_.\n"),
00483                name.c_str()));
00484     throw Transport::MiscProblem();
00485   }
00486 }
00487 
00488 
00489 void
00490 TransportRegistry::release()
00491 {
00492   DBG_ENTRY_LVL("TransportRegistry", "release", 6);
00493   GuardType guard(lock_);
00494   released_ = true;
00495 
00496   for (InstMap::iterator iter = inst_map_.begin(); iter != inst_map_.end(); ++iter) {
00497     iter->second->shutdown();
00498   }
00499 
00500   type_map_.clear();
00501   inst_map_.clear();
00502   config_map_.clear();
00503   domain_default_config_map_.clear();
00504   global_config_.reset();
00505 }
00506 
00507 bool
00508 TransportRegistry::released() const
00509 {
00510   GuardType guard(lock_);
00511   return released_;
00512 }
00513 
00514 }
00515 }
00516 
00517 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1