00001
00002
00003
00004
00005
00006
00007
00008 #include "RtpsDiscovery.h"
00009
00010 #include "dds/DCPS/Service_Participant.h"
00011 #include "dds/DCPS/ConfigUtils.h"
00012 #include "dds/DCPS/DomainParticipantImpl.h"
00013 #include "dds/DCPS/SubscriberImpl.h"
00014 #include "dds/DCPS/Marked_Default_Qos.h"
00015 #include "dds/DCPS/BuiltInTopicUtils.h"
00016 #include "dds/DCPS/Registered_Data_Types.h"
00017 #include "dds/DdsDcpsInfoUtilsC.h"
00018
00019 #include "ace/Reactor.h"
00020 #include "ace/Select_Reactor.h"
00021
00022 #include <cstdlib>
00023
00024 namespace {
/// Resolve the default value used for the "d0" setting.
///
/// When the platform provides getenv(), the OPENDDS_RTPS_DEFAULT_D0
/// environment variable overrides @a fallback, provided it holds a whole
/// decimal number that fits in a u_short.
///
/// @param fallback value returned when the variable is unset, empty,
///                 malformed (e.g. trailing characters) or out of range.
/// @return the parsed environment value, or @a fallback.
u_short get_default_d0(u_short fallback)
{
#if !defined ACE_LACKS_GETENV && !defined ACE_LACKS_ENV
  const char* from_env = std::getenv("OPENDDS_RTPS_DEFAULT_D0");
  if (from_env) {
    char* end = 0;
    const long parsed = std::strtol(from_env, &end, 10);
    const long max_value = static_cast<u_short>(-1);
    // Accept the value only if at least one digit was consumed, nothing
    // follows the number, and it fits in u_short.  An overflowing strtol()
    // saturates to LONG_MIN/LONG_MAX, which the range test rejects as well.
    if (end != from_env && *end == '\0' && parsed >= 0 && parsed <= max_value) {
      return static_cast<u_short>(parsed);
    }
  }
#endif
  return fallback;
}
00035 }
00036
00037 namespace OpenDDS {
00038 namespace RTPS {
00039
00040 RtpsDiscovery::RtpsDiscovery(const RepoKey& key)
00041 : DCPS::PeerDiscovery<Spdp>(key)
00042 , resend_period_(30 )
00043 , pb_(7400)
00044 , dg_(250)
00045 , pg_(2)
00046 , d0_(get_default_d0(0))
00047 , d1_(10)
00048 , dx_(2)
00049 , ttl_(1)
00050 , sedp_multicast_(true)
00051 , default_multicast_group_("239.255.0.1")
00052 {
00053 }
00054
00055 RtpsDiscovery::~RtpsDiscovery()
00056 {
00057 }
00058
00059 namespace {
00060 const ACE_TCHAR RTPS_SECTION_NAME[] = ACE_TEXT("rtps_discovery");
00061 }
00062
00063 int
00064 RtpsDiscovery::Config::discovery_config(ACE_Configuration_Heap& cf)
00065 {
00066 const ACE_Configuration_Section_Key &root = cf.root_section();
00067 ACE_Configuration_Section_Key rtps_sect;
00068
00069 if (cf.open_section(root, RTPS_SECTION_NAME, 0, rtps_sect) == 0) {
00070
00071
00072 DCPS::ValueMap vm;
00073 if (DCPS::pullValues(cf, rtps_sect, vm) > 0) {
00074
00075 ACE_ERROR_RETURN((LM_ERROR,
00076 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00077 ACE_TEXT("rtps_discovery sections must have a subsection name\n")),
00078 -1);
00079 }
00080
00081 DCPS::KeyList keys;
00082 if (DCPS::processSections(cf, rtps_sect, keys) != 0) {
00083 ACE_ERROR_RETURN((LM_ERROR,
00084 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00085 ACE_TEXT("too many nesting layers in the [rtps] section.\n")),
00086 -1);
00087 }
00088
00089
00090 for (DCPS::KeyList::const_iterator it = keys.begin();
00091 it != keys.end(); ++it) {
00092 const OPENDDS_STRING& rtps_name = it->first;
00093
00094 int resend;
00095 u_short pb, dg, pg, d0, d1, dx;
00096 unsigned char ttl;
00097 AddrVec spdp_send_addrs;
00098 OPENDDS_STRING default_multicast_group = "239.255.0.1" ;
00099 OPENDDS_STRING mi, sla;
00100 bool has_resend = false, has_pb = false, has_dg = false, has_pg = false,
00101 has_d0 = false, has_d1 = false, has_dx = false, has_sm = false,
00102 has_ttl = false, sm = false;
00103
00104 DCPS::ValueMap values;
00105 DCPS::pullValues(cf, it->second, values);
00106 for (DCPS::ValueMap::const_iterator it = values.begin();
00107 it != values.end(); ++it) {
00108 const OPENDDS_STRING& name = it->first;
00109 if (name == "ResendPeriod") {
00110 const OPENDDS_STRING& value = it->second;
00111 has_resend = DCPS::convertToInteger(value, resend);
00112 if (!has_resend) {
00113 ACE_ERROR_RETURN((LM_ERROR,
00114 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00115 ACE_TEXT("Invalid entry (%C) for ResendPeriod in ")
00116 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00117 value.c_str(), rtps_name.c_str()), -1);
00118 }
00119 } else if (name == "PB") {
00120 const OPENDDS_STRING& value = it->second;
00121 has_pb = DCPS::convertToInteger(value, pb);
00122 if (!has_pb) {
00123 ACE_ERROR_RETURN((LM_ERROR,
00124 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00125 ACE_TEXT("Invalid entry (%C) for PB in ")
00126 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00127 value.c_str(), rtps_name.c_str()), -1);
00128 }
00129 } else if (name == "DG") {
00130 const OPENDDS_STRING& value = it->second;
00131 has_dg = DCPS::convertToInteger(value, dg);
00132 if (!has_dg) {
00133 ACE_ERROR_RETURN((LM_ERROR,
00134 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00135 ACE_TEXT("Invalid entry (%C) for DG in ")
00136 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00137 value.c_str(), rtps_name.c_str()), -1);
00138 }
00139 } else if (name == "PG") {
00140 const OPENDDS_STRING& value = it->second;
00141 has_pg = DCPS::convertToInteger(value, pg);
00142 if (!has_pg) {
00143 ACE_ERROR_RETURN((LM_ERROR,
00144 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00145 ACE_TEXT("Invalid entry (%C) for PG in ")
00146 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00147 value.c_str(), rtps_name.c_str()), -1);
00148 }
00149 } else if (name == "D0") {
00150 const OPENDDS_STRING& value = it->second;
00151 has_d0 = DCPS::convertToInteger(value, d0);
00152 if (!has_d0) {
00153 ACE_ERROR_RETURN((LM_ERROR,
00154 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00155 ACE_TEXT("Invalid entry (%C) for D0 in ")
00156 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00157 value.c_str(), rtps_name.c_str()), -1);
00158 }
00159 } else if (name == "D1") {
00160 const OPENDDS_STRING& value = it->second;
00161 has_d1 = DCPS::convertToInteger(value, d1);
00162 if (!has_d1) {
00163 ACE_ERROR_RETURN((LM_ERROR,
00164 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00165 ACE_TEXT("Invalid entry (%C) for D1 in ")
00166 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00167 value.c_str(), rtps_name.c_str()), -1);
00168 }
00169 } else if (name == "DX") {
00170 const OPENDDS_STRING& value = it->second;
00171 has_dx = DCPS::convertToInteger(value, dx);
00172 if (!has_dx) {
00173 ACE_ERROR_RETURN((LM_ERROR,
00174 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00175 ACE_TEXT("Invalid entry (%C) for DX in ")
00176 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00177 value.c_str(), rtps_name.c_str()), -1);
00178 }
00179 } else if (name == "TTL") {
00180 const OPENDDS_STRING& value = it->second;
00181 unsigned short ttl_us;
00182 has_ttl = DCPS::convertToInteger(value, ttl_us);
00183 ttl = static_cast<unsigned char>(ttl_us);
00184 if (!has_ttl || ttl_us > UCHAR_MAX) {
00185 ACE_ERROR_RETURN((LM_ERROR,
00186 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00187 ACE_TEXT("Invalid entry (%C) for TTL in ")
00188 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00189 value.c_str(), rtps_name.c_str()), -1);
00190 }
00191 } else if (name == "SedpMulticast") {
00192 const OPENDDS_STRING& value = it->second;
00193 int smInt;
00194 has_sm = DCPS::convertToInteger(value, smInt);
00195 if (!has_sm) {
00196 ACE_ERROR_RETURN((LM_ERROR,
00197 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config ")
00198 ACE_TEXT("Invalid entry (%C) for SedpMulticast in ")
00199 ACE_TEXT("[rtps_discovery/%C] section.\n"),
00200 value.c_str(), rtps_name.c_str()), -1);
00201 }
00202 sm = bool(smInt);
00203 } else if (name == "MulticastInterface") {
00204 mi = it->second;
00205 } else if (name == "SedpLocalAddress") {
00206 sla = it->second;
00207 } else if (name == "InteropMulticastOverride") {
00208
00209 default_multicast_group = it->second;
00210 } else if (name == "SpdpSendAddrs") {
00211 const OPENDDS_STRING& value = it->second;
00212 size_t i = 0;
00213 do {
00214 i = value.find_first_not_of(' ', i);
00215 const size_t n = value.find_first_of(", ", i);
00216 spdp_send_addrs.push_back(value.substr(i, (n == OPENDDS_STRING::npos) ? n : n - i));
00217 i = value.find(',', i);
00218 } while (i++ != OPENDDS_STRING::npos);
00219 } else {
00220 ACE_ERROR_RETURN((LM_ERROR,
00221 ACE_TEXT("(%P|%t) RtpsDiscovery::Config::discovery_config(): ")
00222 ACE_TEXT("Unexpected entry (%C) in [rtps_discovery/%C] section.\n"),
00223 name.c_str(), rtps_name.c_str()),
00224 -1);
00225 }
00226 }
00227
00228 RtpsDiscovery_rch discovery = new RtpsDiscovery(rtps_name);
00229 if (has_resend) discovery->resend_period(ACE_Time_Value(resend));
00230 if (has_pb) discovery->pb(pb);
00231 if (has_dg) discovery->dg(dg);
00232 if (has_pg) discovery->pg(pg);
00233 if (has_d0) discovery->d0(d0);
00234 if (has_d1) discovery->d1(d1);
00235 if (has_dx) discovery->dx(dx);
00236 if (has_ttl) discovery->ttl(ttl);
00237 if (has_sm) discovery->sedp_multicast(sm);
00238 discovery->multicast_interface(mi);
00239 discovery->default_multicast_group( default_multicast_group);
00240 discovery->spdp_send_addrs().swap(spdp_send_addrs);
00241 discovery->sedp_local_address(sla);
00242 TheServiceParticipant->add_discovery(
00243 DCPS::static_rchandle_cast<Discovery>(discovery));
00244 }
00245 }
00246
00247
00248
00249 const DCPS::Service_Participant::RepoKeyDiscoveryMap& discoveryMap = TheServiceParticipant->discoveryMap();
00250 if (discoveryMap.find(Discovery::DEFAULT_RTPS) == discoveryMap.end()) {
00251 RtpsDiscovery_rch discovery = new RtpsDiscovery(Discovery::DEFAULT_RTPS);
00252 TheServiceParticipant->add_discovery(
00253 DCPS::static_rchandle_cast<Discovery>(discovery));
00254 }
00255
00256 return 0;
00257 }
00258
00259
00260
00261 DCPS::AddDomainStatus
00262 RtpsDiscovery::add_domain_participant(DDS::DomainId_t domain,
00263 const DDS::DomainParticipantQos& qos)
00264 {
00265 DCPS::AddDomainStatus ads = {OpenDDS::DCPS::RepoId(), false };
00266 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ads);
00267 guid_gen_.populate(ads.id);
00268 ads.id.entityId = ENTITYID_PARTICIPANT;
00269 try {
00270 const DCPS::RcHandle<Spdp> spdp = new Spdp(domain, ads.id, qos, this);
00271
00272 participants_[domain][ads.id] = spdp;
00273 } catch (const std::exception& e) {
00274 ads.id = GUID_UNKNOWN;
00275 ACE_ERROR((LM_ERROR, "(%P|%t) RtpsDiscovery::add_domain_participant() - "
00276 "failed to initialize RTPS Simple Participant Discovery Protocol: %C\n",
00277 e.what()));
00278 }
00279 return ads;
00280 }
00281
00282 void
00283 RtpsDiscovery::signal_liveliness(const DDS::DomainId_t domain_id,
00284 const OpenDDS::DCPS::RepoId& part_id,
00285 DDS::LivelinessQosPolicyKind kind)
00286 {
00287 get_part(domain_id, part_id)->signal_liveliness(kind);
00288 }
00289
00290 RtpsDiscovery::StaticInitializer::StaticInitializer()
00291 {
00292 TheServiceParticipant->register_discovery_type("rtps_discovery", new Config);
00293 }
00294
00295 }
00296 }