TransportRegistry.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "TransportRegistry.h"
00010 #include "TransportDebug.h"
00011 #include "TransportInst.h"
00012 #include "TransportExceptions.h"
00013 #include "TransportType.h"
00014 #include "dds/DCPS/Util.h"
00015 #include "dds/DCPS/Service_Participant.h"
00016 #include "dds/DCPS/EntityImpl.h"
00017 #include "dds/DCPS/ConfigUtils.h"
00018 #include "dds/DCPS/SafetyProfileStreams.h"
00019 
00020 #include "ace/Singleton.h"
00021 #include "ace/OS_NS_strings.h"
00022 #include "ace/Service_Config.h"
00023 
00024 #if !defined (__ACE_INLINE__)
00025 #include "TransportRegistry.inl"
00026 #endif /* __ACE_INLINE__ */
00027 
00028 
00029 namespace {
00030   const ACE_TString OLD_TRANSPORT_PREFIX = ACE_TEXT("transport_");
00031 
00032   /// Used for sorting
00033   bool predicate(const OpenDDS::DCPS::TransportInst_rch& lhs,
00034                  const OpenDDS::DCPS::TransportInst_rch& rhs)
00035   {
00036     return lhs->name() < rhs->name();
00037   }
00038 
00039   // transport type to try loading if none are loaded when DCPS attempts to use
00040 #ifdef OPENDDS_SAFETY_PROFILE
00041   const char FALLBACK_TYPE[] = "rtps_udp";
00042 #else
00043   const char FALLBACK_TYPE[] = "tcp";
00044 #endif
00045 }
00046 
00047 namespace OpenDDS {
00048 namespace DCPS {
00049 
00050 TransportRegistry*
00051 TransportRegistry::instance()
00052 {
00053   return ACE_Singleton<TransportRegistry, ACE_Recursive_Thread_Mutex>::instance();
00054 }
00055 
00056 const char TransportRegistry::DEFAULT_CONFIG_NAME[] = "_OPENDDS_DEFAULT_CONFIG";
00057 const char TransportRegistry::DEFAULT_INST_PREFIX[] = "_OPENDDS_";
00058 
00059 TransportRegistry::TransportRegistry()
00060   : global_config_(new TransportConfig(DEFAULT_CONFIG_NAME))
00061 {
00062   DBG_ENTRY_LVL("TransportRegistry", "TransportRegistry", 6);
00063   config_map_[DEFAULT_CONFIG_NAME] = global_config_;
00064 
00065   lib_directive_map_["tcp"]       = "dynamic OpenDDS_Tcp Service_Object * OpenDDS_Tcp:_make_TcpLoader()";
00066   lib_directive_map_["udp"]       = "dynamic OpenDDS_Udp Service_Object * OpenDDS_Udp:_make_UdpLoader()";
00067   lib_directive_map_["multicast"] = "dynamic OpenDDS_Multicast Service_Object * OpenDDS_Multicast:_make_MulticastLoader()";
00068   lib_directive_map_["rtps_udp"]  = "dynamic OpenDDS_Rtps_Udp Service_Object * OpenDDS_Rtps_Udp:_make_RtpsUdpLoader()";
00069   lib_directive_map_["shmem"]     = "dynamic OpenDDS_Shmem Service_Object * OpenDDS_Shmem:_make_ShmemLoader()";
00070 
00071   // load_transport_lib() is used for discovery as well:
00072   lib_directive_map_["rtps_discovery"] = lib_directive_map_["rtps_udp"];
00073   lib_directive_map_["repository"] = "dynamic OpenDDS_InfoRepoDiscovery Service_Object * OpenDDS_InfoRepoDiscovery:_make_IRDiscoveryLoader()";
00074 }
00075 
00076 int
00077 TransportRegistry::load_transport_configuration(const OPENDDS_STRING& file_name,
00078                                                 ACE_Configuration_Heap& cf)
00079 {
00080   const ACE_Configuration_Section_Key& root = cf.root_section();
00081 
00082   // Create a vector to hold configuration information so we can populate
00083   // them after the transports instances are created.
00084   typedef std::pair<TransportConfig_rch, OPENDDS_VECTOR(OPENDDS_STRING) > ConfigInfo;
00085   OPENDDS_VECTOR(ConfigInfo) configInfoVec;
00086 
00087   // Record the transport instances created, so we can place them
00088   // in the implicit transport configuration for this file.
00089   OPENDDS_LIST(TransportInst_rch) instances;
00090 
00091   ACE_TString sect_name;
00092 
00093   for (int index = 0;
00094        cf.enumerate_sections(root, index, sect_name) == 0;
00095        ++index) {
00096     if (ACE_OS::strcmp(sect_name.c_str(), TRANSPORT_SECTION_NAME) == 0) {
00097       // found the [transport/*] section, now iterate through subsections...
00098       ACE_Configuration_Section_Key sect;
00099       if (cf.open_section(root, sect_name.c_str(), 0, sect) != 0) {
00100         ACE_ERROR_RETURN((LM_ERROR,
00101                           ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00102                           ACE_TEXT("failed to open section %s\n"),
00103                           sect_name.c_str()),
00104                          -1);
00105       } else {
00106         // Ensure there are no properties in this section
00107         ValueMap vm;
00108         if (pullValues(cf, sect, vm) > 0) {
00109           // There are values inside [transport]
00110           ACE_ERROR_RETURN((LM_ERROR,
00111                             ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00112                             ACE_TEXT("transport sections must have a section name\n"),
00113                             sect_name.c_str()),
00114                            -1);
00115         }
00116         // Process the subsections of this section (the individual transport
00117         // impls).
00118         KeyList keys;
00119         if (processSections(cf, sect, keys) != 0) {
00120           ACE_ERROR_RETURN((LM_ERROR,
00121                             ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00122                             ACE_TEXT("too many nesting layers in [%s] section.\n"),
00123                             sect_name.c_str()),
00124                            -1);
00125         }
00126         for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00127           OPENDDS_STRING transport_id = it->first;
00128           ACE_Configuration_Section_Key inst_sect = it->second;
00129 
00130           ValueMap values;
00131           if (pullValues(cf, it->second, values) != 0) {
00132             // Get the factory_id for the transport.
00133             OPENDDS_STRING transport_type;
00134             ValueMap::const_iterator vm_it = values.find("transport_type");
00135             if (vm_it != values.end()) {
00136               transport_type = vm_it->second;
00137             } else {
00138               ACE_ERROR_RETURN((LM_ERROR,
00139                                 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00140                                 ACE_TEXT("missing transport_type in [transport/%C] section.\n"),
00141                                 transport_id.c_str()),
00142                                -1);
00143             }
00144             // Create the TransportInst object and load the transport
00145             // configuration in ACE_Configuration_Heap to the TransportInst
00146             // object.
00147             TransportInst_rch inst = create_inst(transport_id, transport_type);
00148             if (inst == 0) {
00149               ACE_ERROR_RETURN((LM_ERROR,
00150                                 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00151                                 ACE_TEXT("Unable to create transport instance in [transport/%C] section.\n"),
00152                                 transport_id.c_str()),
00153                                -1);
00154             }
00155             instances.push_back(inst);
00156             inst->load(cf, inst_sect);
00157           } else {
00158             ACE_ERROR_RETURN((LM_ERROR,
00159                               ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00160                               ACE_TEXT("missing transport_type in [transport/%C] section.\n"),
00161                               transport_id.c_str()),
00162                              -1);
00163           }
00164         }
00165       }
00166     } else if (ACE_OS::strcmp(sect_name.c_str(), CONFIG_SECTION_NAME) == 0) {
00167       // found the [config/*] section, now iterate through subsections...
00168       ACE_Configuration_Section_Key sect;
00169       if (cf.open_section(root, sect_name.c_str(), 0, sect) != 0) {
00170         ACE_ERROR_RETURN((LM_ERROR,
00171                           ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00172                           ACE_TEXT("failed to open section [%s]\n"),
00173                           sect_name.c_str()),
00174                          -1);
00175       } else {
00176         // Ensure there are no properties in this section
00177         ValueMap vm;
00178         if (pullValues(cf, sect, vm) > 0) {
00179           // There are values inside [config]
00180           ACE_ERROR_RETURN((LM_ERROR,
00181                             ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00182                             ACE_TEXT("config sections must have a section name\n"),
00183                             sect_name.c_str()),
00184                            -1);
00185         }
00186         // Process the subsections of this section (the individual config
00187         // impls).
00188         KeyList keys;
00189         if (processSections(cf, sect, keys) != 0) {
00190           // Don't allow multiple layers of nesting ([config/x/y]).
00191           ACE_ERROR_RETURN((LM_ERROR,
00192                             ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00193                             ACE_TEXT("too many nesting layers in [%s] section.\n"),
00194                             sect_name.c_str()),
00195                            -1);
00196         }
00197         for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00198           OPENDDS_STRING config_id = it->first;
00199 
00200           // Create a TransportConfig object.
00201           TransportConfig_rch config = create_config(config_id);
00202           if (config == 0) {
00203             ACE_ERROR_RETURN((LM_ERROR,
00204                               ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00205                               ACE_TEXT("Unable to create transport config in [config/%C] section.\n"),
00206                               config_id.c_str()),
00207                              -1);
00208           }
00209 
00210           ValueMap values;
00211           pullValues(cf, it->second, values);
00212 
00213           ConfigInfo configInfo;
00214           configInfo.first = config;
00215           for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
00216             OPENDDS_STRING name = it->first;
00217             OPENDDS_STRING value = it->second;
00218             if (name == "transports") {
00219               char delim = ',';
00220               size_t pos = 0;
00221               OPENDDS_STRING token;
00222               while ((pos = value.find(delim)) != OPENDDS_STRING::npos) {
00223                 token = value.substr(0, pos);
00224                 configInfo.second.push_back(token);
00225                 value.erase(0, pos + 1);
00226               }
00227               configInfo.second.push_back(value);
00228 
00229               configInfoVec.push_back(configInfo);
00230             } else if (name == "swap_bytes") {
00231               if ((value == "1") || (value == "true")) {
00232                 config->swap_bytes_ = true;
00233               } else if ((value != "0") && (value != "false")) {
00234                 ACE_ERROR_RETURN((LM_ERROR,
00235                                   ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00236                                   ACE_TEXT("Illegal value for swap_bytes (%C) in [config/%C] section.\n"),
00237                                   value.c_str(), config_id.c_str()),
00238                                  -1);
00239               }
00240             } else if (name == "passive_connect_duration") {
00241               if (!convertToInteger(value,
00242                                     config->passive_connect_duration_)) {
00243                 ACE_ERROR_RETURN((LM_ERROR,
00244                                   ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00245                                   ACE_TEXT("Illegal integer value for passive_connect_duration (%s) in [config/%C] section.\n"),
00246                                   value.c_str(), config_id.c_str()),
00247                                  -1);
00248               }
00249             } else {
00250               ACE_ERROR_RETURN((LM_ERROR,
00251                                 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00252                                 ACE_TEXT("Unexpected entry (%C) in [config/%C] section.\n"),
00253                                 name.c_str(), config_id.c_str()),
00254                                -1);
00255             }
00256           }
00257           if (configInfo.second.empty()) {
00258             ACE_ERROR_RETURN((LM_ERROR,
00259                               ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00260                               ACE_TEXT("No transport instances listed in [config/%C] section.\n"),
00261                               config_id.c_str()),
00262                              -1);
00263           }
00264         }
00265       }
00266     } else if (ACE_OS::strncmp(sect_name.c_str(), OLD_TRANSPORT_PREFIX.c_str(),
00267                                OLD_TRANSPORT_PREFIX.length()) == 0) {
00268       ACE_ERROR_RETURN((LM_ERROR,
00269                         ACE_TEXT("(%P|%t) ERROR: ")
00270                         ACE_TEXT("Obsolete transport configuration found (%s).\n"),
00271                         sect_name.c_str()),
00272                        -1);
00273     }
00274   }
00275 
00276   // Populate the configurations with instances
00277   for (unsigned int i = 0; i < configInfoVec.size(); ++i) {
00278     TransportConfig_rch config = configInfoVec[i].first;
00279     OPENDDS_VECTOR(OPENDDS_STRING)& insts = configInfoVec[i].second;
00280     for (unsigned int j = 0; j < insts.size(); ++j) {
00281       TransportInst_rch inst = get_inst(insts[j]);
00282       if (inst == 0) {
00283         ACE_ERROR_RETURN((LM_ERROR,
00284                           ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00285                           ACE_TEXT("The inst (%C) in [config/%C] section is undefined.\n"),
00286                           insts[j].c_str(), config->name().c_str()),
00287                          -1);
00288       }
00289       config->instances_.push_back(inst);
00290     }
00291   }
00292 
00293   // Create and populate the default configuration for this
00294   // file with all the instances from this file.
00295   if (!instances.empty()) {
00296     TransportConfig_rch config = create_config(file_name);
00297     if (config == 0) {
00298       ACE_ERROR_RETURN((LM_ERROR,
00299                         ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00300                         ACE_TEXT("Unable to create default transport config.\n"),
00301                         file_name.c_str()),
00302                        -1);
00303     }
00304     instances.sort(predicate);
00305     for (OPENDDS_LIST(TransportInst_rch)::const_iterator it = instances.begin();
00306          it != instances.end(); ++it) {
00307       config->instances_.push_back(*it);
00308     }
00309   }
00310 
00311   return 0;
00312 }
00313 
00314 void
00315 TransportRegistry::load_transport_lib(const OPENDDS_STRING& transport_type)
00316 {
00317   ACE_UNUSED_ARG(transport_type);
00318 #if !defined(ACE_AS_STATIC_LIBS)
00319   GuardType guard(lock_);
00320   LibDirectiveMap::iterator lib_iter = lib_directive_map_.find(transport_type);
00321   if (lib_iter != lib_directive_map_.end()) {
00322     ACE_TString directive = ACE_TEXT_CHAR_TO_TCHAR(lib_iter->second.c_str());
00323     // Release the lock, because loading a transport library will
00324     // recursively call this function to add its default inst.
00325     guard.release();
00326     ACE_Service_Config::process_directive(directive.c_str());
00327   }
00328 #endif
00329 }
00330 
00331 TransportInst_rch
00332 TransportRegistry::create_inst(const OPENDDS_STRING& name,
00333                                const OPENDDS_STRING& transport_type)
00334 {
00335   GuardType guard(lock_);
00336   TransportType_rch type;
00337 
00338   if (find(type_map_, transport_type, type) != 0) {
00339 #if !defined(ACE_AS_STATIC_LIBS)
00340     guard.release();
00341     // Not present, try to load library
00342     load_transport_lib(transport_type);
00343     guard.acquire();
00344 
00345     // Try to find it again
00346     if (find(type_map_, transport_type, type) != 0) {
00347 #endif
00348       ACE_ERROR((LM_ERROR,
00349                  ACE_TEXT("(%P|%t) TransportRegistry::create_inst: ")
00350                  ACE_TEXT("transport_type=%C is not registered.\n"),
00351                  transport_type.c_str()));
00352       return TransportInst_rch();
00353 #if !defined(ACE_AS_STATIC_LIBS)
00354     }
00355 #endif
00356   }
00357 
00358   if (inst_map_.count(name)) {
00359     ACE_ERROR((LM_ERROR,
00360                ACE_TEXT("(%P|%t) TransportRegistry::create_inst: ")
00361                ACE_TEXT("name=%C is already in use.\n"),
00362                name.c_str()));
00363     return TransportInst_rch();
00364   }
00365   TransportInst_rch inst = type->new_inst(name);
00366   inst_map_[name] = inst;
00367   return inst;
00368 }
00369 
00370 
00371 TransportInst_rch
00372 TransportRegistry::get_inst(const OPENDDS_STRING& name) const
00373 {
00374   GuardType guard(lock_);
00375   InstMap::const_iterator found = inst_map_.find(name);
00376   if (found != inst_map_.end()) {
00377     return found->second;
00378   }
00379   return TransportInst_rch();
00380 }
00381 
00382 
00383 TransportConfig_rch
00384 TransportRegistry::create_config(const OPENDDS_STRING& name)
00385 {
00386   GuardType guard(lock_);
00387 
00388   if (config_map_.count(name)) {
00389     ACE_ERROR((LM_ERROR,
00390                ACE_TEXT("(%P|%t) TransportRegistry::create_config: ")
00391                ACE_TEXT("name=%C is already in use.\n"),
00392                name.c_str()));
00393     return TransportConfig_rch();
00394   }
00395 
00396   TransportConfig_rch inst = new TransportConfig(name);
00397   config_map_[name] = inst;
00398   return inst;
00399 }
00400 
00401 
00402 TransportConfig_rch
00403 TransportRegistry::get_config(const OPENDDS_STRING& name) const
00404 {
00405   GuardType guard(lock_);
00406   ConfigMap::const_iterator found = config_map_.find(name);
00407   if (found != config_map_.end()) {
00408     return found->second;
00409   }
00410   return TransportConfig_rch();
00411 }
00412 
00413 
00414 void
00415 TransportRegistry::bind_config(const TransportConfig_rch& cfg,
00416                                DDS::Entity_ptr entity)
00417 {
00418   if (cfg.is_nil()) {
00419     throw Transport::NotFound();
00420   }
00421   EntityImpl* ei = dynamic_cast<EntityImpl*>(entity);
00422   if (!ei) {
00423     throw Transport::MiscProblem();
00424   }
00425   ei->transport_config(cfg);
00426 }
00427 
00428 
00429 TransportConfig_rch
00430 TransportRegistry::fix_empty_default()
00431 {
00432   DBG_ENTRY_LVL("TransportRegistry", "fix_empty_default", 6);
00433   GuardType guard(lock_);
00434   if (global_config_.is_nil()
00435       || !global_config_->instances_.empty()
00436       || global_config_->name() != DEFAULT_CONFIG_NAME) {
00437     return global_config_;
00438   }
00439   TransportConfig_rch global_config = global_config_;
00440 #if !defined(ACE_AS_STATIC_LIBS)
00441   guard.release();
00442   load_transport_lib(FALLBACK_TYPE);
00443 #endif
00444   return global_config;
00445 }
00446 
00447 
00448 void
00449 TransportRegistry::register_type(const TransportType_rch& type)
00450 {
00451   DBG_ENTRY_LVL("TransportRegistry", "register_type", 6);
00452   int result;
00453   const OPENDDS_STRING name = type->name();
00454 
00455   {
00456     GuardType guard(this->lock_);
00457     result = OpenDDS::DCPS::bind(type_map_, name, type);
00458   }
00459 
00460   // Check to see if it worked.
00461   //
00462   // 0 means it worked, 1 means it is a duplicate (and didn't work), and
00463   // -1 means something bad happened.
00464   if (result == 1) {
00465     ACE_ERROR((LM_ERROR,
00466                ACE_TEXT("(%P|%t) ERROR: transport type=%C already registered ")
00467                ACE_TEXT("with TransportRegistry.\n"), name.c_str()));
00468     throw Transport::Duplicate();
00469 
00470   } else if (result == -1) {
00471     ACE_ERROR((LM_ERROR,
00472                ACE_TEXT("(%P|%t) ERROR: Failed to bind transport type=%C to ")
00473                ACE_TEXT("type_map_.\n"),
00474                name.c_str()));
00475     throw Transport::MiscProblem();
00476   }
00477 }
00478 
00479 
00480 void
00481 TransportRegistry::release()
00482 {
00483   DBG_ENTRY_LVL("TransportRegistry", "release", 6);
00484   GuardType guard(lock_);
00485 
00486   for (InstMap::iterator iter = inst_map_.begin(); iter != inst_map_.end(); ++iter) {
00487     iter->second->shutdown();
00488   }
00489 
00490   type_map_.clear();
00491   inst_map_.clear();
00492   config_map_.clear();
00493   domain_default_config_map_.clear();
00494   global_config_ = 0;
00495 }
00496 
00497 
00498 }
00499 }

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7