RtpsDiscovery.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "RtpsDiscovery.h"
00009 
00010 #include "dds/DCPS/Service_Participant.h"
00011 #include "dds/DCPS/ConfigUtils.h"
00012 #include "dds/DCPS/DomainParticipantImpl.h"
00013 #include "dds/DCPS/SubscriberImpl.h"
00014 #include "dds/DCPS/Marked_Default_Qos.h"
00015 #include "dds/DCPS/BuiltInTopicUtils.h"
00016 #include "dds/DCPS/Registered_Data_Types.h"
00017 #include "dds/DdsDcpsInfoUtilsC.h"
00018 
00019 #include "ace/Reactor.h"
00020 #include "ace/Select_Reactor.h"
00021 
00022 #include <cstdlib>
00023 
00024 namespace {
00025   u_short get_default_d0(u_short fallback)
00026   {
00027 #if !defined ACE_LACKS_GETENV && !defined ACE_LACKS_ENV
00028     const char* from_env = std::getenv("OPENDDS_RTPS_DEFAULT_D0");
00029     if (from_env) {
00030       return static_cast<u_short>(std::atoi(from_env));
00031     }
00032 #endif
00033     return fallback;
00034   }
00035 }
00036 
00037 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00038 
00039 namespace OpenDDS {
00040 namespace RTPS {
00041 
00042 RtpsDiscovery::RtpsDiscovery(const RepoKey& key)
00043   : DCPS::PeerDiscovery<Spdp>(key)
00044   , resend_period_(30 /*seconds*/) // see RTPS v2.1 9.6.1.4.2
00045   , pb_(7400) // see RTPS v2.1 9.6.1.3 for PB, DG, PG, D0, D1 defaults
00046   , dg_(250)
00047   , pg_(2)
00048   , d0_(get_default_d0(0))
00049   , d1_(10)
00050   , dx_(2)
00051   , ttl_(1)
00052   , sedp_multicast_(true)
00053   , default_multicast_group_("239.255.0.1")
00054 {
00055 }
00056 
00057 RtpsDiscovery::~RtpsDiscovery()
00058 {
00059 }
00060 
00061 namespace {
00062   const ACE_TCHAR RTPS_SECTION_NAME[] = ACE_TEXT("rtps_discovery");
00063 }
00064 
00065 int
00066 RtpsDiscovery::Config::discovery_config(ACE_Configuration_Heap& cf)
00067 {
00068   const ACE_Configuration_Section_Key &root = cf.root_section();
00069   ACE_Configuration_Section_Key rtps_sect;
00070 
00071   if (cf.open_section(root, RTPS_SECTION_NAME, 0, rtps_sect) == 0) {
00072 
00073     // Ensure there are no properties in this section
00074     DCPS::ValueMap vm;
00075     if (DCPS::pullValues(cf, rtps_sect, vm) > 0) {
00076       // There are values inside [rtps_discovery]
00077       ACE_ERROR_RETURN((LM_ERROR,
00078                         ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00079                         ACE_TEXT("rtps_discovery sections must have a subsection name\n")),
00080                        -1);
00081     }
00082     // Process the subsections of this section (the individual rtps_discovery/*)
00083     DCPS::KeyList keys;
00084     if (DCPS::processSections(cf, rtps_sect, keys) != 0) {
00085       ACE_ERROR_RETURN((LM_ERROR,
00086                         ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00087                         ACE_TEXT("too many nesting layers in the [rtps] section.\n")),
00088                        -1);
00089     }
00090 
00091     // Loop through the [rtps_discovery/*] sections
00092     for (DCPS::KeyList::const_iterator it = keys.begin();
00093          it != keys.end(); ++it) {
00094       const OPENDDS_STRING& rtps_name = it->first;
00095 
00096       int resend = 0;
00097       u_short pb = 0, dg = 0, pg = 0, d0 = 0, d1 = 0, dx = 0;
00098       unsigned char ttl = 0;
00099       AddrVec spdp_send_addrs;
00100       OPENDDS_STRING default_multicast_group = "239.255.0.1" /*RTPS v2.1 9.6.1.4.1*/;
00101       OPENDDS_STRING mi, sla, gi;
00102       OPENDDS_STRING spdpaddr;
00103       bool has_resend = false, has_pb = false, has_dg = false, has_pg = false,
00104         has_d0 = false, has_d1 = false, has_dx = false, has_sm = false,
00105         has_ttl = false, sm = false;
00106 
00107       // spdpaddr defaults to DCPSDefaultAddress if set
00108       if (!TheServiceParticipant->default_address().empty()) {
00109         spdpaddr = TheServiceParticipant->default_address().c_str();
00110       }
00111 
00112       DCPS::ValueMap values;
00113       DCPS::pullValues(cf, it->second, values);
00114       for (DCPS::ValueMap::const_iterator it = values.begin();
00115            it != values.end(); ++it) {
00116         const OPENDDS_STRING& name = it->first;
00117         if (name == "ResendPeriod") {
00118           const OPENDDS_STRING& value = it->second;
00119           has_resend = DCPS::convertToInteger(value, resend);
00120           if (!has_resend) {
00121             ACE_ERROR_RETURN((LM_ERROR,
00122               ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00123               ACE_TEXT("Invalid entry (%C) for ResendPeriod in ")
00124               ACE_TEXT("[rtps_discovery/%C] section.\n"),
00125               value.c_str(), rtps_name.c_str()), -1);
00126           }
00127         } else if (name == "PB") {
00128           const OPENDDS_STRING& value = it->second;
00129           has_pb = DCPS::convertToInteger(value, pb);
00130           if (!has_pb) {
00131             ACE_ERROR_RETURN((LM_ERROR,
00132               ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00133               ACE_TEXT("Invalid entry (%C) for PB in ")
00134               ACE_TEXT("[rtps_discovery/%C] section.\n"),
00135               value.c_str(), rtps_name.c_str()), -1);
00136           }
00137         } else if (name == "DG") {
00138           const OPENDDS_STRING& value = it->second;
00139           has_dg = DCPS::convertToInteger(value, dg);
00140           if (!has_dg) {
00141             ACE_ERROR_RETURN((LM_ERROR,
00142               ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00143               ACE_TEXT("Invalid entry (%C) for DG in ")
00144               ACE_TEXT("[rtps_discovery/%C] section.\n"),
00145               value.c_str(), rtps_name.c_str()), -1);
00146           }
00147         } else if (name == "PG") {
00148           const OPENDDS_STRING& value = it->second;
00149           has_pg = DCPS::convertToInteger(value, pg);
00150           if (!has_pg) {
00151             ACE_ERROR_RETURN((LM_ERROR,
00152               ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00153               ACE_TEXT("Invalid entry (%C) for PG in ")
00154               ACE_TEXT("[rtps_discovery/%C] section.\n"),
00155               value.c_str(), rtps_name.c_str()), -1);
00156           }
00157         } else if (name == "D0") {
00158           const OPENDDS_STRING& value = it->second;
00159           has_d0 = DCPS::convertToInteger(value, d0);
00160           if (!has_d0) {
00161             ACE_ERROR_RETURN((LM_ERROR,
00162               ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00163               ACE_TEXT("Invalid entry (%C) for D0 in ")
00164               ACE_TEXT("[rtps_discovery/%C] section.\n"),
00165               value.c_str(), rtps_name.c_str()), -1);
00166           }
00167         } else if (name == "D1") {
00168           const OPENDDS_STRING& value = it->second;
00169           has_d1 = DCPS::convertToInteger(value, d1);
00170           if (!has_d1) {
00171             ACE_ERROR_RETURN((LM_ERROR,
00172               ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00173               ACE_TEXT("Invalid entry (%C) for D1 in ")
00174               ACE_TEXT("[rtps_discovery/%C] section.\n"),
00175               value.c_str(), rtps_name.c_str()), -1);
00176           }
00177         } else if (name == "DX") {
00178           const OPENDDS_STRING& value = it->second;
00179           has_dx = DCPS::convertToInteger(value, dx);
00180           if (!has_dx) {
00181             ACE_ERROR_RETURN((LM_ERROR,
00182                ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00183                ACE_TEXT("Invalid entry (%C) for DX in ")
00184                ACE_TEXT("[rtps_discovery/%C] section.\n"),
00185                value.c_str(), rtps_name.c_str()), -1);
00186           }
00187         } else if (name == "TTL") {
00188           const OPENDDS_STRING& value = it->second;
00189           unsigned short ttl_us;
00190           has_ttl = DCPS::convertToInteger(value, ttl_us);
00191           ttl = static_cast<unsigned char>(ttl_us);
00192           if (!has_ttl || ttl_us > UCHAR_MAX) {
00193             ACE_ERROR_RETURN((LM_ERROR,
00194                ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00195                ACE_TEXT("Invalid entry (%C) for TTL in ")
00196                ACE_TEXT("[rtps_discovery/%C] section.\n"),
00197                value.c_str(), rtps_name.c_str()), -1);
00198           }
00199         } else if (name == "SedpMulticast") {
00200           const OPENDDS_STRING& value = it->second;
00201           int smInt;
00202           has_sm = DCPS::convertToInteger(value, smInt);
00203           if (!has_sm) {
00204             ACE_ERROR_RETURN((LM_ERROR,
00205                ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
00206                ACE_TEXT("Invalid entry (%C) for SedpMulticast in ")
00207                ACE_TEXT("[rtps_discovery/%C] section.\n"),
00208                value.c_str(), rtps_name.c_str()), -1);
00209           }
00210           sm = bool(smInt);
00211         } else if (name == "MulticastInterface") {
00212           mi = it->second;
00213         } else if (name == "SedpLocalAddress") {
00214           sla = it->second;
00215         } else if (name == "SpdpLocalAddress") {
00216           spdpaddr = it->second;
00217         } else if (name == "GuidInterface") {
00218           gi = it->second;
00219         } else if (name == "InteropMulticastOverride") {
00220           /// FUTURE: handle > 1 group.
00221           default_multicast_group = it->second;
00222         } else if (name == "SpdpSendAddrs") {
00223           const OPENDDS_STRING& value = it->second;
00224           size_t i = 0;
00225           do {
00226             i = value.find_first_not_of(' ', i); // skip spaces
00227             const size_t n = value.find_first_of(", ", i);
00228             spdp_send_addrs.push_back(value.substr(i, (n == OPENDDS_STRING::npos) ? n : n - i));
00229             i = value.find(',', i);
00230           } while (i++ != OPENDDS_STRING::npos); // skip past comma if there is one
00231         } else {
00232           ACE_ERROR_RETURN((LM_ERROR,
00233                             ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00234                             ACE_TEXT("Unexpected entry (%C) in [rtps_discovery/%C] section.\n"),
00235                             name.c_str(), rtps_name.c_str()),
00236                            -1);
00237         }
00238       }
00239 
00240       RtpsDiscovery_rch discovery (OpenDDS::DCPS::make_rch<RtpsDiscovery>(rtps_name));
00241       if (has_resend) discovery->resend_period(ACE_Time_Value(resend));
00242       if (has_pb) discovery->pb(pb);
00243       if (has_dg) discovery->dg(dg);
00244       if (has_pg) discovery->pg(pg);
00245       if (has_d0) discovery->d0(d0);
00246       if (has_d1) discovery->d1(d1);
00247       if (has_dx) discovery->dx(dx);
00248       if (has_ttl) discovery->ttl(ttl);
00249       if (has_sm) discovery->sedp_multicast(sm);
00250       discovery->multicast_interface(mi);
00251       discovery->default_multicast_group(default_multicast_group);
00252       discovery->spdp_send_addrs().swap(spdp_send_addrs);
00253       discovery->sedp_local_address(sla);
00254       discovery->guid_interface(gi);
00255       discovery->spdp_local_address(spdpaddr);
00256       TheServiceParticipant->add_discovery(discovery);
00257     }
00258   }
00259 
00260   // If the default RTPS discovery object has not been configured,
00261   // instantiate it now.
00262   const DCPS::Service_Participant::RepoKeyDiscoveryMap& discoveryMap = TheServiceParticipant->discoveryMap();
00263   if (discoveryMap.find(Discovery::DEFAULT_RTPS) == discoveryMap.end()) {
00264     TheServiceParticipant->add_discovery(OpenDDS::DCPS::make_rch<RtpsDiscovery>(Discovery::DEFAULT_RTPS));
00265   }
00266 
00267   return 0;
00268 }
00269 
00270 // Participant operations:
00271 
00272 OpenDDS::DCPS::RepoId
00273 RtpsDiscovery::generate_participant_guid() {
00274   OpenDDS::DCPS::RepoId id = GUID_UNKNOWN;
00275   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, id);
00276   if (!guid_interface_.empty()) {
00277     if (guid_gen_.interfaceName(guid_interface_.c_str()) != 0) {
00278       if (DCPS::DCPS_debug_level) {
00279         ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsDiscovery::add_domain_participant()"
00280                    " - attempt to use specific network interface's MAC addr for"
00281                    " GUID generation failed.\n"));
00282       }
00283     }
00284   }
00285   guid_gen_.populate(id);
00286   id.entityId = ENTITYID_PARTICIPANT;
00287   return id;
00288 }
00289 
00290 DCPS::AddDomainStatus
00291 RtpsDiscovery::add_domain_participant(DDS::DomainId_t domain,
00292                                       const DDS::DomainParticipantQos& qos)
00293 {
00294   DCPS::AddDomainStatus ads = {OpenDDS::DCPS::RepoId(), false /*federated*/};
00295   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ads);
00296   if (!guid_interface_.empty()) {
00297     if (guid_gen_.interfaceName(guid_interface_.c_str()) != 0) {
00298       if (DCPS::DCPS_debug_level) {
00299         ACE_DEBUG((LM_WARNING, "(%P|%t) RtpsDiscovery::add_domain_participant()"
00300                    " - attempt to use specific network interface's MAC addr for"
00301                    " GUID generation failed.\n"));
00302       }
00303     }
00304   }
00305   guid_gen_.populate(ads.id);
00306   ads.id.entityId = ENTITYID_PARTICIPANT;
00307   try {
00308     const DCPS::RcHandle<Spdp> spdp (DCPS::make_rch<Spdp>(domain, ref(ads.id), qos, this));
00309     // ads.id may change during Spdp constructor
00310     participants_[domain][ads.id] = spdp;
00311   } catch (const std::exception& e) {
00312     ads.id = GUID_UNKNOWN;
00313     ACE_ERROR((LM_ERROR, "(%P|%t) RtpsDiscovery::add_domain_participant() - "
00314       "failed to initialize RTPS Simple Participant Discovery Protocol: %C\n",
00315       e.what()));
00316   }
00317   return ads;
00318 }
00319 
00320 #if defined(OPENDDS_SECURITY)
00321 DCPS::AddDomainStatus
00322 RtpsDiscovery::add_domain_participant_secure(DDS::DomainId_t domain,
00323                                       const DDS::DomainParticipantQos& qos,
00324                                       const OpenDDS::DCPS::RepoId& guid,
00325                                       DDS::Security::IdentityHandle id,
00326                                       DDS::Security::PermissionsHandle perm,
00327                                       DDS::Security::ParticipantCryptoHandle part_crypto)
00328 {
00329   DCPS::AddDomainStatus ads = {guid, false /*federated*/};
00330   ads.id.entityId = ENTITYID_PARTICIPANT;
00331   try {
00332     const DCPS::RcHandle<Spdp> spdp (DCPS::make_rch<Spdp>(domain, ads.id, qos, this, id, perm, part_crypto));
00333     participants_[domain][ads.id] = spdp;
00334   } catch (const std::exception& e) {
00335     ads.id = GUID_UNKNOWN;
00336     ACE_ERROR((LM_WARNING, "(%P|%t) RtpsDiscovery::add_domain_participant_secure() - "
00337       "failed to initialize RTPS Simple Participant Discovery Protocol: %C\n",
00338       e.what()));
00339   }
00340   return ads;
00341 }
00342 #endif
00343 
00344 void
00345 RtpsDiscovery::signal_liveliness(const DDS::DomainId_t domain_id,
00346                                  const OpenDDS::DCPS::RepoId& part_id,
00347                                  DDS::LivelinessQosPolicyKind kind)
00348 {
00349   get_part(domain_id, part_id)->signal_liveliness(kind);
00350 }
00351 
00352 RtpsDiscovery::StaticInitializer::StaticInitializer()
00353 {
00354   TheServiceParticipant->register_discovery_type("rtps_discovery", new Config);
00355 }
00356 
00357 } // namespace DCPS
00358 } // namespace OpenDDS
00359 
00360 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1