00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TransportRegistry.h"
00010 #include "TransportDebug.h"
00011 #include "TransportInst.h"
00012 #include "TransportExceptions.h"
00013 #include "TransportType.h"
00014 #include "dds/DCPS/Util.h"
00015 #include "dds/DCPS/Service_Participant.h"
00016 #include "dds/DCPS/EntityImpl.h"
00017 #include "dds/DCPS/ConfigUtils.h"
00018 #include "dds/DCPS/SafetyProfileStreams.h"
00019
00020 #include "ace/Singleton.h"
00021 #include "ace/OS_NS_strings.h"
00022 #include "ace/Service_Config.h"
00023
00024 #if !defined (__ACE_INLINE__)
00025 #include "TransportRegistry.inl"
00026 #endif
00027
00028
00029 namespace {
00030 const ACE_TString OLD_TRANSPORT_PREFIX = ACE_TEXT("transport_");
00031
00032
00033 bool predicate(const OpenDDS::DCPS::TransportInst_rch& lhs,
00034 const OpenDDS::DCPS::TransportInst_rch& rhs)
00035 {
00036 return lhs->name() < rhs->name();
00037 }
00038
00039
00040 #ifdef OPENDDS_SAFETY_PROFILE
00041 const char FALLBACK_TYPE[] = "rtps_udp";
00042 #else
00043 const char FALLBACK_TYPE[] = "tcp";
00044 #endif
00045 }
00046
00047 namespace OpenDDS {
00048 namespace DCPS {
00049
00050 TransportRegistry*
00051 TransportRegistry::instance()
00052 {
00053 return ACE_Singleton<TransportRegistry, ACE_Recursive_Thread_Mutex>::instance();
00054 }
00055
00056 const char TransportRegistry::DEFAULT_CONFIG_NAME[] = "_OPENDDS_DEFAULT_CONFIG";
00057 const char TransportRegistry::DEFAULT_INST_PREFIX[] = "_OPENDDS_";
00058
00059 TransportRegistry::TransportRegistry()
00060 : global_config_(new TransportConfig(DEFAULT_CONFIG_NAME))
00061 {
00062 DBG_ENTRY_LVL("TransportRegistry", "TransportRegistry", 6);
00063 config_map_[DEFAULT_CONFIG_NAME] = global_config_;
00064
00065 lib_directive_map_["tcp"] = "dynamic OpenDDS_Tcp Service_Object * OpenDDS_Tcp:_make_TcpLoader()";
00066 lib_directive_map_["udp"] = "dynamic OpenDDS_Udp Service_Object * OpenDDS_Udp:_make_UdpLoader()";
00067 lib_directive_map_["multicast"] = "dynamic OpenDDS_Multicast Service_Object * OpenDDS_Multicast:_make_MulticastLoader()";
00068 lib_directive_map_["rtps_udp"] = "dynamic OpenDDS_Rtps_Udp Service_Object * OpenDDS_Rtps_Udp:_make_RtpsUdpLoader()";
00069 lib_directive_map_["shmem"] = "dynamic OpenDDS_Shmem Service_Object * OpenDDS_Shmem:_make_ShmemLoader()";
00070
00071
00072 lib_directive_map_["rtps_discovery"] = lib_directive_map_["rtps_udp"];
00073 lib_directive_map_["repository"] = "dynamic OpenDDS_InfoRepoDiscovery Service_Object * OpenDDS_InfoRepoDiscovery:_make_IRDiscoveryLoader()";
00074 }
00075
00076 int
00077 TransportRegistry::load_transport_configuration(const OPENDDS_STRING& file_name,
00078 ACE_Configuration_Heap& cf)
00079 {
00080 const ACE_Configuration_Section_Key& root = cf.root_section();
00081
00082
00083
00084 typedef std::pair<TransportConfig_rch, OPENDDS_VECTOR(OPENDDS_STRING) > ConfigInfo;
00085 OPENDDS_VECTOR(ConfigInfo) configInfoVec;
00086
00087
00088
00089 OPENDDS_LIST(TransportInst_rch) instances;
00090
00091 ACE_TString sect_name;
00092
00093 for (int index = 0;
00094 cf.enumerate_sections(root, index, sect_name) == 0;
00095 ++index) {
00096 if (ACE_OS::strcmp(sect_name.c_str(), TRANSPORT_SECTION_NAME) == 0) {
00097
00098 ACE_Configuration_Section_Key sect;
00099 if (cf.open_section(root, sect_name.c_str(), 0, sect) != 0) {
00100 ACE_ERROR_RETURN((LM_ERROR,
00101 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00102 ACE_TEXT("failed to open section %s\n"),
00103 sect_name.c_str()),
00104 -1);
00105 } else {
00106
00107 ValueMap vm;
00108 if (pullValues(cf, sect, vm) > 0) {
00109
00110 ACE_ERROR_RETURN((LM_ERROR,
00111 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00112 ACE_TEXT("transport sections must have a section name\n"),
00113 sect_name.c_str()),
00114 -1);
00115 }
00116
00117
00118 KeyList keys;
00119 if (processSections(cf, sect, keys) != 0) {
00120 ACE_ERROR_RETURN((LM_ERROR,
00121 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00122 ACE_TEXT("too many nesting layers in [%s] section.\n"),
00123 sect_name.c_str()),
00124 -1);
00125 }
00126 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00127 OPENDDS_STRING transport_id = it->first;
00128 ACE_Configuration_Section_Key inst_sect = it->second;
00129
00130 ValueMap values;
00131 if (pullValues(cf, it->second, values) != 0) {
00132
00133 OPENDDS_STRING transport_type;
00134 ValueMap::const_iterator vm_it = values.find("transport_type");
00135 if (vm_it != values.end()) {
00136 transport_type = vm_it->second;
00137 } else {
00138 ACE_ERROR_RETURN((LM_ERROR,
00139 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00140 ACE_TEXT("missing transport_type in [transport/%C] section.\n"),
00141 transport_id.c_str()),
00142 -1);
00143 }
00144
00145
00146
00147 TransportInst_rch inst = create_inst(transport_id, transport_type);
00148 if (inst == 0) {
00149 ACE_ERROR_RETURN((LM_ERROR,
00150 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00151 ACE_TEXT("Unable to create transport instance in [transport/%C] section.\n"),
00152 transport_id.c_str()),
00153 -1);
00154 }
00155 instances.push_back(inst);
00156 inst->load(cf, inst_sect);
00157 } else {
00158 ACE_ERROR_RETURN((LM_ERROR,
00159 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00160 ACE_TEXT("missing transport_type in [transport/%C] section.\n"),
00161 transport_id.c_str()),
00162 -1);
00163 }
00164 }
00165 }
00166 } else if (ACE_OS::strcmp(sect_name.c_str(), CONFIG_SECTION_NAME) == 0) {
00167
00168 ACE_Configuration_Section_Key sect;
00169 if (cf.open_section(root, sect_name.c_str(), 0, sect) != 0) {
00170 ACE_ERROR_RETURN((LM_ERROR,
00171 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00172 ACE_TEXT("failed to open section [%s]\n"),
00173 sect_name.c_str()),
00174 -1);
00175 } else {
00176
00177 ValueMap vm;
00178 if (pullValues(cf, sect, vm) > 0) {
00179
00180 ACE_ERROR_RETURN((LM_ERROR,
00181 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00182 ACE_TEXT("config sections must have a section name\n"),
00183 sect_name.c_str()),
00184 -1);
00185 }
00186
00187
00188 KeyList keys;
00189 if (processSections(cf, sect, keys) != 0) {
00190
00191 ACE_ERROR_RETURN((LM_ERROR,
00192 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00193 ACE_TEXT("too many nesting layers in [%s] section.\n"),
00194 sect_name.c_str()),
00195 -1);
00196 }
00197 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00198 OPENDDS_STRING config_id = it->first;
00199
00200
00201 TransportConfig_rch config = create_config(config_id);
00202 if (config == 0) {
00203 ACE_ERROR_RETURN((LM_ERROR,
00204 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00205 ACE_TEXT("Unable to create transport config in [config/%C] section.\n"),
00206 config_id.c_str()),
00207 -1);
00208 }
00209
00210 ValueMap values;
00211 pullValues(cf, it->second, values);
00212
00213 ConfigInfo configInfo;
00214 configInfo.first = config;
00215 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
00216 OPENDDS_STRING name = it->first;
00217 OPENDDS_STRING value = it->second;
00218 if (name == "transports") {
00219 char delim = ',';
00220 size_t pos = 0;
00221 OPENDDS_STRING token;
00222 while ((pos = value.find(delim)) != OPENDDS_STRING::npos) {
00223 token = value.substr(0, pos);
00224 configInfo.second.push_back(token);
00225 value.erase(0, pos + 1);
00226 }
00227 configInfo.second.push_back(value);
00228
00229 configInfoVec.push_back(configInfo);
00230 } else if (name == "swap_bytes") {
00231 if ((value == "1") || (value == "true")) {
00232 config->swap_bytes_ = true;
00233 } else if ((value != "0") && (value != "false")) {
00234 ACE_ERROR_RETURN((LM_ERROR,
00235 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00236 ACE_TEXT("Illegal value for swap_bytes (%C) in [config/%C] section.\n"),
00237 value.c_str(), config_id.c_str()),
00238 -1);
00239 }
00240 } else if (name == "passive_connect_duration") {
00241 if (!convertToInteger(value,
00242 config->passive_connect_duration_)) {
00243 ACE_ERROR_RETURN((LM_ERROR,
00244 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00245 ACE_TEXT("Illegal integer value for passive_connect_duration (%s) in [config/%C] section.\n"),
00246 value.c_str(), config_id.c_str()),
00247 -1);
00248 }
00249 } else {
00250 ACE_ERROR_RETURN((LM_ERROR,
00251 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00252 ACE_TEXT("Unexpected entry (%C) in [config/%C] section.\n"),
00253 name.c_str(), config_id.c_str()),
00254 -1);
00255 }
00256 }
00257 if (configInfo.second.empty()) {
00258 ACE_ERROR_RETURN((LM_ERROR,
00259 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00260 ACE_TEXT("No transport instances listed in [config/%C] section.\n"),
00261 config_id.c_str()),
00262 -1);
00263 }
00264 }
00265 }
00266 } else if (ACE_OS::strncmp(sect_name.c_str(), OLD_TRANSPORT_PREFIX.c_str(),
00267 OLD_TRANSPORT_PREFIX.length()) == 0) {
00268 ACE_ERROR_RETURN((LM_ERROR,
00269 ACE_TEXT("(%P|%t) ERROR: ")
00270 ACE_TEXT("Obsolete transport configuration found (%s).\n"),
00271 sect_name.c_str()),
00272 -1);
00273 }
00274 }
00275
00276
00277 for (unsigned int i = 0; i < configInfoVec.size(); ++i) {
00278 TransportConfig_rch config = configInfoVec[i].first;
00279 OPENDDS_VECTOR(OPENDDS_STRING)& insts = configInfoVec[i].second;
00280 for (unsigned int j = 0; j < insts.size(); ++j) {
00281 TransportInst_rch inst = get_inst(insts[j]);
00282 if (inst == 0) {
00283 ACE_ERROR_RETURN((LM_ERROR,
00284 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00285 ACE_TEXT("The inst (%C) in [config/%C] section is undefined.\n"),
00286 insts[j].c_str(), config->name().c_str()),
00287 -1);
00288 }
00289 config->instances_.push_back(inst);
00290 }
00291 }
00292
00293
00294
00295 if (!instances.empty()) {
00296 TransportConfig_rch config = create_config(file_name);
00297 if (config == 0) {
00298 ACE_ERROR_RETURN((LM_ERROR,
00299 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00300 ACE_TEXT("Unable to create default transport config.\n"),
00301 file_name.c_str()),
00302 -1);
00303 }
00304 instances.sort(predicate);
00305 for (OPENDDS_LIST(TransportInst_rch)::const_iterator it = instances.begin();
00306 it != instances.end(); ++it) {
00307 config->instances_.push_back(*it);
00308 }
00309 }
00310
00311 return 0;
00312 }
00313
00314 void
00315 TransportRegistry::load_transport_lib(const OPENDDS_STRING& transport_type)
00316 {
00317 ACE_UNUSED_ARG(transport_type);
00318 #if !defined(ACE_AS_STATIC_LIBS)
00319 GuardType guard(lock_);
00320 LibDirectiveMap::iterator lib_iter = lib_directive_map_.find(transport_type);
00321 if (lib_iter != lib_directive_map_.end()) {
00322 ACE_TString directive = ACE_TEXT_CHAR_TO_TCHAR(lib_iter->second.c_str());
00323
00324
00325 guard.release();
00326 ACE_Service_Config::process_directive(directive.c_str());
00327 }
00328 #endif
00329 }
00330
00331 TransportInst_rch
00332 TransportRegistry::create_inst(const OPENDDS_STRING& name,
00333 const OPENDDS_STRING& transport_type)
00334 {
00335 GuardType guard(lock_);
00336 TransportType_rch type;
00337
00338 if (find(type_map_, transport_type, type) != 0) {
00339 #if !defined(ACE_AS_STATIC_LIBS)
00340 guard.release();
00341
00342 load_transport_lib(transport_type);
00343 guard.acquire();
00344
00345
00346 if (find(type_map_, transport_type, type) != 0) {
00347 #endif
00348 ACE_ERROR((LM_ERROR,
00349 ACE_TEXT("(%P|%t) TransportRegistry::create_inst: ")
00350 ACE_TEXT("transport_type=%C is not registered.\n"),
00351 transport_type.c_str()));
00352 return TransportInst_rch();
00353 #if !defined(ACE_AS_STATIC_LIBS)
00354 }
00355 #endif
00356 }
00357
00358 if (inst_map_.count(name)) {
00359 ACE_ERROR((LM_ERROR,
00360 ACE_TEXT("(%P|%t) TransportRegistry::create_inst: ")
00361 ACE_TEXT("name=%C is already in use.\n"),
00362 name.c_str()));
00363 return TransportInst_rch();
00364 }
00365 TransportInst_rch inst = type->new_inst(name);
00366 inst_map_[name] = inst;
00367 return inst;
00368 }
00369
00370
00371 TransportInst_rch
00372 TransportRegistry::get_inst(const OPENDDS_STRING& name) const
00373 {
00374 GuardType guard(lock_);
00375 InstMap::const_iterator found = inst_map_.find(name);
00376 if (found != inst_map_.end()) {
00377 return found->second;
00378 }
00379 return TransportInst_rch();
00380 }
00381
00382
00383 TransportConfig_rch
00384 TransportRegistry::create_config(const OPENDDS_STRING& name)
00385 {
00386 GuardType guard(lock_);
00387
00388 if (config_map_.count(name)) {
00389 ACE_ERROR((LM_ERROR,
00390 ACE_TEXT("(%P|%t) TransportRegistry::create_config: ")
00391 ACE_TEXT("name=%C is already in use.\n"),
00392 name.c_str()));
00393 return TransportConfig_rch();
00394 }
00395
00396 TransportConfig_rch inst = new TransportConfig(name);
00397 config_map_[name] = inst;
00398 return inst;
00399 }
00400
00401
00402 TransportConfig_rch
00403 TransportRegistry::get_config(const OPENDDS_STRING& name) const
00404 {
00405 GuardType guard(lock_);
00406 ConfigMap::const_iterator found = config_map_.find(name);
00407 if (found != config_map_.end()) {
00408 return found->second;
00409 }
00410 return TransportConfig_rch();
00411 }
00412
00413
00414 void
00415 TransportRegistry::bind_config(const TransportConfig_rch& cfg,
00416 DDS::Entity_ptr entity)
00417 {
00418 if (cfg.is_nil()) {
00419 throw Transport::NotFound();
00420 }
00421 EntityImpl* ei = dynamic_cast<EntityImpl*>(entity);
00422 if (!ei) {
00423 throw Transport::MiscProblem();
00424 }
00425 ei->transport_config(cfg);
00426 }
00427
00428
00429 TransportConfig_rch
00430 TransportRegistry::fix_empty_default()
00431 {
00432 DBG_ENTRY_LVL("TransportRegistry", "fix_empty_default", 6);
00433 GuardType guard(lock_);
00434 if (global_config_.is_nil()
00435 || !global_config_->instances_.empty()
00436 || global_config_->name() != DEFAULT_CONFIG_NAME) {
00437 return global_config_;
00438 }
00439 TransportConfig_rch global_config = global_config_;
00440 #if !defined(ACE_AS_STATIC_LIBS)
00441 guard.release();
00442 load_transport_lib(FALLBACK_TYPE);
00443 #endif
00444 return global_config;
00445 }
00446
00447
00448 void
00449 TransportRegistry::register_type(const TransportType_rch& type)
00450 {
00451 DBG_ENTRY_LVL("TransportRegistry", "register_type", 6);
00452 int result;
00453 const OPENDDS_STRING name = type->name();
00454
00455 {
00456 GuardType guard(this->lock_);
00457 result = OpenDDS::DCPS::bind(type_map_, name, type);
00458 }
00459
00460
00461
00462
00463
00464 if (result == 1) {
00465 ACE_ERROR((LM_ERROR,
00466 ACE_TEXT("(%P|%t) ERROR: transport type=%C already registered ")
00467 ACE_TEXT("with TransportRegistry.\n"), name.c_str()));
00468 throw Transport::Duplicate();
00469
00470 } else if (result == -1) {
00471 ACE_ERROR((LM_ERROR,
00472 ACE_TEXT("(%P|%t) ERROR: Failed to bind transport type=%C to ")
00473 ACE_TEXT("type_map_.\n"),
00474 name.c_str()));
00475 throw Transport::MiscProblem();
00476 }
00477 }
00478
00479
00480 void
00481 TransportRegistry::release()
00482 {
00483 DBG_ENTRY_LVL("TransportRegistry", "release", 6);
00484 GuardType guard(lock_);
00485
00486 for (InstMap::iterator iter = inst_map_.begin(); iter != inst_map_.end(); ++iter) {
00487 iter->second->shutdown();
00488 }
00489
00490 type_map_.clear();
00491 inst_map_.clear();
00492 config_map_.clear();
00493 domain_default_config_map_.clear();
00494 global_config_ = 0;
00495 }
00496
00497
00498 }
00499 }