RtpsDiscovery.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "RtpsDiscovery.h"
00009 
00010 #include "dds/DCPS/Service_Participant.h"
00011 #include "dds/DCPS/ConfigUtils.h"
00012 #include "dds/DCPS/DomainParticipantImpl.h"
00013 #include "dds/DCPS/SubscriberImpl.h"
00014 #include "dds/DCPS/Marked_Default_Qos.h"
00015 #include "dds/DCPS/BuiltInTopicUtils.h"
00016 #include "dds/DCPS/Registered_Data_Types.h"
00017 #include "dds/DdsDcpsInfoUtilsC.h"
00018 
00019 #include "ace/Reactor.h"
00020 #include "ace/Select_Reactor.h"
00021 
00022 #include <cstdlib>
00023 
namespace {
  /// Determine the default value for the RTPS "D0" port offset.
  ///
  /// When the platform provides getenv() and the environment variable
  /// OPENDDS_RTPS_DEFAULT_D0 holds a decimal number that fits in a u_short,
  /// that number is returned.  In every other case (variable unset, no
  /// leading digits, negative value, value too large) @a fallback is
  /// returned unchanged.
  ///
  /// @param fallback value to use when the environment gives no usable number
  /// @return the D0 value to use as the default
  u_short get_default_d0(u_short fallback)
  {
#if !defined ACE_LACKS_GETENV && !defined ACE_LACKS_ENV
    const char* from_env = std::getenv("OPENDDS_RTPS_DEFAULT_D0");
    if (from_env) {
      // strtol (unlike atoi) has defined behavior on overflow and reports
      // where parsing stopped, so bad input can be detected.
      char* end = 0;
      const long parsed = std::strtol(from_env, &end, 10);
      const u_short narrowed = static_cast<u_short>(parsed);
      // Accept only if at least one digit was consumed and the value
      // survives the round trip through u_short (i.e. it was in range).
      // Trailing characters after the number are ignored, as atoi did.
      if (end != from_env && parsed >= 0 &&
          static_cast<long>(narrowed) == parsed) {
        return narrowed;
      }
    }
#endif
    return fallback;
  }
}
00036 
00037 namespace OpenDDS {
00038 namespace RTPS {
00039 
00040 RtpsDiscovery::RtpsDiscovery(const RepoKey& key)
00041   : DCPS::PeerDiscovery<Spdp>(key)
00042   , resend_period_(30 /*seconds*/) // see RTPS v2.1 9.6.1.4.2
00043   , pb_(7400) // see RTPS v2.1 9.6.1.3 for PB, DG, PG, D0, D1 defaults
00044   , dg_(250)
00045   , pg_(2)
00046   , d0_(get_default_d0(0))
00047   , d1_(10)
00048   , dx_(2)
00049   , ttl_(1)
00050   , sedp_multicast_(true)
00051   , default_multicast_group_("239.255.0.1")
00052 {
00053 }
00054 
00055 RtpsDiscovery::~RtpsDiscovery()
00056 {
00057 }
00058 
00059 namespace {
00060   const ACE_TCHAR RTPS_SECTION_NAME[] = ACE_TEXT("rtps_discovery");
00061 }
00062 
00063 int
00064 RtpsDiscovery::Config::discovery_config(ACE_Configuration_Heap& cf)
00065 {
00066   const ACE_Configuration_Section_Key &root = cf.root_section();
00067   ACE_Configuration_Section_Key rtps_sect;
00068 
00069   if (cf.open_section(root, RTPS_SECTION_NAME, 0, rtps_sect) == 0) {
00070 
00071     // Ensure there are no properties in this section
00072     DCPS::ValueMap vm;
00073     if (DCPS::pullValues(cf, rtps_sect, vm) > 0) {
00074       // There are values inside [rtps_discovery]
00075       ACE_ERROR_RETURN((LM_ERROR,
00076                         ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00077                         ACE_TEXT("rtps_discovery sections must have a subsection name\n")),
00078                        -1);
00079     }
00080     // Process the subsections of this section (the individual rtps_discovery/*)
00081     DCPS::KeyList keys;
00082     if (DCPS::processSections(cf, rtps_sect, keys) != 0) {
00083       ACE_ERROR_RETURN((LM_ERROR,
00084                         ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00085                         ACE_TEXT("too many nesting layers in the [rtps] section.\n")),
00086                        -1);
00087     }
00088 
00089     // Loop through the [rtps_discovery/*] sections
00090     for (DCPS::KeyList::const_iterator it = keys.begin();
00091          it != keys.end(); ++it) {
00092       const OPENDDS_STRING& rtps_name = it->first;
00093 
00094       int resend;
00095       u_short pb, dg, pg, d0, d1, dx;
00096       unsigned char ttl;
00097       AddrVec spdp_send_addrs;
00098       OPENDDS_STRING default_multicast_group = "239.255.0.1" /*RTPS v2.1 9.6.1.4.1*/;
00099       OPENDDS_STRING mi, sla;
00100       bool has_resend = false, has_pb = false, has_dg = false, has_pg = false,
00101         has_d0 = false, has_d1 = false, has_dx = false, has_sm = false,
00102         has_ttl = false, sm = false;
00103 
00104       DCPS::ValueMap values;
00105       DCPS::pullValues(cf, it->second, values);
00106       for (DCPS::ValueMap::const_iterator it = values.begin();
00107            it != values.end(); ++it) {
00108         const OPENDDS_STRING& name = it->first;
00109         if (name == "ResendPeriod") {
00110           const OPENDDS_STRING& value = it->second;
00111           has_resend = DCPS::convertToInteger(value, resend);
00112           if (!has_resend) {
00113             ACE_ERROR_RETURN((LM_ERROR,
00114               ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00115               ACE_TEXT("Invalid entry (%C) for ResendPeriod in ")
00116               ACE_TEXT("[rtps_discovery/%C] section.\n"),
00117               value.c_str(), rtps_name.c_str()), -1);
00118           }
00119         } else if (name == "PB") {
00120           const OPENDDS_STRING& value = it->second;
00121           has_pb = DCPS::convertToInteger(value, pb);
00122           if (!has_pb) {
00123             ACE_ERROR_RETURN((LM_ERROR,
00124               ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00125               ACE_TEXT("Invalid entry (%C) for PB in ")
00126               ACE_TEXT("[rtps_discovery/%C] section.\n"),
00127               value.c_str(), rtps_name.c_str()), -1);
00128           }
00129         } else if (name == "DG") {
00130           const OPENDDS_STRING& value = it->second;
00131           has_dg = DCPS::convertToInteger(value, dg);
00132           if (!has_dg) {
00133             ACE_ERROR_RETURN((LM_ERROR,
00134               ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00135               ACE_TEXT("Invalid entry (%C) for DG in ")
00136               ACE_TEXT("[rtps_discovery/%C] section.\n"),
00137               value.c_str(), rtps_name.c_str()), -1);
00138           }
00139         } else if (name == "PG") {
00140           const OPENDDS_STRING& value = it->second;
00141           has_pg = DCPS::convertToInteger(value, pg);
00142           if (!has_pg) {
00143             ACE_ERROR_RETURN((LM_ERROR,
00144               ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00145               ACE_TEXT("Invalid entry (%C) for PG in ")
00146               ACE_TEXT("[rtps_discovery/%C] section.\n"),
00147               value.c_str(), rtps_name.c_str()), -1);
00148           }
00149         } else if (name == "D0") {
00150           const OPENDDS_STRING& value = it->second;
00151           has_d0 = DCPS::convertToInteger(value, d0);
00152           if (!has_d0) {
00153             ACE_ERROR_RETURN((LM_ERROR,
00154               ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00155               ACE_TEXT("Invalid entry (%C) for D0 in ")
00156               ACE_TEXT("[rtps_discovery/%C] section.\n"),
00157               value.c_str(), rtps_name.c_str()), -1);
00158           }
00159         } else if (name == "D1") {
00160           const OPENDDS_STRING& value = it->second;
00161           has_d1 = DCPS::convertToInteger(value, d1);
00162           if (!has_d1) {
00163             ACE_ERROR_RETURN((LM_ERROR,
00164               ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00165               ACE_TEXT("Invalid entry (%C) for D1 in ")
00166               ACE_TEXT("[rtps_discovery/%C] section.\n"),
00167               value.c_str(), rtps_name.c_str()), -1);
00168           }
00169         } else if (name == "DX") {
00170           const OPENDDS_STRING& value = it->second;
00171           has_dx = DCPS::convertToInteger(value, dx);
00172           if (!has_dx) {
00173             ACE_ERROR_RETURN((LM_ERROR,
00174                ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00175                ACE_TEXT("Invalid entry (%C) for DX in ")
00176                ACE_TEXT("[rtps_discovery/%C] section.\n"),
00177                value.c_str(), rtps_name.c_str()), -1);
00178           }
00179         } else if (name == "TTL") {
00180           const OPENDDS_STRING& value = it->second;
00181           unsigned short ttl_us;
00182           has_ttl = DCPS::convertToInteger(value, ttl_us);
00183           ttl = static_cast<unsigned char>(ttl_us);
00184           if (!has_ttl || ttl_us > UCHAR_MAX) {
00185             ACE_ERROR_RETURN((LM_ERROR,
00186                ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00187                ACE_TEXT("Invalid entry (%C) for TTL in ")
00188                ACE_TEXT("[rtps_discovery/%C] section.\n"),
00189                value.c_str(), rtps_name.c_str()), -1);
00190           }
00191         } else if (name == "SedpMulticast") {
00192           const OPENDDS_STRING& value = it->second;
00193           int smInt;
00194           has_sm = DCPS::convertToInteger(value, smInt);
00195           if (!has_sm) {
00196             ACE_ERROR_RETURN((LM_ERROR,
00197                ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
00198                ACE_TEXT("Invalid entry (%C) for SedpMulticast in ")
00199                ACE_TEXT("[rtps_discovery/%C] section.\n"),
00200                value.c_str(), rtps_name.c_str()), -1);
00201           }
00202           sm = bool(smInt);
00203         } else if (name == "MulticastInterface") {
00204           mi = it->second;
00205         } else if (name == "SedpLocalAddress") {
00206           sla = it->second;
00207         } else if (name == "InteropMulticastOverride") {
00208           /// FUTURE: handle > 1 group.
00209           default_multicast_group = it->second;
00210         } else if (name == "SpdpSendAddrs") {
00211           const OPENDDS_STRING& value = it->second;
00212           size_t i = 0;
00213           do {
00214             i = value.find_first_not_of(' ', i); // skip spaces
00215             const size_t n = value.find_first_of(", ", i);
00216             spdp_send_addrs.push_back(value.substr(i, (n == OPENDDS_STRING::npos) ? n : n - i));
00217             i = value.find(',', i);
00218           } while (i++ != OPENDDS_STRING::npos); // skip past comma if there is one
00219         } else {
00220           ACE_ERROR_RETURN((LM_ERROR,
00221                             ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00222                             ACE_TEXT("Unexpected entry (%C) in [rtps_discovery/%C] section.\n"),
00223                             name.c_str(), rtps_name.c_str()),
00224                            -1);
00225         }
00226       }
00227 
00228       RtpsDiscovery_rch discovery = new RtpsDiscovery(rtps_name);
00229       if (has_resend) discovery->resend_period(ACE_Time_Value(resend));
00230       if (has_pb) discovery->pb(pb);
00231       if (has_dg) discovery->dg(dg);
00232       if (has_pg) discovery->pg(pg);
00233       if (has_d0) discovery->d0(d0);
00234       if (has_d1) discovery->d1(d1);
00235       if (has_dx) discovery->dx(dx);
00236       if (has_ttl) discovery->ttl(ttl);
00237       if (has_sm) discovery->sedp_multicast(sm);
00238       discovery->multicast_interface(mi);
00239       discovery->default_multicast_group( default_multicast_group);
00240       discovery->spdp_send_addrs().swap(spdp_send_addrs);
00241       discovery->sedp_local_address(sla);
00242       TheServiceParticipant->add_discovery(
00243         DCPS::static_rchandle_cast<Discovery>(discovery));
00244     }
00245   }
00246 
00247   // If the default RTPS discovery object has not been configured,
00248   // instantiate it now.
00249   const DCPS::Service_Participant::RepoKeyDiscoveryMap& discoveryMap = TheServiceParticipant->discoveryMap();
00250   if (discoveryMap.find(Discovery::DEFAULT_RTPS) == discoveryMap.end()) {
00251     RtpsDiscovery_rch discovery = new RtpsDiscovery(Discovery::DEFAULT_RTPS);
00252     TheServiceParticipant->add_discovery(
00253       DCPS::static_rchandle_cast<Discovery>(discovery));
00254   }
00255 
00256   return 0;
00257 }
00258 
00259 // Participant operations:
00260 
00261 DCPS::AddDomainStatus
00262 RtpsDiscovery::add_domain_participant(DDS::DomainId_t domain,
00263                                       const DDS::DomainParticipantQos& qos)
00264 {
00265   DCPS::AddDomainStatus ads = {OpenDDS::DCPS::RepoId(), false /*federated*/};
00266   ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ads);
00267   guid_gen_.populate(ads.id);
00268   ads.id.entityId = ENTITYID_PARTICIPANT;
00269   try {
00270     const DCPS::RcHandle<Spdp> spdp = new Spdp(domain, ads.id, qos, this);
00271     // ads.id may change during Spdp constructor
00272     participants_[domain][ads.id] = spdp;
00273   } catch (const std::exception& e) {
00274     ads.id = GUID_UNKNOWN;
00275     ACE_ERROR((LM_ERROR, "(%P|%t) RtpsDiscovery::add_domain_participant() - "
00276       "failed to initialize RTPS Simple Participant Discovery Protocol: %C\n",
00277       e.what()));
00278   }
00279   return ads;
00280 }
00281 
00282 void
00283 RtpsDiscovery::signal_liveliness(const DDS::DomainId_t domain_id,
00284                                  const OpenDDS::DCPS::RepoId& part_id,
00285                                  DDS::LivelinessQosPolicyKind kind)
00286 {
00287   get_part(domain_id, part_id)->signal_liveliness(kind);
00288 }
00289 
00290 RtpsDiscovery::StaticInitializer::StaticInitializer()
00291 {
00292   TheServiceParticipant->register_discovery_type("rtps_discovery", new Config);
00293 }
00294 
00295 } // namespace RTPS
00296 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:26 2016 for OpenDDS by  doxygen 1.4.7