00001
00002
00003
00004
00005
00006
00007
00008 #include "RtpsDiscovery.h"
00009
00010 #include "dds/DCPS/Service_Participant.h"
00011 #include "dds/DCPS/ConfigUtils.h"
00012 #include "dds/DCPS/DomainParticipantImpl.h"
00013 #include "dds/DCPS/SubscriberImpl.h"
00014 #include "dds/DCPS/Marked_Default_Qos.h"
00015 #include "dds/DCPS/BuiltInTopicUtils.h"
00016 #include "dds/DCPS/Registered_Data_Types.h"
00017 #include "dds/DdsDcpsInfoUtilsC.h"
00018
00019 #include "ace/Reactor.h"
00020 #include "ace/Select_Reactor.h"
00021
00022 #include <cstdlib>
00023
namespace {
  /// Determine the default value for the D0 setting.
  ///
  /// When the OPENDDS_RTPS_DEFAULT_D0 environment variable holds a decimal
  /// integer that fits in a u_short, that value is returned. In every other
  /// case (variable unset, empty, non-numeric, followed by trailing
  /// characters, negative, or too large) @a fallback is returned.
  ///
  /// The environment is not consulted at all when ACE_LACKS_GETENV or
  /// ACE_LACKS_ENV is defined.
  ///
  /// @param fallback value to use when the environment provides no usable one
  /// @return the validated environment value, or @a fallback
  u_short get_default_d0(u_short fallback)
  {
#if !defined ACE_LACKS_GETENV && !defined ACE_LACKS_ENV
    const char* const from_env = std::getenv("OPENDDS_RTPS_DEFAULT_D0");
    if (from_env) {
      char* end = 0;
      const long parsed = std::strtol(from_env, &end, 10);
      const u_short narrowed = static_cast<u_short>(parsed);
      // Accept the value only if at least one digit was consumed, nothing
      // follows the number, and it survives the round trip through u_short.
      // The round-trip test rejects negative and too-large input, including
      // the LONG_MIN/LONG_MAX that strtol reports on overflow.
      if (end != from_env && *end == '\0' && narrowed == parsed) {
        return narrowed;
      }
    }
#endif
    return fallback;
  }
}
00036
00037 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00038
00039 namespace OpenDDS {
00040 namespace RTPS {
00041
00042 RtpsDiscovery::RtpsDiscovery(const RepoKey& key)
00043 : DCPS::PeerDiscovery<Spdp>(key)
00044 , resend_period_(30 )
00045 , pb_(7400)
00046 , dg_(250)
00047 , pg_(2)
00048 , d0_(get_default_d0(0))
00049 , d1_(10)
00050 , dx_(2)
00051 , ttl_(1)
00052 , sedp_multicast_(true)
00053 , default_multicast_group_("239.255.0.1")
00054 {
00055 }
00056
00057 RtpsDiscovery::~RtpsDiscovery()
00058 {
00059 }
00060
00061 namespace {
00062 const ACE_TCHAR RTPS_SECTION_NAME[] = ACE_TEXT("rtps_discovery");
00063 }
00064
00065 int
00066 RtpsDiscovery::Config::discovery_config(ACE_Configuration_Heap& cf)
00067 {
00068 const ACE_Configuration_Section_Key &root = cf.root_section();
00069 ACE_Configuration_Section_Key rtps_sect;
00070
00071 if (cf.open_section(root, RTPS_SECTION_NAME, 0, rtps_sect) == 0) {
00072
00073
00074 DCPS::ValueMap vm;
00075 if (DCPS::pullValues(cf, rtps_sect, vm) > 0) {
00076
00077 ACE_ERROR_RETURN((LM_ERROR,
00078 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00079 ACE_TEXT("rtps_discovery sections must have a subsection name\n")),
00080 -1);
00081 }
00082
00083 DCPS::KeyList keys;
00084 if (DCPS::processSections(cf, rtps_sect, keys) != 0) {
00085 ACE_ERROR_RETURN((LM_ERROR,
00086 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00087 ACE_TEXT("too many nesting layers in the [rtps] section.\n")),
00088 -1);
00089 }
00090
00091
00092 for (DCPS::KeyList::const_iterator it = keys.begin();
00093 it != keys.end(); ++it) {
00094 const OPENDDS_STRING& rtps_name = it->first;
00095
00096 int resend = 0;
00097 u_short pb = 0, dg = 0, pg = 0, d0 = 0, d1 = 0, dx = 0;
00098 unsigned char ttl = 0;
00099 AddrVec spdp_send_addrs;
00100 OPENDDS_STRING default_multicast_group = "239.255.0.1" ;
00101 OPENDDS_STRING mi, sla, gi;
00102 OPENDDS_STRING spdpaddr;
00103 bool has_resend = false, has_pb = false, has_dg = false, has_pg = false,
00104 has_d0 = false, has_d1 = false, has_dx = false, has_sm = false,
00105 has_ttl = false, sm = false;
00106
00107
00108 if (!TheServiceParticipant->default_address().empty()) {
00109 spdpaddr = TheServiceParticipant->default_address().c_str();
00110 }
00111
00112 DCPS::ValueMap values;
00113 DCPS::pullValues(cf, it->second, values);
00114 for (DCPS::ValueMap::const_iterator it = values.begin();
00115 it != values.end(); ++it) {
00116 const OPENDDS_STRING& name = it->first;
00117 if (name == "ResendPeriod") {
00118 const OPENDDS_STRING& value = it->second;
00119 has_resend = DCPS::convertToInteger(value, resend);
00120 if (!has_resend) {
00121 ACE_ERROR_RETURN((LM_ERROR,
00122 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00123 ACE_TEXT("Invalid entry (%C) for ResendPeriod in ")
00124 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00125 value.c_str(), rtps_name.c_str()), -1);
00126 }
00127 } else if (name == "PB") {
00128 const OPENDDS_STRING& value = it->second;
00129 has_pb = DCPS::convertToInteger(value, pb);
00130 if (!has_pb) {
00131 ACE_ERROR_RETURN((LM_ERROR,
00132 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00133 ACE_TEXT("Invalid entry (%C) for PB in ")
00134 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00135 value.c_str(), rtps_name.c_str()), -1);
00136 }
00137 } else if (name == "DG") {
00138 const OPENDDS_STRING& value = it->second;
00139 has_dg = DCPS::convertToInteger(value, dg);
00140 if (!has_dg) {
00141 ACE_ERROR_RETURN((LM_ERROR,
00142 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00143 ACE_TEXT("Invalid entry (%C) for DG in ")
00144 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00145 value.c_str(), rtps_name.c_str()), -1);
00146 }
00147 } else if (name == "PG") {
00148 const OPENDDS_STRING& value = it->second;
00149 has_pg = DCPS::convertToInteger(value, pg);
00150 if (!has_pg) {
00151 ACE_ERROR_RETURN((LM_ERROR,
00152 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00153 ACE_TEXT("Invalid entry (%C) for PG in ")
00154 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00155 value.c_str(), rtps_name.c_str()), -1);
00156 }
00157 } else if (name == "D0") {
00158 const OPENDDS_STRING& value = it->second;
00159 has_d0 = DCPS::convertToInteger(value, d0);
00160 if (!has_d0) {
00161 ACE_ERROR_RETURN((LM_ERROR,
00162 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00163 ACE_TEXT("Invalid entry (%C) for D0 in ")
00164 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00165 value.c_str(), rtps_name.c_str()), -1);
00166 }
00167 } else if (name == "D1") {
00168 const OPENDDS_STRING& value = it->second;
00169 has_d1 = DCPS::convertToInteger(value, d1);
00170 if (!has_d1) {
00171 ACE_ERROR_RETURN((LM_ERROR,
00172 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00173 ACE_TEXT("Invalid entry (%C) for D1 in ")
00174 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00175 value.c_str(), rtps_name.c_str()), -1);
00176 }
00177 } else if (name == "DX") {
00178 const OPENDDS_STRING& value = it->second;
00179 has_dx = DCPS::convertToInteger(value, dx);
00180 if (!has_dx) {
00181 ACE_ERROR_RETURN((LM_ERROR,
00182 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00183 ACE_TEXT("Invalid entry (%C) for DX in ")
00184 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00185 value.c_str(), rtps_name.c_str()), -1);
00186 }
00187 } else if (name == "TTL") {
00188 const OPENDDS_STRING& value = it->second;
00189 unsigned short ttl_us;
00190 has_ttl = DCPS::convertToInteger(value, ttl_us);
00191 ttl = static_cast<unsigned char>(ttl_us);
00192 if (!has_ttl || ttl_us > UCHAR_MAX) {
00193 ACE_ERROR_RETURN((LM_ERROR,
00194 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00195 ACE_TEXT("Invalid entry (%C) for TTL in ")
00196 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00197 value.c_str(), rtps_name.c_str()), -1);
00198 }
00199 } else if (name == "SedpMulticast") {
00200 const OPENDDS_STRING& value = it->second;
00201 int smInt;
00202 has_sm = DCPS::convertToInteger(value, smInt);
00203 if (!has_sm) {
00204 ACE_ERROR_RETURN((LM_ERROR,
00205 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
00206 ACE_TEXT("Invalid entry (%C) for SedpMulticast in ")
00207 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00208 value.c_str(), rtps_name.c_str()), -1);
00209 }
00210 sm = bool(smInt);
00211 } else if (name == "MulticastInterface") {
00212 mi = it->second;
00213 } else if (name == "SedpLocalAddress") {
00214 sla = it->second;
00215 } else if (name == "SpdpLocalAddress") {
00216 spdpaddr = it->second;
00217 } else if (name == "GuidInterface") {
00218 gi = it->second;
00219 } else if (name == "InteropMulticastOverride") {
00220
00221 default_multicast_group = it->second;
00222 } else if (name == "SpdpSendAddrs") {
00223 const OPENDDS_STRING& value = it->second;
00224 size_t i = 0;
00225 do {
00226 i = value.find_first_not_of(' ', i);
00227 const size_t n = value.find_first_of(", ", i);
00228 spdp_send_addrs.push_back(value.substr(i, (n == OPENDDS_STRING::npos) ? n : n - i));
00229 i = value.find(',', i);
00230 } while (i++ != OPENDDS_STRING::npos);
00231 } else {
00232 ACE_ERROR_RETURN((LM_ERROR,
00233 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00234 ACE_TEXT("Unexpected entry (%C) in [rtps_discovery/%C] section.\n"),
00235 name.c_str(), rtps_name.c_str()),
00236 -1);
00237 }
00238 }
00239
00240 RtpsDiscovery_rch discovery (OpenDDS::DCPS::make_rch<RtpsDiscovery>(rtps_name));
00241 if (has_resend) discovery->resend_period(ACE_Time_Value(resend));
00242 if (has_pb) discovery->pb(pb);
00243 if (has_dg) discovery->dg(dg);
00244 if (has_pg) discovery->pg(pg);
00245 if (has_d0) discovery->d0(d0);
00246 if (has_d1) discovery->d1(d1);
00247 if (has_dx) discovery->dx(dx);
00248 if (has_ttl) discovery->ttl(ttl);
00249 if (has_sm) discovery->sedp_multicast(sm);
00250 discovery->multicast_interface(mi);
00251 discovery->default_multicast_group(default_multicast_group);
00252 discovery->spdp_send_addrs().swap(spdp_send_addrs);
00253 discovery->sedp_local_address(sla);
00254 discovery->guid_interface(gi);
00255 discovery->spdp_local_address(spdpaddr);
00256 TheServiceParticipant->add_discovery(discovery);
00257 }
00258 }
00259
00260
00261
00262 const DCPS::Service_Participant::RepoKeyDiscoveryMap& discoveryMap = TheServiceParticipant->discoveryMap();
00263 if (discoveryMap.find(Discovery::DEFAULT_RTPS) == discoveryMap.end()) {
00264 TheServiceParticipant->add_discovery(OpenDDS::DCPS::make_rch<RtpsDiscovery>(Discovery::DEFAULT_RTPS));
00265 }
00266
00267 return 0;
00268 }
00269
00270
00271
00272 OpenDDS::DCPS::RepoId
00273 RtpsDiscovery::generate_participant_guid() {
00274 OpenDDS::DCPS::RepoId id = GUID_UNKNOWN;
00275 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, id);
00276 if (!guid_interface_.empty()) {
00277 if (guid_gen_.interfaceName(guid_interface_.c_str()) != 0) {
00278 if (DCPS::DCPS_debug_level) {
00279 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsDiscovery::add_domain_participant()"
00280 " - attempt to use specific network interface's MAC addr for"
00281 " GUID generation failed.\n"));
00282 }
00283 }
00284 }
00285 guid_gen_.populate(id);
00286 id.entityId = ENTITYID_PARTICIPANT;
00287 return id;
00288 }
00289
00290 DCPS::AddDomainStatus
00291 RtpsDiscovery::add_domain_participant(DDS::DomainId_t domain,
00292 const DDS::DomainParticipantQos& qos)
00293 {
00294 DCPS::AddDomainStatus ads = {OpenDDS::DCPS::RepoId(), false };
00295 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ads);
00296 if (!guid_interface_.empty()) {
00297 if (guid_gen_.interfaceName(guid_interface_.c_str()) != 0) {
00298 if (DCPS::DCPS_debug_level) {
00299 ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsDiscovery::add_domain_participant()"
00300 " - attempt to use specific network interface's MAC addr for"
00301 " GUID generation failed.\n"));
00302 }
00303 }
00304 }
00305 guid_gen_.populate(ads.id);
00306 ads.id.entityId = ENTITYID_PARTICIPANT;
00307 try {
00308 const DCPS::RcHandle<Spdp> spdp (DCPS::make_rch<Spdp>(domain, ref(ads.id), qos, this));
00309
00310 participants_[domain][ads.id] = spdp;
00311 } catch (const std::exception& e) {
00312 ads.id = GUID_UNKNOWN;
00313 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsDiscovery::add_domain_participant() - "
00314 "failed to initialize RTPS Simple Participant Discovery Protocol: %C\n",
00315 e.what()));
00316 }
00317 return ads;
00318 }
00319
#if defined(OPENDDS_SECURITY)
/// Security-enabled counterpart of add_domain_participant().
///
/// The caller supplies the participant GUID and the security handles; an
/// Spdp object is constructed with them and stored in participants_ under
/// its domain and id. The insertion into participants_ is performed while
/// holding lock_, matching add_domain_participant().
///
/// @param domain      domain the participant joins
/// @param qos         participant QoS forwarded to the Spdp constructor
/// @param guid        GUID to use; its entityId is replaced with
///                    ENTITYID_PARTICIPANT
/// @param id          identity handle forwarded to Spdp
/// @param perm        permissions handle forwarded to Spdp
/// @param part_crypto participant crypto handle forwarded to Spdp
/// @return status whose id is the participant GUID, or GUID_UNKNOWN if Spdp
///         construction threw a std::exception or lock_ could not be
///         acquired. The second field is always false.
DCPS::AddDomainStatus
RtpsDiscovery::add_domain_participant_secure(DDS::DomainId_t domain,
                                             const DDS::DomainParticipantQos& qos,
                                             const OpenDDS::DCPS::RepoId& guid,
                                             DDS::Security::IdentityHandle id,
                                             DDS::Security::PermissionsHandle perm,
                                             DDS::Security::ParticipantCryptoHandle part_crypto)
{
  DCPS::AddDomainStatus ads = {guid, false};
  ads.id.entityId = ENTITYID_PARTICIPANT;
  try {
    const DCPS::RcHandle<Spdp> spdp(DCPS::make_rch<Spdp>(domain, ads.id, qos, this, id, perm, part_crypto));

    // Take the lock only around the shared-map update, after the Spdp
    // object has been fully constructed.
    const DCPS::AddDomainStatus not_added = {GUID_UNKNOWN, false};
    ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, not_added);
    participants_[domain][ads.id] = spdp;
  } catch (const std::exception& e) {
    ads.id = GUID_UNKNOWN;
    // NOTE(review): logged at LM_WARNING whereas add_domain_participant()
    // uses LM_ERROR for the same failure; left as-is, confirm intent.
    ACE_ERROR((LM_WARNING, "(%P|%t) RtpsDiscovery::add_domain_participant_secure() - "
               "failed to initialize RTPS Simple Participant Discovery Protocol: %C\n",
               e.what()));
  }
  return ads;
}
#endif
00343
00344 void
00345 RtpsDiscovery::signal_liveliness(const DDS::DomainId_t domain_id,
00346 const OpenDDS::DCPS::RepoId& part_id,
00347 DDS::LivelinessQosPolicyKind kind)
00348 {
00349 get_part(domain_id, part_id)->signal_liveliness(kind);
00350 }
00351
00352 RtpsDiscovery::StaticInitializer::StaticInitializer()
00353 {
00354 TheServiceParticipant->register_discovery_type("rtps_discovery", new Config);
00355 }
00356
00357 }
00358 }
00359
00360 OPENDDS_END_VERSIONED_NAMESPACE_DECL