OpenDDS::RTPS::Spdp Class Reference

#include <Spdp.h>

Public Member Functions

 Spdp (DDS::DomainId_t domain, DCPS::RepoId &guid, const DDS::DomainParticipantQos &qos, RtpsDiscovery *disco)
 ~Spdp ()
void init_bit (const DDS::Subscriber_var &bit_subscriber)
void fini_bit ()
bool get_default_locators (const DCPS::RepoId &part_id, OpenDDS::DCPS::LocatorSeq &target, bool &inlineQos)
void signal_liveliness (DDS::LivelinessQosPolicyKind kind)
bool shutting_down ()
bool associated () const
bool has_discovered_participant (const DCPS::RepoId &guid)
WaitForAcks & wait_for_acks ()

Protected Member Functions

Sedp & endpoint_manager ()

Private Member Functions

ACE_Reactor * reactor () const
void data_received (const DataSubmessage &data, const ParameterList &plist)
DDS::ParticipantBuiltinTopicDataDataReaderImpl * part_bit ()
void remove_expired_participants ()
void get_discovered_participant_ids (DCPS::RepoIdSet &results) const

Private Attributes

RtpsDiscovery * disco_
const DDS::DomainId_t domain_
DCPS::RepoId guid_
OpenDDS::DCPS::LocatorSeq sedp_unicast_
OpenDDS::DCPS::LocatorSeq sedp_multicast_
OpenDDS::RTPS::Spdp::SpdpTransport * tport_
ACE_Event_Handler_var eh_
bool eh_shutdown_
ACE_Condition_Thread_Mutex shutdown_cond_
ACE_Atomic_Op< ACE_Thread_Mutex, long > shutdown_flag_
Sedp sedp_
WaitForAcks wait_for_acks_

Classes

struct  SpdpTransport

Detailed Description

Each instance of class Spdp represents the implementation of the RTPS Simple Participant Discovery Protocol for a single local DomainParticipant.

Definition at line 42 of file Spdp.h.


Constructor & Destructor Documentation

OpenDDS::RTPS::Spdp::Spdp ( DDS::DomainId_t  domain,
DCPS::RepoId &  guid,
const DDS::DomainParticipantQos &  qos,
RtpsDiscovery *  disco 
)

OpenDDS::RTPS::Spdp::~Spdp (  ) 

Definition at line 78 of file Spdp.cpp.

References OpenDDS::RTPS::Spdp::SpdpTransport::close(), OpenDDS::DCPS::DCPS_debug_level, eh_, eh_shutdown_, get_discovered_participant_ids(), OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::participants_, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::remove_discovered_participant(), sedp_, OpenDDS::RTPS::Sedp::shutdown(), shutdown_cond_, shutdown_flag_, and tport_.

00079 {
00080   shutdown_flag_ = 1;
00081   {
00082     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00083     if (DCPS::DCPS_debug_level > 3) {
00084       ACE_DEBUG((LM_INFO,
00085                  ACE_TEXT("(%P|%t) Spdp::~Spdp ")
00086                  ACE_TEXT("remove discovered participants\n")));
00087     }
00088     // Iterate through a copy of the repo Ids, rather than the map
00089     //   as it gets unlocked in remove_discovered_participant()
00090     DCPS::RepoIdSet participant_ids;
00091     get_discovered_participant_ids(participant_ids);
00092     for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin();
00093          participant_id != participant_ids.end();
00094          ++participant_id)
00095     {
00096       DiscoveredParticipantIter part = participants_.find(*participant_id);
00097       if (part != participants_.end()) {
00098         remove_discovered_participant(part);
00099       }
00100     }
00101   }
00102 
00103   // ensure sedp's task queue is drained before data members are being
00104   // deleted
00105   sedp_.shutdown();
00106 
00107   // release lock for reset of event handler, which may delete transport
00108   tport_->close();
00109   eh_.reset();
00110   {
00111     ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00112     while (!eh_shutdown_) {
00113       shutdown_cond_.wait();
00114     }
00115   }
00116 }
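
The comment in the listing above explains that the destructor walks a copy of the participant IDs because the map can change while remove_discovered_participant() runs. The following is a minimal, standalone sketch of that "snapshot the keys, then look each one up again" pattern. It is not OpenDDS code: Id, ParticipantMap, snapshot_ids() and remove_all() are hypothetical stand-ins, and a plain erase() takes the place of remove_discovered_participant().

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <set>
#include <string>

// Hypothetical stand-ins for DCPS::RepoId and the discovered-participant map.
typedef int Id;
typedef std::map<Id, std::string> ParticipantMap;

// Copy the keys so the removal loop never holds a map iterator
// across a call that might modify the map.
std::set<Id> snapshot_ids(const ParticipantMap& participants)
{
  std::set<Id> ids;
  for (ParticipantMap::const_iterator it = participants.begin();
       it != participants.end(); ++it) {
    ids.insert(it->first);
  }
  return ids;
}

// Remove every entry, looking each key up again before erasing it.
// Returns the number of entries actually removed.
std::size_t remove_all(ParticipantMap& participants)
{
  std::size_t removed = 0;
  const std::set<Id> ids = snapshot_ids(participants);
  for (std::set<Id>::const_iterator id = ids.begin(); id != ids.end(); ++id) {
    ParticipantMap::iterator part = participants.find(*id);
    if (part != participants.end()) {  // entry may already be gone
      participants.erase(part);
      ++removed;
    }
  }
  assert(participants.empty());  // holds here because nothing else inserts
  return removed;
}
```

The second lookup costs one extra find() per ID, but it keeps the loop correct even if an earlier removal (or another thread, in the real class) has already dropped the entry.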


Member Function Documentation

bool OpenDDS::RTPS::Spdp::associated (  )  const

Definition at line 808 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::participants_.

00809 {
00810   return !participants_.empty();
00811 }

void OpenDDS::RTPS::Spdp::data_received ( const DataSubmessage &  data,
const ParameterList &  plist 
) [private]

Definition at line 119 of file Spdp.cpp.

References OpenDDS::RTPS::Sedp::associate(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::SPDPdiscoveredParticipantData::ddsParticipantData, OpenDDS::RTPS::disposed(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::RTPS::ParameterListConverter::from_param_list(), guid_, OpenDDS::RTPS::ParticipantProxy_t::guidPrefix, OpenDDS::DCPS::GUID_t::guidPrefix, DDS::HANDLE_NIL, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring(), OpenDDS::RTPS::DataSubmessage::inlineQos, DDS::ParticipantBuiltinTopicData::key, OpenDDS::RTPS::SPDPdiscoveredParticipantData::leaseDuration, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, DDS::NEW_VIEW_STATE, DDS::NOT_NEW_VIEW_STATE, OPENDDS_STRING, part_bit(), OpenDDS::RTPS::SPDPdiscoveredParticipantData::participantProxy, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::participants_, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::remove_discovered_participant(), OpenDDS::RTPS::Time_t::seconds, sedp_, shutdown_flag_, tport_, DDS::ParticipantBuiltinTopicData::user_data, and OpenDDS::RTPS::Spdp::SpdpTransport::write_i().

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::handle_input().

00120 {
00121   if (shutdown_flag_.value()) { return; }
00122 
00123   const ACE_Time_Value time = ACE_OS::gettimeofday();
00124   SPDPdiscoveredParticipantData pdata;
00125   if (ParameterListConverter::from_param_list(plist, pdata) < 0) {
00126     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::data_received - ")
00127       ACE_TEXT("failed to convert from ParameterList to ")
00128       ACE_TEXT("SPDPdiscoveredParticipantData\n")));
00129     return;
00130   }
00131 
00132   DCPS::RepoId guid;
00133   std::memcpy(guid.guidPrefix, pdata.participantProxy.guidPrefix,
00134               sizeof(guid.guidPrefix));
00135   guid.entityId = OpenDDS::DCPS::ENTITYID_PARTICIPANT;
00136 
00137   ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00138   if (sedp_.ignoring(guid)) {
00139     // Ignore, this is our domain participant or one that the user has
00140     // asked us to ignore.
00141     return;
00142   }
00143 
00144   // Find the participant - iterator valid only as long as we hold the lock
00145   DiscoveredParticipantIter iter = participants_.find(guid);
00146 
00147   // Must unlock when calling into part_bit() as it may call back into us
00148   ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00149 
00150   if (iter == participants_.end()) {
00151     // copy guid prefix (octet[12]) into BIT key (long[3])
00152     std::memcpy(pdata.ddsParticipantData.key.value,
00153                 pdata.participantProxy.guidPrefix,
00154                 sizeof(pdata.ddsParticipantData.key.value));
00155 
00156     if (DCPS::DCPS_debug_level) {
00157       DCPS::GuidConverter local(guid_), remote(guid);
00158       ACE_DEBUG((LM_DEBUG,
00159         ACE_TEXT("(%P|%t) Spdp::data_received - %C discovered %C lease %ds\n"),
00160         OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str(),
00161         pdata.leaseDuration.seconds));
00162     }
00163 
00164     // add a new participant
00165     participants_[guid] = DiscoveredParticipant(pdata, time);
00166     DDS::InstanceHandle_t bit_instance_handle = DDS::HANDLE_NIL;
00167 #ifndef DDS_HAS_MINIMUM_BIT
00168     DDS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00169     if (bit) {
00170       ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00171       bit_instance_handle =
00172         bit->store_synthetic_data(pdata.ddsParticipantData,
00173                                   DDS::NEW_VIEW_STATE);
00174     }
00175 #endif /* DDS_HAS_MINIMUM_BIT */
00176 
00177     // notify Sedp of association
00178     // Sedp may call has_discovered_participant.
00179     // This is why the participant must be added before this call to associate.
00180     sedp_.associate(pdata);
00181 
00182     // Since we've just seen a new participant, let's send out our
00183     // own announcement, so they don't have to wait.
00184     this->tport_->write_i();
00185 
00186     // Iterator is no longer valid
00187     iter = participants_.find(guid);
00188     if (iter != participants_.end()) {
00189       iter->second.bit_ih_ = bit_instance_handle;
00190     }
00191 
00192   } else if (data.inlineQos.length() && disposed(data.inlineQos)) {
00193     remove_discovered_participant(iter);
00194 
00195   } else {
00196     // update an existing participant
00197     pdata.ddsParticipantData.key = iter->second.pdata_.ddsParticipantData.key;
00198 #ifndef OPENDDS_SAFETY_PROFILE
00199     using OpenDDS::DCPS::operator!=;
00200 #endif
00201     if (iter->second.pdata_.ddsParticipantData.user_data !=
00202         pdata.ddsParticipantData.user_data) {
00203       iter->second.pdata_.ddsParticipantData.user_data =
00204         pdata.ddsParticipantData.user_data;
00205 #ifndef DDS_HAS_MINIMUM_BIT
00206       DDS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
00207       if (bit) {
00208         ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00209         bit->store_synthetic_data(pdata.ddsParticipantData,
00210                                   DDS::NOT_NEW_VIEW_STATE);
00211       }
00212 #endif /* DDS_HAS_MINIMUM_BIT */
00213       // Perform search again, so iterator becomes valid
00214       iter = participants_.find(guid);
00215     }
00216     // Participant may have been removed while lock released
00217     if (iter != participants_.end()) {
00218       iter->second.pdata_ = pdata;
00219       iter->second.last_seen_ = time;
00220     }
00221   }
00222 }
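
data_received() releases lock_ (through ACE_Reverse_Lock) while it calls into the built-in topic reader, then repeats participants_.find(guid) because the earlier iterator may no longer be valid. The sketch below shows the same idea with the C++ standard library only. It is an illustration under assumptions, not the OpenDDS implementation: Registry, update() and remove() are hypothetical names, and std::unique_lock::unlock()/lock() plays the role of the reverse lock.

```cpp
#include <cassert>
#include <map>
#include <mutex>
#include <string>

// Hypothetical registry guarded by a single mutex.
struct Registry {
  std::mutex lock;
  std::map<int, std::string> entries;

  // Set entries[id] to value, releasing the lock around callback().
  // Returns false if the entry is missing before or after the callback.
  template <typename Callback>
  bool update(int id, const std::string& value, Callback callback)
  {
    std::unique_lock<std::mutex> guard(lock);
    std::map<int, std::string>::iterator it = entries.find(id);
    if (it == entries.end()) {
      return false;
    }

    guard.unlock();  // the callback may call back into this object
    callback();
    guard.lock();
    assert(guard.owns_lock());

    // The iterator obtained before unlocking must not be reused.
    it = entries.find(id);
    if (it == entries.end()) {
      return false;  // entry was removed while the lock was released
    }
    it->second = value;
    return true;
  }

  void remove(int id)
  {
    std::lock_guard<std::mutex> guard(lock);
    entries.erase(id);
  }
};
```

Calling update() with a callback that removes the same entry returns false and leaves the map untouched afterwards, which corresponds to the "Participant may have been removed while lock released" branch in the listing.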

Sedp& OpenDDS::RTPS::Spdp::endpoint_manager (  )  [inline, protected, virtual]

Implements OpenDDS::DCPS::LocalParticipant< EndpointManagerType >.

Definition at line 68 of file Spdp.h.

00068 { return sedp_; }

void OpenDDS::RTPS::Spdp::fini_bit (  ) 

Definition at line 263 of file Spdp.cpp.

References OpenDDS::RTPS::Sedp::acknowledge(), OpenDDS::RTPS::Spdp::SpdpTransport::acknowledge(), OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber_, OpenDDS::RTPS::WaitForAcks::reset(), sedp_, tport_, OpenDDS::RTPS::WaitForAcks::wait_for_acks(), and wait_for_acks_.

00264 {
00265   bit_subscriber_ = 0;
00266   wait_for_acks_.reset();
00267   // request for SpdpTransport(actually Reactor) thread and Sedp::Task
00268   // to acknowledge
00269   tport_->acknowledge();
00270   sedp_.acknowledge();
00271   // wait for the 2 acknowledgements
00272   wait_for_acks_.wait_for_acks(2);
00273 }
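
fini_bit() resets wait_for_acks_, asks two parties (the SpdpTransport reactor thread and the Sedp task) to acknowledge, and then blocks until both have done so. A counter with that reset / acknowledge / wait shape could look roughly like the sketch below. AckCounter and its members are hypothetical and written against std::mutex and std::condition_variable; the sketch makes no claim about how OpenDDS::RTPS::WaitForAcks is implemented.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>

// Hypothetical acknowledgement counter: workers call ack(), the owner
// blocks in wait_for_acks() until enough acknowledgements have arrived.
class AckCounter {
public:
  AckCounter() : acks_(0) {}

  // Called by each party that was asked to acknowledge.
  void ack()
  {
    {
      std::lock_guard<std::mutex> guard(lock_);
      ++acks_;
    }
    cond_.notify_all();
  }

  // Block until at least `expected` acknowledgements have been counted.
  void wait_for_acks(unsigned int expected)
  {
    std::unique_lock<std::mutex> guard(lock_);
    cond_.wait(guard, [this, expected] { return acks_ >= expected; });
    assert(acks_ >= expected);
  }

  // Forget earlier acknowledgements before starting a new round.
  void reset()
  {
    std::lock_guard<std::mutex> guard(lock_);
    acks_ = 0;
  }

  unsigned int count()
  {
    std::lock_guard<std::mutex> guard(lock_);
    return acks_;
  }

private:
  std::mutex lock_;
  std::condition_variable cond_;
  unsigned int acks_;
};
```

In a real shutdown sequence the ack() calls would come from other threads. The predicate form of wait() is used so that acknowledgements arriving before the wait starts are not lost and spurious wakeups are ignored.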

bool OpenDDS::RTPS::Spdp::get_default_locators ( const DCPS::RepoId &  part_id,
OpenDDS::DCPS::LocatorSeq &  target,
bool &  inlineQos 
)

Referenced by OpenDDS::RTPS::Sedp::populate_transport_locator_sequence().

void OpenDDS::RTPS::Spdp::get_discovered_participant_ids ( DCPS::RepoIdSet &  results  )  const [private]

Definition at line 821 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::participants_.

Referenced by remove_expired_participants(), and ~Spdp().

00822 {
00823   DiscoveredParticipantMap::const_iterator idx;
00824   for (idx = participants_.begin(); idx != participants_.end(); ++idx)
00825   {
00826     results.insert(idx->first);
00827   }
00828 }

bool OpenDDS::RTPS::Spdp::has_discovered_participant ( const DCPS::RepoId &  guid  ) 

Definition at line 814 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::participants_.

Referenced by OpenDDS::RTPS::Sedp::data_received(), and OpenDDS::RTPS::Sedp::disassociate().

00815 {
00816   return participants_.find(guid) != participants_.end();
00817 }

void OpenDDS::RTPS::Spdp::init_bit ( const DDS::Subscriber_var &  bit_subscriber  ) 

Definition at line 256 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber_, OpenDDS::RTPS::Spdp::SpdpTransport::open(), and tport_.

00257 {
00258   bit_subscriber_ = bit_subscriber;
00259   tport_->open();
00260 }

DDS::ParticipantBuiltinTopicDataDataReaderImpl * OpenDDS::RTPS::Spdp::part_bit (  )  [private]

Reimplemented from OpenDDS::DCPS::LocalParticipant< EndpointManagerType >.

Definition at line 277 of file Spdp.cpp.

References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber_, and OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC.

Referenced by data_received().

00278 {
00279   if (!bit_subscriber_.in())
00280     return 0;
00281 
00282   DDS::DataReader_var d =
00283     bit_subscriber_->lookup_datareader(DCPS::BUILT_IN_PARTICIPANT_TOPIC);
00284   return dynamic_cast<DDS::ParticipantBuiltinTopicDataDataReaderImpl*>(d.in());
00285 }

ACE_Reactor * OpenDDS::RTPS::Spdp::reactor (  )  const [private]

Definition at line 289 of file Spdp.cpp.

References disco_, and OpenDDS::DCPS::PeerDiscovery< Participant >::reactor().

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::acknowledge(), OpenDDS::RTPS::Spdp::SpdpTransport::close(), and OpenDDS::RTPS::Spdp::SpdpTransport::open().

00290 {
00291   return disco_->reactor();
00292 }

void OpenDDS::RTPS::Spdp::remove_expired_participants (  )  [private]

Definition at line 225 of file Spdp.cpp.

References OpenDDS::DCPS::DCPS_debug_level, get_discovered_participant_ids(), OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, OPENDDS_STRING, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::participants_, and OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::remove_discovered_participant().

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::handle_timeout().

00226 {
00227   // Find and remove any expired discovered participant
00228   ACE_GUARD (ACE_Thread_Mutex, g, lock_);
00229   // Iterate through a copy of the repo Ids, rather than the map
00230   //   as it gets unlocked in remove_discovered_participant()
00231   DCPS::RepoIdSet participant_ids;
00232   get_discovered_participant_ids(participant_ids);
00233   for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin();
00234        participant_id != participant_ids.end();
00235        ++participant_id)
00236   {
00237     DiscoveredParticipantIter part = participants_.find(*participant_id);
00238     if (part != participants_.end()) {
00239       if (part->second.last_seen_ <
00240           ACE_OS::gettimeofday() -
00241           ACE_Time_Value(part->second.pdata_.leaseDuration.seconds)) {
00242         if (DCPS::DCPS_debug_level > 1) {
00243           DCPS::GuidConverter conv(part->first);
00244           ACE_DEBUG((LM_WARNING,
00245             ACE_TEXT("(%P|%t) Spdp::remove_expired_participants() - ")
00246             ACE_TEXT("participant %C exceeded lease duration, removing\n"),
00247             OPENDDS_STRING(conv).c_str()));
00248         }
00249         remove_discovered_participant(part);
00250       }
00251     }
00252   }
00253 }
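
The expiry test in the listing above is "last_seen_ < now - leaseDuration": a participant is dropped only once strictly more than its lease duration has passed since it was last heard from. The sketch below restates that comparison with std::chrono. Participant, lease_expired() and remove_expired() are hypothetical names, and for brevity the sketch erases in place, whereas the real function iterates over a copy of the IDs because the lock can be released during removal.

```cpp
#include <cassert>
#include <chrono>
#include <map>
#include <vector>

typedef std::chrono::steady_clock Clock;

// Hypothetical record kept per discovered participant.
struct Participant {
  Clock::time_point last_seen;  // time of the most recent announcement
  std::chrono::seconds lease;   // lease duration announced by the peer
};

// True when more than `lease` has elapsed since the last announcement.
bool lease_expired(const Participant& p, Clock::time_point now)
{
  assert(p.lease.count() >= 0);
  return p.last_seen < now - p.lease;
}

// Erase every expired participant and return the IDs that were removed.
std::vector<int> remove_expired(std::map<int, Participant>& participants,
                                Clock::time_point now)
{
  std::vector<int> removed;
  for (std::map<int, Participant>::iterator it = participants.begin();
       it != participants.end(); ) {
    if (lease_expired(it->second, now)) {
      removed.push_back(it->first);
      it = participants.erase(it);  // erase returns the next valid iterator
    } else {
      ++it;
    }
  }
  return removed;
}
```

Passing `now` as a parameter keeps the comparison deterministic and easy to test. The listing instead calls ACE_OS::gettimeofday() inside the loop.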

bool OpenDDS::RTPS::Spdp::shutting_down (  )  [inline]

Definition at line 60 of file Spdp.h.

Referenced by OpenDDS::RTPS::Sedp::data_received(), OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::RTPS::Sedp::remove_entities_belonging_to(), OpenDDS::RTPS::Sedp::shutting_down(), and OpenDDS::RTPS::Sedp::Task::svc_i().

00060 { return shutdown_flag_.value(); }

void OpenDDS::RTPS::Spdp::signal_liveliness ( DDS::LivelinessQosPolicyKind  kind  ) 

Definition at line 726 of file Spdp.cpp.

References sedp_, and OpenDDS::RTPS::Sedp::signal_liveliness().

00727 {
00728   sedp_.signal_liveliness(kind);
00729 }

WaitForAcks & OpenDDS::RTPS::Spdp::wait_for_acks (  ) 

Definition at line 295 of file Spdp.cpp.

References wait_for_acks_.

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::handle_exception(), and OpenDDS::RTPS::Sedp::Task::svc().

00296 {
00297   return wait_for_acks_;
00298 }


Member Data Documentation

RtpsDiscovery* OpenDDS::RTPS::Spdp::disco_ [private]

Definition at line 73 of file Spdp.h.

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::open(), OpenDDS::RTPS::Spdp::SpdpTransport::open_unicast_socket(), reactor(), and OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport().

const DDS::DomainId_t OpenDDS::RTPS::Spdp::domain_ [private]

Definition at line 76 of file Spdp.h.

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport().

ACE_Event_Handler_var OpenDDS::RTPS::Spdp::eh_ [private]

Definition at line 114 of file Spdp.h.

Referenced by ~Spdp().

bool OpenDDS::RTPS::Spdp::eh_shutdown_ [private]

Definition at line 115 of file Spdp.h.

Referenced by ~Spdp(), and OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport().

DCPS::RepoId OpenDDS::RTPS::Spdp::guid_ [private]

Definition at line 77 of file Spdp.h.

Referenced by data_received(), OpenDDS::RTPS::Spdp::SpdpTransport::dispose_unregister(), OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport(), and OpenDDS::RTPS::Spdp::SpdpTransport::write_i().

Sedp OpenDDS::RTPS::Spdp::sedp_ [private]

Definition at line 122 of file Spdp.h.

Referenced by data_received(), fini_bit(), signal_liveliness(), and ~Spdp().

OpenDDS::DCPS::LocatorSeq OpenDDS::RTPS::Spdp::sedp_multicast_ [private]

Definition at line 78 of file Spdp.h.

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::write_i().

OpenDDS::DCPS::LocatorSeq OpenDDS::RTPS::Spdp::sedp_unicast_ [private]

Definition at line 78 of file Spdp.h.

Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::write_i().

ACE_Condition_Thread_Mutex OpenDDS::RTPS::Spdp::shutdown_cond_ [private]

Definition at line 116 of file Spdp.h.

Referenced by ~Spdp(), and OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport().

ACE_Atomic_Op<ACE_Thread_Mutex, long> OpenDDS::RTPS::Spdp::shutdown_flag_ [private]

Definition at line 117 of file Spdp.h.

Referenced by data_received(), and ~Spdp().

OpenDDS::RTPS::Spdp::SpdpTransport * OpenDDS::RTPS::Spdp::tport_ [private]

Referenced by data_received(), fini_bit(), init_bit(), and ~Spdp().

WaitForAcks OpenDDS::RTPS::Spdp::wait_for_acks_ [private]

Definition at line 125 of file Spdp.h.

Referenced by fini_bit(), and wait_for_acks().


The documentation for this class was generated from the following files:
Spdp.h
Spdp.cpp
Generated on Fri Feb 12 20:06:50 2016 for OpenDDS by  doxygen 1.4.7