#include <Spdp.h>
Each instance of class Spdp represents the implementation of the RTPS Simple Participant Discovery Protocol for a single local DomainParticipant.
Definition at line 47 of file Spdp.h.
OpenDDS::RTPS::Spdp::Spdp(DDS::DomainId_t domain, DCPS::RepoId& guid, const DDS::DomainParticipantQos& qos, RtpsDiscovery* disco)
OpenDDS::RTPS::Spdp::~Spdp()
Definition at line 251 of file Spdp.cpp.
References ACE_TEXT(), OpenDDS::RTPS::Spdp::SpdpTransport::close(), OpenDDS::DCPS::DCPS_debug_level, eh_, eh_shutdown_, get_discovered_participant_ids(), LM_INFO, OpenDDS::DCPS::LocalParticipant< Sedp >::lock_, OpenDDS::DCPS::LocalParticipant< Sedp >::participants_, OpenDDS::DCPS::LocalParticipant< Sedp >::remove_discovered_participant(), ACE_Event_Handler_var::reset(), sedp_, OpenDDS::RTPS::Sedp::shutdown(), shutdown_cond_, shutdown_flag_, tport_, and ACE_Condition< ACE_Thread_Mutex >::wait().
00252 { 00253 shutdown_flag_ = true; 00254 { 00255 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00256 if (DCPS::DCPS_debug_level > 3) { 00257 ACE_DEBUG((LM_INFO, 00258 ACE_TEXT("(%P|%t) Spdp::~Spdp ") 00259 ACE_TEXT("remove discovered participants\n"))); 00260 } 00261 00262 #if defined(OPENDDS_SECURITY) 00263 write_secure_disposes(); 00264 #endif 00265 00266 // Iterate through a copy of the repo Ids, rather than the map 00267 // as it gets unlocked in remove_discovered_participant() 00268 DCPS::RepoIdSet participant_ids; 00269 get_discovered_participant_ids(participant_ids); 00270 for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin(); 00271 participant_id != participant_ids.end(); 00272 ++participant_id) 00273 { 00274 DiscoveredParticipantIter part = participants_.find(*participant_id); 00275 if (part != participants_.end()) { 00276 remove_discovered_participant(part); 00277 } 00278 } 00279 } 00280 00281 // ensure sedp's task queue is drained before data members are being 00282 // deleted 00283 sedp_.shutdown(); 00284 00285 // release lock for reset of event handler, which may delete transport 00286 tport_->close(); 00287 eh_.reset(); 00288 { 00289 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00290 while (!eh_shutdown_) { 00291 shutdown_cond_.wait(); 00292 } 00293 } 00294 }
bool OpenDDS::RTPS::Spdp::announce_domain_participant_qos() [protected, virtual]
Reimplemented from OpenDDS::DCPS::LocalParticipant< Sedp >.
bool OpenDDS::RTPS::Spdp::associated() const
Definition at line 1778 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< Sedp >::participants_.
01779 { 01780 return !participants_.empty(); 01781 }
Security::SPDPdiscoveredParticipantData OpenDDS::RTPS::Spdp::build_local_pdata(Security::DiscoveredParticipantDataKind kind) [protected]
Definition at line 1158 of file Spdp.cpp.
References OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER, DDS::Security::BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER, DDS::Security::BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER, DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER, DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER, DDS::Security::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER, DDS::Security::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR, disco_, guid_, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::LOCATOR_KIND_UDPv4, DDS::DomainParticipantQos::property, OpenDDS::RTPS::PROTOCOLVERSION, OpenDDS::DCPS::LocalParticipant< Sedp >::qos_, OpenDDS::RTPS::RtpsDiscovery::resend_period(), OpenDDS::RTPS::security_attributes_to_bitmask(), DDS::Security::SEDP_BUILTIN_PUBLICATIONS_SECURE_READER, DDS::Security::SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER, DDS::Security::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER, DDS::Security::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER, sedp_multicast_, sedp_unicast_, DDS::Security::SPDP_BUILTIN_PARTICIPANT_SECURE_READER, DDS::Security::SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER, DDS::DomainParticipantQos::user_data, and OpenDDS::RTPS::VENDORID_OPENDDS.
Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::write_i().
01159 { 01160 BuiltinEndpointSet_t availableBuiltinEndpoints = 01161 DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER | 01162 DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR | 01163 DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER | 01164 DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR | 01165 DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER | 01166 DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR | 01167 BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER | 01168 BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER 01169 ; 01170 01171 #if defined(OPENDDS_SECURITY) 01172 if (is_security_enabled()) { 01173 availableBuiltinEndpoints |= 01174 DDS::Security::SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER | 01175 DDS::Security::SEDP_BUILTIN_PUBLICATIONS_SECURE_READER | 01176 DDS::Security::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER | 01177 DDS::Security::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER | 01178 DDS::Security::BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER | 01179 DDS::Security::BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER | 01180 DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER | 01181 DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER | 01182 DDS::Security::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER | 01183 DDS::Security::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER | 01184 DDS::Security::SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER | 01185 DDS::Security::SPDP_BUILTIN_PARTICIPANT_SECURE_READER 01186 ; 01187 } 01188 #endif 01189 01190 // The RTPS spec has no constants for the builtinTopics{Writer,Reader} 01191 01192 // This locator list should not be empty, but we won't actually be using it. 01193 // The OpenDDS publication/subscription data will have locators included. 
01194 DCPS::LocatorSeq nonEmptyList(1); 01195 nonEmptyList.length(1); 01196 nonEmptyList[0].kind = LOCATOR_KIND_UDPv4; 01197 nonEmptyList[0].port = 12345; 01198 std::memset(nonEmptyList[0].address, 0, 12); 01199 nonEmptyList[0].address[12] = 127; 01200 nonEmptyList[0].address[13] = 0; 01201 nonEmptyList[0].address[14] = 0; 01202 nonEmptyList[0].address[15] = 1; 01203 01204 const GuidPrefix_t& gp = guid_.guidPrefix; 01205 01206 const Security::SPDPdiscoveredParticipantData pdata = { 01207 kind, 01208 { // ParticipantBuiltinTopicDataSecure 01209 { // ParticipantBuiltinTopicData (security enhanced) 01210 { // ParticipantBuiltinTopicData (original) 01211 DDS::BuiltinTopicKey_t() /*ignored*/, 01212 qos_.user_data 01213 }, 01214 01215 #if defined(OPENDDS_SECURITY) 01216 identity_token_, 01217 permissions_token_, 01218 #else 01219 DDS::Security::Token(), 01220 DDS::Security::Token(), 01221 #endif 01222 01223 qos_.property, 01224 01225 #if defined(OPENDDS_SECURITY) 01226 { 01227 security_attributes_to_bitmask(participant_sec_attr_), 01228 participant_sec_attr_.plugin_participant_attributes 01229 } 01230 #else 01231 DDS::Security::ParticipantSecurityInfo() 01232 #endif 01233 01234 }, 01235 01236 #if defined(OPENDDS_SECURITY) 01237 identity_status_token_ 01238 #else 01239 DDS::Security::Token() 01240 #endif 01241 01242 }, 01243 { // ParticipantProxy_t 01244 PROTOCOLVERSION, 01245 {gp[0], gp[1], gp[2], gp[3], gp[4], gp[5], 01246 gp[6], gp[7], gp[8], gp[9], gp[10], gp[11]}, 01247 VENDORID_OPENDDS, 01248 false /*expectsIQoS*/, 01249 availableBuiltinEndpoints, 01250 sedp_unicast_, 01251 sedp_multicast_, 01252 nonEmptyList /*defaultMulticastLocatorList*/, 01253 nonEmptyList /*defaultUnicastLocatorList*/, 01254 {0 /*manualLivelinessCount*/} //FUTURE: implement manual liveliness 01255 }, 01256 { // Duration_t (leaseDuration) 01257 static_cast<CORBA::Long>((disco_->resend_period() * LEASE_MULT).sec()), 01258 0 // we are not supporting fractional seconds in the lease duration 01259 } 
01260 }; 01261 01262 return pdata; 01263 }
void OpenDDS::RTPS::Spdp::data_received(const DataSubmessage& data, const ParameterList& plist) [private]
Definition at line 484 of file Spdp.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::RTPS::ParameterListConverter::from_param_list(), handle_participant_data(), OpenDDS::RTPS::DataSubmessage::inlineQos, LM_ERROR, OpenDDS::DCPS::SAMPLE_DATA, shutdown_flag_, and ACE_Atomic_Op_GCC< T >::value().
Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::handle_input().
00485 { 00486 if (shutdown_flag_.value()) { return; } 00487 00488 Security::SPDPdiscoveredParticipantData pdata; 00489 if (ParameterListConverter::from_param_list(plist, pdata) < 0) { 00490 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::data_received - ") 00491 ACE_TEXT("failed to convert from ParameterList to ") 00492 ACE_TEXT("SPDPdiscoveredParticipantData\n"))); 00493 return; 00494 } 00495 00496 DCPS::MessageId msg_id = (data.inlineQos.length() && disposed(data.inlineQos)) ? DCPS::DISPOSE_INSTANCE : DCPS::SAMPLE_DATA; 00497 00498 handle_participant_data(msg_id, pdata); 00499 }
Sedp& OpenDDS::RTPS::Spdp::endpoint_manager() [inline, protected, virtual]
Implements OpenDDS::DCPS::LocalParticipant< Sedp >.
Definition at line 119 of file Spdp.h.
00119 { return sedp_; }
void OpenDDS::RTPS::Spdp::fini_bit()
Definition at line 1109 of file Spdp.cpp.
References OpenDDS::RTPS::Sedp::acknowledge(), OpenDDS::RTPS::Spdp::SpdpTransport::acknowledge(), OpenDDS::DCPS::LocalParticipant< Sedp >::bit_subscriber_, OpenDDS::RTPS::WaitForAcks::reset(), sedp_, tport_, OpenDDS::RTPS::WaitForAcks::wait_for_acks(), and wait_for_acks_.
01110 { 01111 bit_subscriber_ = 0; 01112 wait_for_acks_.reset(); 01113 // request for SpdpTransport(actually Reactor) thread and Sedp::Task 01114 // to acknowledge 01115 tport_->acknowledge(); 01116 sedp_.acknowledge(); 01117 // wait for the 2 acknowledgements 01118 wait_for_acks_.wait_for_acks(2); 01119 }
bool OpenDDS::RTPS::Spdp::get_default_locators(const DCPS::RepoId& part_id, DCPS::LocatorSeq& target, bool& inlineQos)
void OpenDDS::RTPS::Spdp::get_discovered_participant_ids(DCPS::RepoIdSet& results) const [private]
Definition at line 1791 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< Sedp >::participants_.
Referenced by remove_expired_participants(), and ~Spdp().
01792 { 01793 DiscoveredParticipantMap::const_iterator idx; 01794 for (idx = participants_.begin(); idx != participants_.end(); ++idx) 01795 { 01796 results.insert(idx->first); 01797 } 01798 }
const DCPS::RepoId& OpenDDS::RTPS::Spdp::guid() const [inline]
void OpenDDS::RTPS::Spdp::handle_participant_data(DCPS::MessageId id, const Security::SPDPdiscoveredParticipantData& pdata)
Definition at line 322 of file Spdp.cpp.
References ACE_TEXT(), DDS::Security::ParticipantBuiltinTopicDataSecure::base, OpenDDS::Security::SPDPdiscoveredParticipantData::dataKind, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Security::SPDPdiscoveredParticipantData::ddsParticipantDataSecure, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::Security::DPDK_ENHANCED, OpenDDS::Security::DPDK_SECURE, OpenDDS::DCPS::ENTITYID_PARTICIPANT, ACE_OS::gettimeofday(), guid_, OpenDDS::RTPS::ParticipantProxy_t::guidPrefix, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring(), OpenDDS::Security::SPDPdiscoveredParticipantData::leaseDuration, LM_DEBUG, OpenDDS::DCPS::LocalParticipant< Sedp >::lock_, match_unauthenticated(), DDS::NOT_NEW_VIEW_STATE, OPENDDS_STRING, part_bit(), OpenDDS::Security::SPDPdiscoveredParticipantData::participantProxy, OpenDDS::DCPS::LocalParticipant< Sedp >::participants_, OpenDDS::DCPS::LocalParticipant< Sedp >::remove_discovered_participant(), OpenDDS::RTPS::Time_t::seconds, sedp_, OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data(), tport_, and OpenDDS::RTPS::Spdp::SpdpTransport::write_i().
Referenced by data_received(), and OpenDDS::RTPS::Sedp::Task::svc_secure_i().
00323 { 00324 const ACE_Time_Value now = ACE_OS::gettimeofday(); 00325 00326 // Make a (non-const) copy so we can tweak values below 00327 Security::SPDPdiscoveredParticipantData pdata(cpdata); 00328 00329 const DCPS::RepoId guid = make_guid(pdata.participantProxy.guidPrefix, DCPS::ENTITYID_PARTICIPANT); 00330 00331 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00332 if (sedp_.ignoring(guid)) { 00333 // Ignore, this is our domain participant or one that the user has 00334 // asked us to ignore. 00335 return; 00336 } 00337 00338 // Find the participant - iterator valid only as long as we hold the lock 00339 DiscoveredParticipantIter iter = participants_.find(guid); 00340 00341 if (iter == participants_.end()) { 00342 00343 // Trying to delete something that doesn't exist is a NOOP 00344 if (id == DCPS::DISPOSE_INSTANCE || id == DCPS::DISPOSE_UNREGISTER_INSTANCE) { 00345 return; 00346 } 00347 00348 // copy guid prefix (octet[12]) into BIT key (long[3]) 00349 std::memcpy(pdata.ddsParticipantDataSecure.base.base.key.value, 00350 pdata.participantProxy.guidPrefix, 00351 sizeof(pdata.ddsParticipantDataSecure.base.base.key.value)); 00352 00353 if (DCPS::DCPS_debug_level) { 00354 DCPS::GuidConverter local(guid_), remote(guid); 00355 ACE_DEBUG((LM_DEBUG, 00356 ACE_TEXT("(%P|%t) Spdp::data_received - %C discovered %C lease %ds\n"), 00357 OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str(), 00358 pdata.leaseDuration.seconds)); 00359 } 00360 00361 // add a new participant 00362 participants_[guid] = DiscoveredParticipant(pdata, now); 00363 DiscoveredParticipant& dp = participants_[guid]; 00364 00365 #if defined(OPENDDS_SECURITY) 00366 if (is_security_enabled()) { 00367 // Associate the stateless reader / writer for handshakes & auth requests 00368 sedp_.associate_preauth(dp.pdata_); 00369 00370 // If we've gotten auth requests for this (previously undiscovered) participant, pull in the tokens now 00371 PendingRemoteAuthTokenMap::iterator token_iter = 
pending_remote_auth_tokens_.find(guid); 00372 if (token_iter != pending_remote_auth_tokens_.end()) { 00373 dp.remote_auth_request_token_ = token_iter->second; 00374 pending_remote_auth_tokens_.erase(token_iter); 00375 } 00376 } 00377 #endif 00378 00379 // Since we've just seen a new participant, let's send out our 00380 // own announcement, so they don't have to wait. 00381 this->tport_->write_i(); 00382 00383 #if defined(OPENDDS_SECURITY) 00384 if (is_security_enabled()) { 00385 bool has_security_data = dp.pdata_.dataKind == Security::DPDK_ENHANCED || 00386 dp.pdata_.dataKind == Security::DPDK_SECURE; 00387 00388 if (has_security_data == false) { 00389 if (participant_sec_attr_.allow_unauthenticated_participants == false) { 00390 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::data_received - ") 00391 ACE_TEXT("Incompatible security attributes in discovered participant: %C\n"), 00392 std::string(DCPS::GuidConverter(guid)).c_str())); 00393 participants_.erase(guid); 00394 } else { // allow_unauthenticated_participants == true 00395 dp.auth_state_ = DCPS::AS_UNAUTHENTICATED; 00396 match_unauthenticated(guid, dp); 00397 } 00398 } else { // has_security_data == true 00399 dp.identity_token_ = pdata.ddsParticipantDataSecure.base.identity_token; 00400 dp.permissions_token_ = pdata.ddsParticipantDataSecure.base.permissions_token; 00401 dp.property_qos_ = pdata.ddsParticipantDataSecure.base.property; 00402 dp.security_info_ = pdata.ddsParticipantDataSecure.base.security_info; 00403 00404 attempt_authentication(guid, dp); 00405 if (dp.auth_state_ == DCPS::AS_UNAUTHENTICATED) { 00406 if (participant_sec_attr_.allow_unauthenticated_participants == false) { 00407 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::data_received - ") 00408 ACE_TEXT("Incompatible security attributes in discovered participant: %C\n"), 00409 std::string(DCPS::GuidConverter(guid)).c_str())); 00410 participants_.erase(guid); 00411 } else { // allow_unauthenticated_participants == true 00412 
dp.auth_state_ = DCPS::AS_UNAUTHENTICATED; 00413 match_unauthenticated(guid, dp); 00414 } 00415 } else if (dp.auth_state_ == DCPS::AS_AUTHENTICATED) { 00416 if (match_authenticated(guid, dp) == false) { 00417 participants_.erase(guid); 00418 } 00419 } 00420 // otherwise just return, since we're waiting for input to finish authentication 00421 } 00422 } else { 00423 00424 dp.auth_state_ = DCPS::AS_UNAUTHENTICATED; 00425 match_unauthenticated(guid, dp); 00426 00427 } 00428 #else 00429 match_unauthenticated(guid, dp); 00430 #endif 00431 00432 } else { 00433 00434 #if defined(OPENDDS_SECURITY) 00435 // Non-secure updates for authenticated participants are used for liveliness but 00436 // are otherwise ignored. Non-secure dispose messages are ignored completely. 00437 if (iter->second.auth_state_ == DCPS::AS_AUTHENTICATED && 00438 pdata.dataKind != Security::DPDK_SECURE && 00439 id != DCPS::DISPOSE_INSTANCE && 00440 id != DCPS::DISPOSE_UNREGISTER_INSTANCE) 00441 { 00442 iter->second.last_seen_ = now; 00443 return; 00444 } 00445 #endif 00446 00447 if (id == DCPS::DISPOSE_INSTANCE || id == DCPS::DISPOSE_UNREGISTER_INSTANCE) { 00448 remove_discovered_participant(iter); 00449 return; 00450 } 00451 00452 // Must unlock when calling into part_bit() as it may call back into us 00453 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_); 00454 00455 // update an existing participant 00456 pdata.ddsParticipantDataSecure.base.base.key = iter->second.pdata_.ddsParticipantDataSecure.base.base.key; 00457 #ifndef OPENDDS_SAFETY_PROFILE 00458 using DCPS::operator!=; 00459 #endif 00460 if (iter->second.pdata_.ddsParticipantDataSecure.base.base.user_data != 00461 pdata.ddsParticipantDataSecure.base.base.user_data) { 00462 iter->second.pdata_.ddsParticipantDataSecure.base.base.user_data = 00463 pdata.ddsParticipantDataSecure.base.base.user_data; 00464 #ifndef DDS_HAS_MINIMUM_BIT 00465 DCPS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit(); 00466 if (bit) { 00467 
ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock); 00468 bit->store_synthetic_data(pdata.ddsParticipantDataSecure.base.base, 00469 DDS::NOT_NEW_VIEW_STATE); 00470 } 00471 #endif /* DDS_HAS_MINIMUM_BIT */ 00472 // Perform search again, so iterator becomes valid 00473 iter = participants_.find(guid); 00474 } 00475 // Participant may have been removed while lock released 00476 if (iter != participants_.end()) { 00477 iter->second.pdata_ = pdata; 00478 iter->second.last_seen_ = now; 00479 } 00480 } 00481 }
bool OpenDDS::RTPS::Spdp::has_discovered_participant(const DCPS::RepoId& guid)
Definition at line 1784 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< Sedp >::participants_.
Referenced by OpenDDS::RTPS::Sedp::data_received(), and OpenDDS::RTPS::Sedp::disassociate().
01785 { 01786 return participants_.find(guid) != participants_.end(); 01787 }
void OpenDDS::RTPS::Spdp::init(DDS::DomainId_t domain, DCPS::RepoId& guid, const DDS::DomainParticipantQos& qos, RtpsDiscovery* disco) [private]
Definition at line 112 of file Spdp.cpp.
References OpenDDS::DCPS::Locator_t::address, OpenDDS::RTPS::address_to_bytes(), OpenDDS::RTPS::address_to_kind(), domain_, ACE_INET_Addr::get_port_number(), guid_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignore(), OpenDDS::RTPS::Sedp::init(), OpenDDS::DCPS::Locator_t::kind, OpenDDS::RTPS::Sedp::multicast_group(), sedp_, OpenDDS::RTPS::RtpsDiscovery::sedp_multicast(), sedp_multicast_, sedp_unicast_, and OpenDDS::RTPS::Sedp::unicast_locators().
00116 { 00117 guid = guid_; // may have changed in SpdpTransport constructor 00118 sedp_.ignore(guid); 00119 sedp_.init(guid_, *disco, domain_); 00120 00121 // Append metatraffic unicast locator 00122 sedp_.unicast_locators(sedp_unicast_); 00123 00124 if (disco->sedp_multicast()) { // Append metatraffic multicast locator 00125 const ACE_INET_Addr& mc_addr = sedp_.multicast_group(); 00126 DCPS::Locator_t mc_locator; 00127 mc_locator.kind = address_to_kind(mc_addr); 00128 mc_locator.port = mc_addr.get_port_number(); 00129 address_to_bytes(mc_locator.address, mc_addr); 00130 sedp_multicast_.length(1); 00131 sedp_multicast_[0] = mc_locator; 00132 } 00133 }
void OpenDDS::RTPS::Spdp::init_bit(const DDS::Subscriber_var& bit_subscriber)
Definition at line 1102 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< Sedp >::bit_subscriber_, OpenDDS::RTPS::Spdp::SpdpTransport::open(), and tport_.
01103 { 01104 bit_subscriber_ = bit_subscriber; 01105 tport_->open(); 01106 }
bool OpenDDS::RTPS::Spdp::is_opendds(const GUID_t& participant) const
Definition at line 1147 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< Sedp >::participants_, and OpenDDS::DCPS::VENDORID_OCI.
Referenced by OpenDDS::RTPS::Sedp::is_opendds().
01148 { 01149 const DiscoveredParticipantConstIter iter = participants_.find(participant); 01150 if (iter == participants_.end()) { 01151 return false; 01152 } 01153 return 0 == std::memcmp(&iter->second.pdata_.participantProxy.vendorId, 01154 DCPS::VENDORID_OCI, sizeof(VendorId_t)); 01155 }
void OpenDDS::RTPS::Spdp::match_unauthenticated(const DCPS::RepoId& guid, DiscoveredParticipant& dp) [private]
Definition at line 502 of file Spdp.cpp.
References OpenDDS::RTPS::Sedp::associate(), DDS::HANDLE_NIL, OpenDDS::DCPS::LocalParticipant< Sedp >::lock_, DDS::NEW_VIEW_STATE, part_bit(), OpenDDS::DCPS::LocalParticipant< Sedp >::participants_, sedp_, and OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data().
Referenced by handle_participant_data().
00503 { 00504 // Must unlock when calling into part_bit() as it may call back into us 00505 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_); 00506 00507 DDS::InstanceHandle_t bit_instance_handle = DDS::HANDLE_NIL; 00508 #ifndef DDS_HAS_MINIMUM_BIT 00509 DCPS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit(); 00510 if (bit) { 00511 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock); 00512 bit_instance_handle = 00513 bit->store_synthetic_data(dp.pdata_.ddsParticipantDataSecure.base.base, 00514 DDS::NEW_VIEW_STATE); 00515 } 00516 #endif /* DDS_HAS_MINIMUM_BIT */ 00517 00518 // notify Sedp of association 00519 // Sedp may call has_discovered_participant, which is why the participant must be added before this call to associate. 00520 sedp_.associate(dp.pdata_); 00521 00522 // Iterator is no longer valid 00523 DiscoveredParticipantIter iter = participants_.find(guid); 00524 if (iter != participants_.end()) { 00525 iter->second.bit_ih_ = bit_instance_handle; 00526 } 00527 }
DCPS::ParticipantBuiltinTopicDataDataReaderImpl* OpenDDS::RTPS::Spdp::part_bit() [private]
Reimplemented from OpenDDS::DCPS::LocalParticipant< Sedp >.
Definition at line 1123 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< Sedp >::bit_subscriber_, and OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC.
Referenced by handle_participant_data(), and match_unauthenticated().
01124 { 01125 if (!bit_subscriber_.in()) 01126 return 0; 01127 01128 DDS::DataReader_var d = 01129 bit_subscriber_->lookup_datareader(DCPS::BUILT_IN_PARTICIPANT_TOPIC); 01130 return dynamic_cast<DCPS::ParticipantBuiltinTopicDataDataReaderImpl*>(d.in()); 01131 }
ACE_Reactor* OpenDDS::RTPS::Spdp::reactor(void) const [private]
Definition at line 1135 of file Spdp.cpp.
References disco_, and OpenDDS::DCPS::PeerDiscovery< Participant >::reactor().
Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::acknowledge(), OpenDDS::RTPS::Spdp::SpdpTransport::close(), and OpenDDS::RTPS::Spdp::SpdpTransport::open().
01136 { 01137 return disco_->reactor(); 01138 }
void OpenDDS::RTPS::Spdp::remove_expired_participants() [private]
Definition at line 1071 of file Spdp.cpp.
References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, get_discovered_participant_ids(), ACE_OS::gettimeofday(), LM_WARNING, OpenDDS::DCPS::LocalParticipant< Sedp >::lock_, OPENDDS_STRING, OpenDDS::DCPS::LocalParticipant< Sedp >::participants_, and OpenDDS::DCPS::LocalParticipant< Sedp >::remove_discovered_participant().
Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::handle_timeout().
01072 { 01073 // Find and remove any expired discovered participant 01074 ACE_GUARD (ACE_Thread_Mutex, g, lock_); 01075 // Iterate through a copy of the repo Ids, rather than the map 01076 // as it gets unlocked in remove_discovered_participant() 01077 DCPS::RepoIdSet participant_ids; 01078 get_discovered_participant_ids(participant_ids); 01079 for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin(); 01080 participant_id != participant_ids.end(); 01081 ++participant_id) 01082 { 01083 DiscoveredParticipantIter part = participants_.find(*participant_id); 01084 if (part != participants_.end()) { 01085 if (part->second.last_seen_ < 01086 ACE_OS::gettimeofday() - 01087 ACE_Time_Value(part->second.pdata_.leaseDuration.seconds)) { 01088 if (DCPS::DCPS_debug_level > 1) { 01089 DCPS::GuidConverter conv(part->first); 01090 ACE_DEBUG((LM_WARNING, 01091 ACE_TEXT("(%P|%t) Spdp::remove_expired_participants() - ") 01092 ACE_TEXT("participant %C exceeded lease duration, removing\n"), 01093 OPENDDS_STRING(conv).c_str())); 01094 } 01095 remove_discovered_participant(part); 01096 } 01097 } 01098 } 01099 }
bool OpenDDS::RTPS::Spdp::shutting_down(void) [inline]
Definition at line 80 of file Spdp.h.
Referenced by OpenDDS::RTPS::Sedp::data_received(), OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::RTPS::Sedp::remove_entities_belonging_to(), OpenDDS::RTPS::Sedp::shutting_down(), and OpenDDS::RTPS::Sedp::Task::svc_i().
00080 { return shutdown_flag_.value(); }
void OpenDDS::RTPS::Spdp::signal_liveliness(DDS::LivelinessQosPolicyKind kind)
Definition at line 1690 of file Spdp.cpp.
References sedp_, and OpenDDS::RTPS::Sedp::signal_liveliness().
01691 { 01692 sedp_.signal_liveliness(kind); 01693 }
WaitForAcks& OpenDDS::RTPS::Spdp::wait_for_acks()
Definition at line 1141 of file Spdp.cpp.
References wait_for_acks_.
Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::handle_exception(), and OpenDDS::RTPS::Sedp::Task::svc().
01142 { 01143 return wait_for_acks_; 01144 }
RtpsDiscovery* OpenDDS::RTPS::Spdp::disco_ [private]
Definition at line 132 of file Spdp.h.
Referenced by build_local_pdata(), OpenDDS::RTPS::Spdp::SpdpTransport::open(), OpenDDS::RTPS::Spdp::SpdpTransport::open_unicast_socket(), reactor(), and OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport().
const DDS::DomainId_t OpenDDS::RTPS::Spdp::domain_ [private]
Definition at line 135 of file Spdp.h.
Referenced by init(), and OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport().
bool OpenDDS::RTPS::Spdp::eh_shutdown_ [private]
Definition at line 182 of file Spdp.h.
Referenced by ~Spdp(), and OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport().
DCPS::RepoId OpenDDS::RTPS::Spdp::guid_ [private]
Definition at line 136 of file Spdp.h.
Referenced by build_local_pdata(), OpenDDS::RTPS::Spdp::SpdpTransport::dispose_unregister(), handle_participant_data(), init(), and OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport().
Sedp OpenDDS::RTPS::Spdp::sedp_ [private]
Definition at line 189 of file Spdp.h.
Referenced by fini_bit(), handle_participant_data(), init(), match_unauthenticated(), signal_liveliness(), and ~Spdp().
OpenDDS::RTPS::Spdp::sedp_multicast_
Definition at line 137 of file Spdp.h.
Referenced by build_local_pdata(), and init().
OpenDDS::RTPS::Spdp::sedp_unicast_
Definition at line 137 of file Spdp.h.
Referenced by build_local_pdata(), and init().
OpenDDS::RTPS::Spdp::shutdown_cond_
Definition at line 183 of file Spdp.h.
Referenced by ~Spdp(), and OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport().
ACE_Atomic_Op<ACE_Thread_Mutex, bool> OpenDDS::RTPS::Spdp::shutdown_flag_ [private]
Definition at line 184 of file Spdp.h.
Referenced by data_received(), and ~Spdp().
OpenDDS::RTPS::Spdp::tport_
Referenced by fini_bit(), handle_participant_data(), init_bit(), and ~Spdp().
OpenDDS::RTPS::Spdp::wait_for_acks_
Definition at line 192 of file Spdp.h.
Referenced by fini_bit(), and wait_for_acks().