OpenDDS::RTPS::Spdp Class Reference

#include <Spdp.h>

Inheritance diagram for OpenDDS::RTPS::Spdp:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::RTPS::Spdp:
Collaboration graph
[legend]

List of all members.

Classes

struct  SpdpTransport

Public Member Functions

 Spdp (DDS::DomainId_t domain, DCPS::RepoId &guid, const DDS::DomainParticipantQos &qos, RtpsDiscovery *disco)
 ~Spdp ()
const DCPS::RepoIdguid () const
void init_bit (const DDS::Subscriber_var &bit_subscriber)
void fini_bit ()
bool get_default_locators (const DCPS::RepoId &part_id, DCPS::LocatorSeq &target, bool &inlineQos)
void signal_liveliness (DDS::LivelinessQosPolicyKind kind)
bool shutting_down ()
bool associated () const
bool has_discovered_participant (const DCPS::RepoId &guid)
WaitForAckswait_for_acks ()
void handle_participant_data (DCPS::MessageId id, const Security::SPDPdiscoveredParticipantData &pdata)
bool is_opendds (const GUID_t &participant) const

Protected Member Functions

Sedpendpoint_manager ()
Security::SPDPdiscoveredParticipantData build_local_pdata (Security::DiscoveredParticipantDataKind)
bool announce_domain_participant_qos ()

Private Member Functions

void init (DDS::DomainId_t domain, DCPS::RepoId &guid, const DDS::DomainParticipantQos &qos, RtpsDiscovery *disco)
ACE_Reactorreactor () const
void data_received (const DataSubmessage &data, const ParameterList &plist)
void match_unauthenticated (const DCPS::RepoId &guid, DiscoveredParticipant &dp)
DCPS::ParticipantBuiltinTopicDataDataReaderImplpart_bit ()
void remove_expired_participants ()
void get_discovered_participant_ids (DCPS::RepoIdSet &results) const

Private Attributes

RtpsDiscoverydisco_
const DDS::DomainId_t domain_
DCPS::RepoId guid_
DCPS::LocatorSeq sedp_unicast_
DCPS::LocatorSeq sedp_multicast_
OpenDDS::RTPS::Spdp::SpdpTransporttport_
ACE_Event_Handler_var eh_
bool eh_shutdown_
ACE_Condition_Thread_Mutex shutdown_cond_
ACE_Atomic_Op
< ACE_Thread_Mutex, bool > 
shutdown_flag_
Sedp sedp_
WaitForAcks wait_for_acks_

Detailed Description

Each instance of class Spdp represents the implementation of the RTPS Simple Participant Discovery Protocol for a single local DomainParticipant.

Definition at line 47 of file Spdp.h.


Constructor & Destructor Documentation

OpenDDS::RTPS::Spdp::Spdp ( DDS::DomainId_t  domain,
DCPS::RepoId guid,
const DDS::DomainParticipantQos qos,
RtpsDiscovery disco 
)
OpenDDS::RTPS::Spdp::~Spdp (  ) 

Definition at line 251 of file Spdp.cpp.

References ACE_TEXT(), OpenDDS::RTPS::Spdp::SpdpTransport::close(), OpenDDS::DCPS::DCPS_debug_level, eh_, eh_shutdown_, get_discovered_participant_ids(), LM_INFO, OpenDDS::DCPS::LocalParticipant< Sedp >::lock_, OpenDDS::DCPS::LocalParticipant< Sedp >::participants_, OpenDDS::DCPS::LocalParticipant< Sedp >::remove_discovered_participant(), ACE_Event_Handler_var::reset(), sedp_, OpenDDS::RTPS::Sedp::shutdown(), shutdown_cond_, shutdown_flag_, tport_, and ACE_Condition< ACE_Thread_Mutex >::wait().

00252 {
00253   shutdown_flag_ = true;
00254   {
00255     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00256     if (DCPS::DCPS_debug_level > 3) {
00257       ACE_DEBUG((LM_INFO,
00258                  ACE_TEXT("(%P|%t) Spdp::~Spdp ")
00259                  ACE_TEXT("remove discovered participants\n")));
00260     }
00261 
00262 #if defined(OPENDDS_SECURITY)
00263     write_secure_disposes();
00264 #endif
00265 
00266     // Iterate through a copy of the repo Ids, rather than the map
00267     //   as it gets unlocked in remove_discovered_participant()
00268     DCPS::RepoIdSet participant_ids;
00269     get_discovered_participant_ids(participant_ids);
00270     for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin();
00271          participant_id != participant_ids.end();
00272          ++participant_id)
00273     {
00274       DiscoveredParticipantIter part = participants_.find(*participant_id);
00275       if (part != participants_.end()) {
00276         remove_discovered_participant(part);
00277       }
00278     }
00279   }
00280 
00281   // ensure sedp's task queue is drained before data members are being
00282   // deleted
00283   sedp_.shutdown();
00284 
00285   // release lock for reset of event handler, which may delete transport
00286   tport_->close();
00287   eh_.reset();
00288   {
00289     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00290     while (!eh_shutdown_) {
00291       shutdown_cond_.wait();
00292     }
00293   }
00294 }

Here is the call graph for this function:


Member Function Documentation

bool OpenDDS::RTPS::Spdp::announce_domain_participant_qos (  )  [protected, virtual]

Reimplemented from OpenDDS::DCPS::LocalParticipant< Sedp >.

Definition at line 1265 of file Spdp.cpp.

01266 {
01267 
01268 #if defined(OPENDDS_SECURITY)
01269   if (is_security_enabled())
01270     write_secure_updates();
01271 #endif
01272 
01273   return true;
01274 }

bool OpenDDS::RTPS::Spdp::associated (  )  const

Definition at line 1778 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< Sedp >::participants_.

01779 {
01780   return !participants_.empty();
01781 }

Security::SPDPdiscoveredParticipantData OpenDDS::RTPS::Spdp::build_local_pdata ( Security::DiscoveredParticipantDataKind  kind  )  [protected]

Definition at line 1158 of file Spdp.cpp.

References OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER, DDS::Security::BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER, DDS::Security::BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER, DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER, DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER, DDS::Security::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER, DDS::Security::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR, disco_, guid_, OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::RTPS::LOCATOR_KIND_UDPv4, DDS::DomainParticipantQos::property, OpenDDS::RTPS::PROTOCOLVERSION, OpenDDS::DCPS::LocalParticipant< Sedp >::qos_, OpenDDS::RTPS::RtpsDiscovery::resend_period(), OpenDDS::RTPS::security_attributes_to_bitmask(), DDS::Security::SEDP_BUILTIN_PUBLICATIONS_SECURE_READER, DDS::Security::SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER, DDS::Security::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER, DDS::Security::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER, sedp_multicast_, sedp_unicast_, DDS::Security::SPDP_BUILTIN_PARTICIPANT_SECURE_READER, DDS::Security::SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER, DDS::DomainParticipantQos::user_data, and OpenDDS::RTPS::VENDORID_OPENDDS.

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::write_i().

01159 {
01160   BuiltinEndpointSet_t availableBuiltinEndpoints =
01161     DISC_BUILTIN_ENDPOINT_PARTICIPANT_ANNOUNCER |
01162     DISC_BUILTIN_ENDPOINT_PARTICIPANT_DETECTOR |
01163     DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER |
01164     DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR |
01165     DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER |
01166     DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR |
01167     BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER |
01168     BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER
01169     ;
01170 
01171 #if defined(OPENDDS_SECURITY)
01172   if (is_security_enabled()) {
01173     availableBuiltinEndpoints |=
01174       DDS::Security::SEDP_BUILTIN_PUBLICATIONS_SECURE_WRITER |
01175       DDS::Security::SEDP_BUILTIN_PUBLICATIONS_SECURE_READER |
01176       DDS::Security::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_WRITER |
01177       DDS::Security::SEDP_BUILTIN_SUBSCRIPTIONS_SECURE_READER |
01178       DDS::Security::BUILTIN_PARTICIPANT_MESSAGE_SECURE_WRITER |
01179       DDS::Security::BUILTIN_PARTICIPANT_MESSAGE_SECURE_READER |
01180       DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_WRITER |
01181       DDS::Security::BUILTIN_PARTICIPANT_STATELESS_MESSAGE_READER |
01182       DDS::Security::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_WRITER |
01183       DDS::Security::BUILTIN_PARTICIPANT_VOLATILE_MESSAGE_SECURE_READER |
01184       DDS::Security::SPDP_BUILTIN_PARTICIPANT_SECURE_WRITER |
01185       DDS::Security::SPDP_BUILTIN_PARTICIPANT_SECURE_READER
01186     ;
01187   }
01188 #endif
01189 
01190   // The RTPS spec has no constants for the builtinTopics{Writer,Reader}
01191 
01192   // This locator list should not be empty, but we won't actually be using it.
01193   // The OpenDDS publication/subscription data will have locators included.
01194   DCPS::LocatorSeq nonEmptyList(1);
01195   nonEmptyList.length(1);
01196   nonEmptyList[0].kind = LOCATOR_KIND_UDPv4;
01197   nonEmptyList[0].port = 12345;
01198   std::memset(nonEmptyList[0].address, 0, 12);
01199   nonEmptyList[0].address[12] = 127;
01200   nonEmptyList[0].address[13] = 0;
01201   nonEmptyList[0].address[14] = 0;
01202   nonEmptyList[0].address[15] = 1;
01203 
01204   const GuidPrefix_t& gp = guid_.guidPrefix;
01205 
01206   const Security::SPDPdiscoveredParticipantData pdata = {
01207     kind,
01208     { // ParticipantBuiltinTopicDataSecure
01209       { // ParticipantBuiltinTopicData (security enhanced)
01210         { // ParticipantBuiltinTopicData (original)
01211           DDS::BuiltinTopicKey_t() /*ignored*/,
01212           qos_.user_data
01213         },
01214 
01215 #if defined(OPENDDS_SECURITY)
01216         identity_token_,
01217         permissions_token_,
01218 #else
01219         DDS::Security::Token(),
01220         DDS::Security::Token(),
01221 #endif
01222 
01223         qos_.property,
01224 
01225 #if defined(OPENDDS_SECURITY)
01226         {
01227           security_attributes_to_bitmask(participant_sec_attr_),
01228           participant_sec_attr_.plugin_participant_attributes
01229         }
01230 #else
01231         DDS::Security::ParticipantSecurityInfo()
01232 #endif
01233 
01234       },
01235 
01236 #if defined(OPENDDS_SECURITY)
01237       identity_status_token_
01238 #else
01239       DDS::Security::Token()
01240 #endif
01241 
01242     },
01243     { // ParticipantProxy_t
01244       PROTOCOLVERSION,
01245       {gp[0], gp[1], gp[2], gp[3], gp[4], gp[5],
01246        gp[6], gp[7], gp[8], gp[9], gp[10], gp[11]},
01247       VENDORID_OPENDDS,
01248       false /*expectsIQoS*/,
01249       availableBuiltinEndpoints,
01250       sedp_unicast_,
01251       sedp_multicast_,
01252       nonEmptyList /*defaultMulticastLocatorList*/,
01253       nonEmptyList /*defaultUnicastLocatorList*/,
01254       {0 /*manualLivelinessCount*/}   //FUTURE: implement manual liveliness
01255     },
01256     { // Duration_t (leaseDuration)
01257       static_cast<CORBA::Long>((disco_->resend_period() * LEASE_MULT).sec()),
01258       0 // we are not supporting fractional seconds in the lease duration
01259     }
01260   };
01261 
01262   return pdata;
01263 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::RTPS::Spdp::data_received ( const DataSubmessage data,
const ParameterList plist 
) [private]

Definition at line 484 of file Spdp.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::RTPS::ParameterListConverter::from_param_list(), handle_participant_data(), OpenDDS::RTPS::DataSubmessage::inlineQos, LM_ERROR, OpenDDS::DCPS::SAMPLE_DATA, shutdown_flag_, and ACE_Atomic_Op_GCC< T >::value().

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::handle_input().

00485 {
00486   if (shutdown_flag_.value()) { return; }
00487 
00488   Security::SPDPdiscoveredParticipantData pdata;
00489   if (ParameterListConverter::from_param_list(plist, pdata) < 0) {
00490     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::data_received - ")
00491       ACE_TEXT("failed to convert from ParameterList to ")
00492       ACE_TEXT("SPDPdiscoveredParticipantData\n")));
00493     return;
00494   }
00495 
00496   DCPS::MessageId msg_id = (data.inlineQos.length() && disposed(data.inlineQos)) ? DCPS::DISPOSE_INSTANCE : DCPS::SAMPLE_DATA;
00497 
00498   handle_participant_data(msg_id, pdata);
00499 }

Here is the call graph for this function:

Here is the caller graph for this function:

Sedp& OpenDDS::RTPS::Spdp::endpoint_manager (  )  [inline, protected, virtual]

Implements OpenDDS::DCPS::LocalParticipant< Sedp >.

Definition at line 119 of file Spdp.h.

00119 { return sedp_; }

void OpenDDS::RTPS::Spdp::fini_bit (  ) 

Definition at line 1109 of file Spdp.cpp.

References OpenDDS::RTPS::Sedp::acknowledge(), OpenDDS::RTPS::Spdp::SpdpTransport::acknowledge(), OpenDDS::DCPS::LocalParticipant< Sedp >::bit_subscriber_, OpenDDS::RTPS::WaitForAcks::reset(), sedp_, tport_, OpenDDS::RTPS::WaitForAcks::wait_for_acks(), and wait_for_acks_.

01110 {
01111   bit_subscriber_ = 0;
01112   wait_for_acks_.reset();
01113   // request for SpdpTransport(actually Reactor) thread and Sedp::Task
01114   // to acknowledge
01115   tport_->acknowledge();
01116   sedp_.acknowledge();
01117   // wait for the 2 acknowledgements
01118   wait_for_acks_.wait_for_acks(2);
01119 }

Here is the call graph for this function:

bool OpenDDS::RTPS::Spdp::get_default_locators ( const DCPS::RepoId part_id,
DCPS::LocatorSeq target,
bool &  inlineQos 
)
void OpenDDS::RTPS::Spdp::get_discovered_participant_ids ( DCPS::RepoIdSet &  results  )  const [private]

Definition at line 1791 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< Sedp >::participants_.

Referenced by remove_expired_participants(), and ~Spdp().

01792 {
01793   DiscoveredParticipantMap::const_iterator idx;
01794   for (idx = participants_.begin(); idx != participants_.end(); ++idx)
01795   {
01796     results.insert(idx->first);
01797   }
01798 }

Here is the caller graph for this function:

const DCPS::RepoId& OpenDDS::RTPS::Spdp::guid (  )  const [inline]

Definition at line 68 of file Spdp.h.

00068 { return guid_; }

void OpenDDS::RTPS::Spdp::handle_participant_data ( DCPS::MessageId  id,
const Security::SPDPdiscoveredParticipantData pdata 
)

Definition at line 322 of file Spdp.cpp.

References ACE_TEXT(), DDS::Security::ParticipantBuiltinTopicDataSecure::base, OpenDDS::Security::SPDPdiscoveredParticipantData::dataKind, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::Security::SPDPdiscoveredParticipantData::ddsParticipantDataSecure, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::Security::DPDK_ENHANCED, OpenDDS::Security::DPDK_SECURE, OpenDDS::DCPS::ENTITYID_PARTICIPANT, ACE_OS::gettimeofday(), guid_, OpenDDS::RTPS::ParticipantProxy_t::guidPrefix, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring(), OpenDDS::Security::SPDPdiscoveredParticipantData::leaseDuration, LM_DEBUG, OpenDDS::DCPS::LocalParticipant< Sedp >::lock_, match_unauthenticated(), DDS::NOT_NEW_VIEW_STATE, OPENDDS_STRING, part_bit(), OpenDDS::Security::SPDPdiscoveredParticipantData::participantProxy, OpenDDS::DCPS::LocalParticipant< Sedp >::participants_, OpenDDS::DCPS::LocalParticipant< Sedp >::remove_discovered_participant(), OpenDDS::RTPS::Time_t::seconds, sedp_, OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data(), tport_, and OpenDDS::RTPS::Spdp::SpdpTransport::write_i().

Referenced by data_received(), and OpenDDS::RTPS::Sedp::Task::svc_secure_i().

00323 {
00324   const ACE_Time_Value now = ACE_OS::gettimeofday();
00325 
00326   // Make a (non-const) copy so we can tweak values below
00327   Security::SPDPdiscoveredParticipantData pdata(cpdata);
00328 
00329   const DCPS::RepoId guid = make_guid(pdata.participantProxy.guidPrefix, DCPS::ENTITYID_PARTICIPANT);
00330 
00331   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00332   if (sedp_.ignoring(guid)) {
00333     // Ignore, this is our domain participant or one that the user has
00334     // asked us to ignore.
00335     return;
00336   }
00337 
00338   // Find the participant - iterator valid only as long as we hold the lock
00339   DiscoveredParticipantIter iter = participants_.find(guid);
00340 
00341   if (iter == participants_.end()) {
00342 
00343     // Trying to delete something that doesn't exist is a NOOP
00344     if (id == DCPS::DISPOSE_INSTANCE || id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
00345       return;
00346     }
00347 
00348     // copy guid prefix (octet[12]) into BIT key (long[3])
00349     std::memcpy(pdata.ddsParticipantDataSecure.base.base.key.value,
00350                 pdata.participantProxy.guidPrefix,
00351                 sizeof(pdata.ddsParticipantDataSecure.base.base.key.value));
00352 
00353     if (DCPS::DCPS_debug_level) {
00354       DCPS::GuidConverter local(guid_), remote(guid);
00355       ACE_DEBUG((LM_DEBUG,
00356         ACE_TEXT("(%P|%t) Spdp::data_received - %C discovered %C lease %ds\n"),
00357         OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str(),
00358         pdata.leaseDuration.seconds));
00359     }
00360 
00361     // add a new participant
00362     participants_[guid] = DiscoveredParticipant(pdata, now);
00363     DiscoveredParticipant& dp = participants_[guid];
00364 
00365 #if defined(OPENDDS_SECURITY)
00366     if (is_security_enabled()) {
00367       // Associate the stateless reader / writer for handshakes & auth requests
00368       sedp_.associate_preauth(dp.pdata_);
00369 
00370       // If we've gotten auth requests for this (previously undiscovered) participant, pull in the tokens now
00371       PendingRemoteAuthTokenMap::iterator token_iter = pending_remote_auth_tokens_.find(guid);
00372       if (token_iter != pending_remote_auth_tokens_.end()) {
00373         dp.remote_auth_request_token_ = token_iter->second;
00374         pending_remote_auth_tokens_.erase(token_iter);
00375       }
00376     }
00377 #endif
00378 
00379     // Since we've just seen a new participant, let's send out our
00380     // own announcement, so they don't have to wait.
00381     this->tport_->write_i();
00382 
00383 #if defined(OPENDDS_SECURITY)
00384     if (is_security_enabled()) {
00385       bool has_security_data = dp.pdata_.dataKind == Security::DPDK_ENHANCED ||
00386                                dp.pdata_.dataKind == Security::DPDK_SECURE;
00387 
00388       if (has_security_data == false) {
00389         if (participant_sec_attr_.allow_unauthenticated_participants == false) {
00390           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::data_received - ")
00391             ACE_TEXT("Incompatible security attributes in discovered participant: %C\n"),
00392             std::string(DCPS::GuidConverter(guid)).c_str()));
00393             participants_.erase(guid);
00394         } else { // allow_unauthenticated_participants == true
00395           dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00396           match_unauthenticated(guid, dp);
00397         }
00398       } else { // has_security_data == true
00399         dp.identity_token_ = pdata.ddsParticipantDataSecure.base.identity_token;
00400         dp.permissions_token_ = pdata.ddsParticipantDataSecure.base.permissions_token;
00401         dp.property_qos_ = pdata.ddsParticipantDataSecure.base.property;
00402         dp.security_info_ = pdata.ddsParticipantDataSecure.base.security_info;
00403 
00404         attempt_authentication(guid, dp);
00405         if (dp.auth_state_ == DCPS::AS_UNAUTHENTICATED) {
00406           if (participant_sec_attr_.allow_unauthenticated_participants == false) {
00407             ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DEBUG: Spdp::data_received - ")
00408               ACE_TEXT("Incompatible security attributes in discovered participant: %C\n"),
00409               std::string(DCPS::GuidConverter(guid)).c_str()));
00410             participants_.erase(guid);
00411           } else { // allow_unauthenticated_participants == true
00412             dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00413             match_unauthenticated(guid, dp);
00414           }
00415         } else if (dp.auth_state_ == DCPS::AS_AUTHENTICATED) {
00416           if (match_authenticated(guid, dp) == false) {
00417             participants_.erase(guid);
00418           }
00419         }
00420         // otherwise just return, since we're waiting for input to finish authentication
00421       }
00422     } else {
00423 
00424       dp.auth_state_ = DCPS::AS_UNAUTHENTICATED;
00425       match_unauthenticated(guid, dp);
00426 
00427     }
00428 #else
00429     match_unauthenticated(guid, dp);
00430 #endif
00431 
00432   } else {
00433 
00434 #if defined(OPENDDS_SECURITY)
00435     // Non-secure updates for authenticated participants are used for liveliness but
00436     // are otherwise ignored. Non-secure dispose messages are ignored completely.
00437     if (iter->second.auth_state_ == DCPS::AS_AUTHENTICATED &&
00438         pdata.dataKind != Security::DPDK_SECURE &&
00439         id != DCPS::DISPOSE_INSTANCE &&
00440         id != DCPS::DISPOSE_UNREGISTER_INSTANCE)
00441     {
00442       iter->second.last_seen_ = now;
00443       return;
00444     }
00445 #endif
00446 
00447     if (id == DCPS::DISPOSE_INSTANCE || id == DCPS::DISPOSE_UNREGISTER_INSTANCE) {
00448       remove_discovered_participant(iter);
00449       return;
00450     }
00451 
00452     // Must unlock when calling into part_bit() as it may call back into us
00453     ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00454 
00455     // update an existing participant
00456     pdata.ddsParticipantDataSecure.base.base.key = iter->second.pdata_.ddsParticipantDataSecure.base.base.key;
00457 #ifndef OPENDDS_SAFETY_PROFILE
00458     using DCPS::operator!=;
00459 #endif
00460     if (iter->second.pdata_.ddsParticipantDataSecure.base.base.user_data !=
00461         pdata.ddsParticipantDataSecure.base.base.user_data) {
00462       iter->second.pdata_.ddsParticipantDataSecure.base.base.user_data =
00463         pdata.ddsParticipantDataSecure.base.base.user_data;
00464 #ifndef DDS_HAS_MINIMUM_BIT
00465       DCPS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00466       if (bit) {
00467         ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00468         bit->store_synthetic_data(pdata.ddsParticipantDataSecure.base.base,
00469                                   DDS::NOT_NEW_VIEW_STATE);
00470       }
00471 #endif /* DDS_HAS_MINIMUM_BIT */
00472       // Perform search again, so iterator becomes valid
00473       iter = participants_.find(guid);
00474     }
00475     // Participant may have been removed while lock released
00476     if (iter != participants_.end()) {
00477       iter->second.pdata_ = pdata;
00478       iter->second.last_seen_ = now;
00479     }
00480   }
00481 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::RTPS::Spdp::has_discovered_participant ( const DCPS::RepoId guid  ) 

Definition at line 1784 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< Sedp >::participants_.

Referenced by OpenDDS::RTPS::Sedp::data_received(), and OpenDDS::RTPS::Sedp::disassociate().

01785 {
01786   return participants_.find(guid) != participants_.end();
01787 }

Here is the caller graph for this function:

void OpenDDS::RTPS::Spdp::init ( DDS::DomainId_t  domain,
DCPS::RepoId guid,
const DDS::DomainParticipantQos qos,
RtpsDiscovery disco 
) [private]

Definition at line 112 of file Spdp.cpp.

References OpenDDS::DCPS::Locator_t::address, OpenDDS::RTPS::address_to_bytes(), OpenDDS::RTPS::address_to_kind(), domain_, ACE_INET_Addr::get_port_number(), guid_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignore(), OpenDDS::RTPS::Sedp::init(), OpenDDS::DCPS::Locator_t::kind, OpenDDS::RTPS::Sedp::multicast_group(), sedp_, OpenDDS::RTPS::RtpsDiscovery::sedp_multicast(), sedp_multicast_, sedp_unicast_, and OpenDDS::RTPS::Sedp::unicast_locators().

00116 {
00117   guid = guid_; // may have changed in SpdpTransport constructor
00118   sedp_.ignore(guid);
00119   sedp_.init(guid_, *disco, domain_);
00120 
00121   // Append metatraffic unicast locator
00122   sedp_.unicast_locators(sedp_unicast_);
00123 
00124   if (disco->sedp_multicast()) { // Append metatraffic multicast locator
00125     const ACE_INET_Addr& mc_addr = sedp_.multicast_group();
00126     DCPS::Locator_t mc_locator;
00127     mc_locator.kind = address_to_kind(mc_addr);
00128     mc_locator.port = mc_addr.get_port_number();
00129     address_to_bytes(mc_locator.address, mc_addr);
00130     sedp_multicast_.length(1);
00131     sedp_multicast_[0] = mc_locator;
00132   }
00133 }

Here is the call graph for this function:

void OpenDDS::RTPS::Spdp::init_bit ( const DDS::Subscriber_var &  bit_subscriber  ) 

Definition at line 1102 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< Sedp >::bit_subscriber_, OpenDDS::RTPS::Spdp::SpdpTransport::open(), and tport_.

01103 {
01104   bit_subscriber_ = bit_subscriber;
01105   tport_->open();
01106 }

Here is the call graph for this function:

bool OpenDDS::RTPS::Spdp::is_opendds ( const GUID_t &  participant  )  const

Definition at line 1147 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< Sedp >::participants_, and OpenDDS::DCPS::VENDORID_OCI.

Referenced by OpenDDS::RTPS::Sedp::is_opendds().

01148 {
01149   const DiscoveredParticipantConstIter iter = participants_.find(participant);
01150   if (iter == participants_.end()) {
01151     return false;
01152   }
01153   return 0 == std::memcmp(&iter->second.pdata_.participantProxy.vendorId,
01154                           DCPS::VENDORID_OCI, sizeof(VendorId_t));
01155 }

Here is the caller graph for this function:

void OpenDDS::RTPS::Spdp::match_unauthenticated ( const DCPS::RepoId guid,
DiscoveredParticipant &  dp 
) [private]

Definition at line 502 of file Spdp.cpp.

References OpenDDS::RTPS::Sedp::associate(), DDS::HANDLE_NIL, OpenDDS::DCPS::LocalParticipant< Sedp >::lock_, DDS::NEW_VIEW_STATE, part_bit(), OpenDDS::DCPS::LocalParticipant< Sedp >::participants_, sedp_, and OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data().

Referenced by handle_participant_data().

00503 {
00504   // Must unlock when calling into part_bit() as it may call back into us
00505   ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00506 
00507   DDS::InstanceHandle_t bit_instance_handle = DDS::HANDLE_NIL;
00508 #ifndef DDS_HAS_MINIMUM_BIT
00509   DCPS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00510   if (bit) {
00511     ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00512     bit_instance_handle =
00513       bit->store_synthetic_data(dp.pdata_.ddsParticipantDataSecure.base.base,
00514                                 DDS::NEW_VIEW_STATE);
00515   }
00516 #endif /* DDS_HAS_MINIMUM_BIT */
00517 
00518   // notify Sedp of association
00519   // Sedp may call has_discovered_participant, which is why the participant must be added before this call to associate.
00520   sedp_.associate(dp.pdata_);
00521 
00522   // Iterator is no longer valid
00523   DiscoveredParticipantIter iter = participants_.find(guid);
00524   if (iter != participants_.end()) {
00525     iter->second.bit_ih_ = bit_instance_handle;
00526   }
00527 }

Here is the call graph for this function:

Here is the caller graph for this function:

DCPS::ParticipantBuiltinTopicDataDataReaderImpl * OpenDDS::RTPS::Spdp::part_bit (  )  [private]

Reimplemented from OpenDDS::DCPS::LocalParticipant< Sedp >.

Definition at line 1123 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< Sedp >::bit_subscriber_, and OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC.

Referenced by handle_participant_data(), and match_unauthenticated().

01124 {
01125   if (!bit_subscriber_.in())
01126     return 0;
01127 
01128   DDS::DataReader_var d =
01129     bit_subscriber_->lookup_datareader(DCPS::BUILT_IN_PARTICIPANT_TOPIC);
01130   return dynamic_cast<DCPS::ParticipantBuiltinTopicDataDataReaderImpl*>(d.in());
01131 }

Here is the caller graph for this function:

ACE_Reactor * OpenDDS::RTPS::Spdp::reactor ( void   )  const [private]

Definition at line 1135 of file Spdp.cpp.

References disco_, and OpenDDS::DCPS::PeerDiscovery< Participant >::reactor().

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::acknowledge(), OpenDDS::RTPS::Spdp::SpdpTransport::close(), and OpenDDS::RTPS::Spdp::SpdpTransport::open().

01136 {
01137   return disco_->reactor();
01138 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::RTPS::Spdp::remove_expired_participants (  )  [private]

Definition at line 1071 of file Spdp.cpp.

References ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, get_discovered_participant_ids(), ACE_OS::gettimeofday(), LM_WARNING, OpenDDS::DCPS::LocalParticipant< Sedp >::lock_, OPENDDS_STRING, OpenDDS::DCPS::LocalParticipant< Sedp >::participants_, and OpenDDS::DCPS::LocalParticipant< Sedp >::remove_discovered_participant().

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::handle_timeout().

01072 {
01073   // Find and remove any expired discovered participant
01074   ACE_GUARD (ACE_Thread_Mutex, g, lock_);
01075   // Iterate through a copy of the repo Ids, rather than the map
01076   //   as it gets unlocked in remove_discovered_participant()
01077   DCPS::RepoIdSet participant_ids;
01078   get_discovered_participant_ids(participant_ids);
01079   for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin();
01080        participant_id != participant_ids.end();
01081        ++participant_id)
01082   {
01083     DiscoveredParticipantIter part = participants_.find(*participant_id);
01084     if (part != participants_.end()) {
01085       if (part->second.last_seen_ <
01086           ACE_OS::gettimeofday() -
01087           ACE_Time_Value(part->second.pdata_.leaseDuration.seconds)) {
01088         if (DCPS::DCPS_debug_level > 1) {
01089           DCPS::GuidConverter conv(part->first);
01090           ACE_DEBUG((LM_WARNING,
01091             ACE_TEXT("(%P|%t) Spdp::remove_expired_participants() - ")
01092             ACE_TEXT("participant %C exceeded lease duration, removing\n"),
01093             OPENDDS_STRING(conv).c_str()));
01094         }
01095         remove_discovered_participant(part);
01096       }
01097     }
01098   }
01099 }

Here is the call graph for this function:

Here is the caller graph for this function:

bool OpenDDS::RTPS::Spdp::shutting_down ( void   )  [inline]
void OpenDDS::RTPS::Spdp::signal_liveliness ( DDS::LivelinessQosPolicyKind  kind  ) 

Definition at line 1690 of file Spdp.cpp.

References sedp_, and OpenDDS::RTPS::Sedp::signal_liveliness().

01691 {
01692   sedp_.signal_liveliness(kind);
01693 }

Here is the call graph for this function:

WaitForAcks & OpenDDS::RTPS::Spdp::wait_for_acks (  ) 

Definition at line 1141 of file Spdp.cpp.

References wait_for_acks_.

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::handle_exception(), and OpenDDS::RTPS::Sedp::Task::svc().

01142 {
01143   return wait_for_acks_;
01144 }

Here is the caller graph for this function:


Member Data Documentation

Definition at line 135 of file Spdp.h.

Referenced by init(), and OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport().

Definition at line 181 of file Spdp.h.

Referenced by ~Spdp().

Definition at line 182 of file Spdp.h.

Referenced by ~Spdp(), and OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport().

Definition at line 137 of file Spdp.h.

Referenced by build_local_pdata(), and init().

Definition at line 137 of file Spdp.h.

Referenced by build_local_pdata(), and init().

Definition at line 183 of file Spdp.h.

Referenced by ~Spdp(), and OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport().

Definition at line 184 of file Spdp.h.

Referenced by data_received(), and ~Spdp().

Definition at line 192 of file Spdp.h.

Referenced by fini_bit(), and wait_for_acks().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1