00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TransportRegistry.h"
00010 #include "TransportDebug.h"
00011 #include "TransportInst.h"
00012 #include "TransportExceptions.h"
00013 #include "TransportType.h"
00014 #include "dds/DCPS/Util.h"
00015 #include "dds/DCPS/Service_Participant.h"
00016 #include "dds/DCPS/EntityImpl.h"
00017 #include "dds/DCPS/ConfigUtils.h"
00018 #include "dds/DCPS/SafetyProfileStreams.h"
00019
00020 #include "ace/Singleton.h"
00021 #include "ace/OS_NS_strings.h"
00022 #include "ace/Service_Config.h"
00023
00024 #if !defined (__ACE_INLINE__)
00025 #include "TransportRegistry.inl"
00026 #endif
00027
00028
00029 namespace {
00030 const ACE_TString OLD_TRANSPORT_PREFIX = ACE_TEXT("transport_");
00031
00032
00033 bool predicate(const OpenDDS::DCPS::TransportInst_rch& lhs,
00034 const OpenDDS::DCPS::TransportInst_rch& rhs)
00035 {
00036 return lhs->name() < rhs->name();
00037 }
00038
00039
00040 #ifdef OPENDDS_SAFETY_PROFILE
00041 const char FALLBACK_TYPE[] = "rtps_udp";
00042 #else
00043 const char FALLBACK_TYPE[] = "tcp";
00044 #endif
00045 }
00046
00047 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00048
00049 namespace OpenDDS {
00050 namespace DCPS {
00051
00052 TransportRegistry*
00053 TransportRegistry::instance()
00054 {
00055 return ACE_Unmanaged_Singleton<TransportRegistry, ACE_Recursive_Thread_Mutex>::instance();
00056 }
00057
00058 void
00059 TransportRegistry::close()
00060 {
00061 ACE_Unmanaged_Singleton<TransportRegistry, ACE_Recursive_Thread_Mutex>::close();
00062 }
00063
// Name given to the global default TransportConfig; the constructor creates
// global_config_ with this name and stores it in config_map_ under it.
const char TransportRegistry::DEFAULT_CONFIG_NAME[] = "_OPENDDS_DEFAULT_CONFIG";
// NOTE(review): not referenced in this part of the file; presumably the name
// prefix for automatically created transport instances -- confirm at use sites.
const char TransportRegistry::DEFAULT_INST_PREFIX[] = "_OPENDDS_";
00066
00067 TransportRegistry::TransportRegistry()
00068 : global_config_(make_rch<TransportConfig>(DEFAULT_CONFIG_NAME))
00069 , released_(false)
00070 {
00071 DBG_ENTRY_LVL("TransportRegistry", "TransportRegistry", 6);
00072 config_map_[DEFAULT_CONFIG_NAME] = global_config_;
00073
00074 lib_directive_map_["tcp"] = "dynamic OpenDDS_Tcp Service_Object * OpenDDS_Tcp:_make_TcpLoader()";
00075 lib_directive_map_["udp"] = "dynamic OpenDDS_Udp Service_Object * OpenDDS_Udp:_make_UdpLoader()";
00076 lib_directive_map_["multicast"] = "dynamic OpenDDS_Multicast Service_Object * OpenDDS_Multicast:_make_MulticastLoader()";
00077 lib_directive_map_["rtps_udp"] = "dynamic OpenDDS_Rtps_Udp Service_Object * OpenDDS_Rtps_Udp:_make_RtpsUdpLoader()";
00078 lib_directive_map_["shmem"] = "dynamic OpenDDS_Shmem Service_Object * OpenDDS_Shmem:_make_ShmemLoader()";
00079
00080
00081 lib_directive_map_["rtps_discovery"] = lib_directive_map_["rtps_udp"];
00082 lib_directive_map_["repository"] = "dynamic OpenDDS_InfoRepoDiscovery Service_Object * OpenDDS_InfoRepoDiscovery:_make_IRDiscoveryLoader()";
00083 }
00084
00085 int
00086 TransportRegistry::load_transport_configuration(const OPENDDS_STRING& file_name,
00087 ACE_Configuration_Heap& cf)
00088 {
00089 const ACE_Configuration_Section_Key& root = cf.root_section();
00090
00091
00092
00093 typedef std::pair<TransportConfig_rch, OPENDDS_VECTOR(OPENDDS_STRING) > ConfigInfo;
00094 OPENDDS_VECTOR(ConfigInfo) configInfoVec;
00095
00096
00097
00098 OPENDDS_LIST(TransportInst_rch) instances;
00099
00100 ACE_TString sect_name;
00101
00102 for (int index = 0;
00103 cf.enumerate_sections(root, index, sect_name) == 0;
00104 ++index) {
00105 if (ACE_OS::strcmp(sect_name.c_str(), TRANSPORT_SECTION_NAME) == 0) {
00106
00107 ACE_Configuration_Section_Key sect;
00108 if (cf.open_section(root, sect_name.c_str(), 0, sect) != 0) {
00109 ACE_ERROR_RETURN((LM_ERROR,
00110 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00111 ACE_TEXT("failed to open section %s\n"),
00112 sect_name.c_str()),
00113 -1);
00114 } else {
00115
00116 ValueMap vm;
00117 if (pullValues(cf, sect, vm) > 0) {
00118
00119 ACE_ERROR_RETURN((LM_ERROR,
00120 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00121 ACE_TEXT("transport sections must have a section name\n"),
00122 sect_name.c_str()),
00123 -1);
00124 }
00125
00126
00127 KeyList keys;
00128 if (processSections(cf, sect, keys) != 0) {
00129 ACE_ERROR_RETURN((LM_ERROR,
00130 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00131 ACE_TEXT("too many nesting layers in [%s] section.\n"),
00132 sect_name.c_str()),
00133 -1);
00134 }
00135 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00136 OPENDDS_STRING transport_id = it->first;
00137 ACE_Configuration_Section_Key inst_sect = it->second;
00138
00139 ValueMap values;
00140 if (pullValues(cf, it->second, values) != 0) {
00141
00142 OPENDDS_STRING transport_type;
00143 ValueMap::const_iterator vm_it = values.find("transport_type");
00144 if (vm_it != values.end()) {
00145 transport_type = vm_it->second;
00146 } else {
00147 ACE_ERROR_RETURN((LM_ERROR,
00148 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00149 ACE_TEXT("missing transport_type in [transport/%C] section.\n"),
00150 transport_id.c_str()),
00151 -1);
00152 }
00153
00154
00155
00156 TransportInst_rch inst = create_inst(transport_id, transport_type);
00157 if (!inst) {
00158 ACE_ERROR_RETURN((LM_ERROR,
00159 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00160 ACE_TEXT("Unable to create transport instance in [transport/%C] section.\n"),
00161 transport_id.c_str()),
00162 -1);
00163 }
00164 instances.push_back(inst);
00165 inst->load(cf, inst_sect);
00166 } else {
00167 ACE_ERROR_RETURN((LM_ERROR,
00168 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00169 ACE_TEXT("missing transport_type in [transport/%C] section.\n"),
00170 transport_id.c_str()),
00171 -1);
00172 }
00173 }
00174 }
00175 } else if (ACE_OS::strcmp(sect_name.c_str(), CONFIG_SECTION_NAME) == 0) {
00176
00177 ACE_Configuration_Section_Key sect;
00178 if (cf.open_section(root, sect_name.c_str(), 0, sect) != 0) {
00179 ACE_ERROR_RETURN((LM_ERROR,
00180 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00181 ACE_TEXT("failed to open section [%s]\n"),
00182 sect_name.c_str()),
00183 -1);
00184 } else {
00185
00186 ValueMap vm;
00187 if (pullValues(cf, sect, vm) > 0) {
00188
00189 ACE_ERROR_RETURN((LM_ERROR,
00190 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00191 ACE_TEXT("config sections must have a section name\n"),
00192 sect_name.c_str()),
00193 -1);
00194 }
00195
00196
00197 KeyList keys;
00198 if (processSections(cf, sect, keys) != 0) {
00199
00200 ACE_ERROR_RETURN((LM_ERROR,
00201 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00202 ACE_TEXT("too many nesting layers in [%s] section.\n"),
00203 sect_name.c_str()),
00204 -1);
00205 }
00206 for (KeyList::const_iterator it = keys.begin(); it != keys.end(); ++it) {
00207 OPENDDS_STRING config_id = it->first;
00208
00209
00210 TransportConfig_rch config = create_config(config_id);
00211 if (!config) {
00212 ACE_ERROR_RETURN((LM_ERROR,
00213 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00214 ACE_TEXT("Unable to create transport config in [config/%C] section.\n"),
00215 config_id.c_str()),
00216 -1);
00217 }
00218
00219 ValueMap values;
00220 pullValues(cf, it->second, values);
00221
00222 ConfigInfo configInfo;
00223 configInfo.first = config;
00224 for (ValueMap::const_iterator it = values.begin(); it != values.end(); ++it) {
00225 OPENDDS_STRING name = it->first;
00226 OPENDDS_STRING value = it->second;
00227 if (name == "transports") {
00228 char delim = ',';
00229 size_t pos = 0;
00230 OPENDDS_STRING token;
00231 while ((pos = value.find(delim)) != OPENDDS_STRING::npos) {
00232 token = value.substr(0, pos);
00233 configInfo.second.push_back(token);
00234 value.erase(0, pos + 1);
00235 }
00236 configInfo.second.push_back(value);
00237
00238 configInfoVec.push_back(configInfo);
00239 } else if (name == "swap_bytes") {
00240 if ((value == "1") || (value == "true")) {
00241 config->swap_bytes_ = true;
00242 } else if ((value != "0") && (value != "false")) {
00243 ACE_ERROR_RETURN((LM_ERROR,
00244 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00245 ACE_TEXT("Illegal value for swap_bytes (%C) in [config/%C] section.\n"),
00246 value.c_str(), config_id.c_str()),
00247 -1);
00248 }
00249 } else if (name == "passive_connect_duration") {
00250 if (!convertToInteger(value,
00251 config->passive_connect_duration_)) {
00252 ACE_ERROR_RETURN((LM_ERROR,
00253 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00254 ACE_TEXT("Illegal integer value for passive_connect_duration (%s) in [config/%C] section.\n"),
00255 value.c_str(), config_id.c_str()),
00256 -1);
00257 }
00258 } else {
00259 ACE_ERROR_RETURN((LM_ERROR,
00260 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00261 ACE_TEXT("Unexpected entry (%C) in [config/%C] section.\n"),
00262 name.c_str(), config_id.c_str()),
00263 -1);
00264 }
00265 }
00266 if (configInfo.second.empty()) {
00267 ACE_ERROR_RETURN((LM_ERROR,
00268 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00269 ACE_TEXT("No transport instances listed in [config/%C] section.\n"),
00270 config_id.c_str()),
00271 -1);
00272 }
00273 }
00274 }
00275 } else if (ACE_OS::strncmp(sect_name.c_str(), OLD_TRANSPORT_PREFIX.c_str(),
00276 OLD_TRANSPORT_PREFIX.length()) == 0) {
00277 ACE_ERROR_RETURN((LM_ERROR,
00278 ACE_TEXT("(%P|%t) ERROR: ")
00279 ACE_TEXT("Obsolete transport configuration found (%s).\n"),
00280 sect_name.c_str()),
00281 -1);
00282 }
00283 }
00284
00285
00286 for (unsigned int i = 0; i < configInfoVec.size(); ++i) {
00287 TransportConfig_rch config = configInfoVec[i].first;
00288 OPENDDS_VECTOR(OPENDDS_STRING)& insts = configInfoVec[i].second;
00289 for (unsigned int j = 0; j < insts.size(); ++j) {
00290 TransportInst_rch inst = get_inst(insts[j]);
00291 if (!inst) {
00292 ACE_ERROR_RETURN((LM_ERROR,
00293 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00294 ACE_TEXT("The inst (%C) in [config/%C] section is undefined.\n"),
00295 insts[j].c_str(), config->name().c_str()),
00296 -1);
00297 }
00298 config->instances_.push_back(inst);
00299 }
00300 }
00301
00302
00303
00304 if (!instances.empty()) {
00305 TransportConfig_rch config = create_config(file_name);
00306 if (!config) {
00307 ACE_ERROR_RETURN((LM_ERROR,
00308 ACE_TEXT("(%P|%t) TransportRegistry::load_transport_configuration: ")
00309 ACE_TEXT("Unable to create default transport config.\n"),
00310 file_name.c_str()),
00311 -1);
00312 }
00313 instances.sort(predicate);
00314 for (OPENDDS_LIST(TransportInst_rch)::const_iterator it = instances.begin();
00315 it != instances.end(); ++it) {
00316 config->instances_.push_back(*it);
00317 }
00318 }
00319
00320 return 0;
00321 }
00322
00323 void
00324 TransportRegistry::load_transport_lib(const OPENDDS_STRING& transport_type)
00325 {
00326 ACE_UNUSED_ARG(transport_type);
00327 #if !defined(ACE_AS_STATIC_LIBS)
00328 GuardType guard(lock_);
00329 LibDirectiveMap::iterator lib_iter = lib_directive_map_.find(transport_type);
00330 if (lib_iter != lib_directive_map_.end()) {
00331 ACE_TString directive = ACE_TEXT_CHAR_TO_TCHAR(lib_iter->second.c_str());
00332
00333
00334 guard.release();
00335 ACE_Service_Config::process_directive(directive.c_str());
00336 }
00337 #endif
00338 }
00339
00340 TransportInst_rch
00341 TransportRegistry::create_inst(const OPENDDS_STRING& name,
00342 const OPENDDS_STRING& transport_type)
00343 {
00344 GuardType guard(lock_);
00345 TransportType_rch type;
00346
00347 if (find(type_map_, transport_type, type) != 0) {
00348 #if !defined(ACE_AS_STATIC_LIBS)
00349 guard.release();
00350
00351 load_transport_lib(transport_type);
00352 guard.acquire();
00353
00354
00355 if (find(type_map_, transport_type, type) != 0) {
00356 #endif
00357 ACE_ERROR((LM_ERROR,
00358 ACE_TEXT("(%P|%t) TransportRegistry::create_inst: ")
00359 ACE_TEXT("transport_type=%C is not registered.\n"),
00360 transport_type.c_str()));
00361 return TransportInst_rch();
00362 #if !defined(ACE_AS_STATIC_LIBS)
00363 }
00364 #endif
00365 }
00366
00367 if (inst_map_.count(name)) {
00368 ACE_ERROR((LM_ERROR,
00369 ACE_TEXT("(%P|%t) TransportRegistry::create_inst: ")
00370 ACE_TEXT("name=%C is already in use.\n"),
00371 name.c_str()));
00372 return TransportInst_rch();
00373 }
00374 TransportInst_rch inst (type->new_inst(name));
00375 inst_map_[name] = inst;
00376 return inst;
00377 }
00378
00379
00380 TransportInst_rch
00381 TransportRegistry::get_inst(const OPENDDS_STRING& name) const
00382 {
00383 GuardType guard(lock_);
00384 InstMap::const_iterator found = inst_map_.find(name);
00385 if (found != inst_map_.end()) {
00386 return found->second;
00387 }
00388 return TransportInst_rch();
00389 }
00390
00391
00392 TransportConfig_rch
00393 TransportRegistry::create_config(const OPENDDS_STRING& name)
00394 {
00395 GuardType guard(lock_);
00396
00397 if (config_map_.count(name)) {
00398 ACE_ERROR((LM_ERROR,
00399 ACE_TEXT("(%P|%t) TransportRegistry::create_config: ")
00400 ACE_TEXT("name=%C is already in use.\n"),
00401 name.c_str()));
00402 return TransportConfig_rch();
00403 }
00404
00405 TransportConfig_rch inst (make_rch<TransportConfig>(name));
00406 config_map_[name] = inst;
00407 return inst;
00408 }
00409
00410
00411 TransportConfig_rch
00412 TransportRegistry::get_config(const OPENDDS_STRING& name) const
00413 {
00414 GuardType guard(lock_);
00415 ConfigMap::const_iterator found = config_map_.find(name);
00416 if (found != config_map_.end()) {
00417 return found->second;
00418 }
00419 return TransportConfig_rch();
00420 }
00421
00422
00423 void
00424 TransportRegistry::bind_config(const TransportConfig_rch& cfg,
00425 DDS::Entity_ptr entity)
00426 {
00427 if (cfg.is_nil()) {
00428 throw Transport::NotFound();
00429 }
00430 EntityImpl* ei = dynamic_cast<EntityImpl*>(entity);
00431 if (!ei) {
00432 throw Transport::MiscProblem();
00433 }
00434 ei->transport_config(cfg);
00435 }
00436
00437
00438 TransportConfig_rch
00439 TransportRegistry::fix_empty_default()
00440 {
00441 DBG_ENTRY_LVL("TransportRegistry", "fix_empty_default", 6);
00442 GuardType guard(lock_);
00443 if (global_config_.is_nil()
00444 || !global_config_->instances_.empty()
00445 || global_config_->name() != DEFAULT_CONFIG_NAME) {
00446 return global_config_;
00447 }
00448 TransportConfig_rch global_config = global_config_;
00449 #if !defined(ACE_AS_STATIC_LIBS)
00450 guard.release();
00451 load_transport_lib(FALLBACK_TYPE);
00452 #endif
00453 return global_config;
00454 }
00455
00456
00457 void
00458 TransportRegistry::register_type(const TransportType_rch& type)
00459 {
00460 DBG_ENTRY_LVL("TransportRegistry", "register_type", 6);
00461 int result;
00462 const OPENDDS_STRING name = type->name();
00463
00464 {
00465 GuardType guard(this->lock_);
00466 result = OpenDDS::DCPS::bind(type_map_, name, type);
00467 }
00468
00469
00470
00471
00472
00473 if (result == 1) {
00474 ACE_ERROR((LM_ERROR,
00475 ACE_TEXT("(%P|%t) ERROR: transport type=%C already registered ")
00476 ACE_TEXT("with TransportRegistry.\n"), name.c_str()));
00477 throw Transport::Duplicate();
00478
00479 } else if (result == -1) {
00480 ACE_ERROR((LM_ERROR,
00481 ACE_TEXT("(%P|%t) ERROR: Failed to bind transport type=%C to ")
00482 ACE_TEXT("type_map_.\n"),
00483 name.c_str()));
00484 throw Transport::MiscProblem();
00485 }
00486 }
00487
00488
00489 void
00490 TransportRegistry::release()
00491 {
00492 DBG_ENTRY_LVL("TransportRegistry", "release", 6);
00493 GuardType guard(lock_);
00494 released_ = true;
00495
00496 for (InstMap::iterator iter = inst_map_.begin(); iter != inst_map_.end(); ++iter) {
00497 iter->second->shutdown();
00498 }
00499
00500 type_map_.clear();
00501 inst_map_.clear();
00502 config_map_.clear();
00503 domain_default_config_map_.clear();
00504 global_config_.reset();
00505 }
00506
00507 bool
00508 TransportRegistry::released() const
00509 {
00510 GuardType guard(lock_);
00511 return released_;
00512 }
00513
00514 }
00515 }
00516
00517 OPENDDS_END_VERSIONED_NAMESPACE_DECL