#include <Spdp.h>
Inheritance diagram for OpenDDS::RTPS::Spdp:
Definition at line 42 of file Spdp.h.
OpenDDS::RTPS::Spdp::Spdp | ( | DDS::DomainId_t | domain, | |
DCPS::RepoId & | guid, | |||
const DDS::DomainParticipantQos & | qos, | |||
RtpsDiscovery * | disco | |||
) |
OpenDDS::RTPS::Spdp::~Spdp | ( | ) |
Definition at line 78 of file Spdp.cpp.
References OpenDDS::RTPS::Spdp::SpdpTransport::close(), OpenDDS::DCPS::DCPS_debug_level, eh_, eh_shutdown_, get_discovered_participant_ids(), OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::participants_, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::remove_discovered_participant(), sedp_, OpenDDS::RTPS::Sedp::shutdown(), shutdown_cond_, shutdown_flag_, and tport_.
00079 { 00080 shutdown_flag_ = 1; 00081 { 00082 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00083 if (DCPS::DCPS_debug_level > 3) { 00084 ACE_DEBUG((LM_INFO, 00085 ACE_TEXT("(%P|%t) Spdp::~Spdp ") 00086 ACE_TEXT("remove discovered participants\n"))); 00087 } 00088 // Iterate through a copy of the repo Ids, rather than the map 00089 // as it gets unlocked in remove_discovered_participant() 00090 DCPS::RepoIdSet participant_ids; 00091 get_discovered_participant_ids(participant_ids); 00092 for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin(); 00093 participant_id != participant_ids.end(); 00094 ++participant_id) 00095 { 00096 DiscoveredParticipantIter part = participants_.find(*participant_id); 00097 if (part != participants_.end()) { 00098 remove_discovered_participant(part); 00099 } 00100 } 00101 } 00102 00103 // ensure sedp's task queue is drained before data members are being 00104 // deleted 00105 sedp_.shutdown(); 00106 00107 // release lock for reset of event handler, which may delete transport 00108 tport_->close(); 00109 eh_.reset(); 00110 { 00111 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00112 while (!eh_shutdown_) { 00113 shutdown_cond_.wait(); 00114 } 00115 } 00116 }
bool OpenDDS::RTPS::Spdp::associated | ( | ) | const |
Definition at line 808 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::participants_.
00809 { 00810 return !participants_.empty(); 00811 }
void OpenDDS::RTPS::Spdp::data_received | ( | const DataSubmessage & | data, | |
const ParameterList & | plist | |||
) | [private] |
Definition at line 119 of file Spdp.cpp.
References OpenDDS::RTPS::Sedp::associate(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::RTPS::SPDPdiscoveredParticipantData::ddsParticipantData, OpenDDS::RTPS::disposed(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::RTPS::ParameterListConverter::from_param_list(), guid_, OpenDDS::RTPS::ParticipantProxy_t::guidPrefix, OpenDDS::DCPS::GUID_t::guidPrefix, DDS::HANDLE_NIL, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring(), OpenDDS::RTPS::DataSubmessage::inlineQos, DDS::ParticipantBuiltinTopicData::key, OpenDDS::RTPS::SPDPdiscoveredParticipantData::leaseDuration, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, DDS::NEW_VIEW_STATE, DDS::NOT_NEW_VIEW_STATE, OPENDDS_STRING, part_bit(), OpenDDS::RTPS::SPDPdiscoveredParticipantData::participantProxy, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::participants_, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::remove_discovered_participant(), OpenDDS::RTPS::Time_t::seconds, sedp_, shutdown_flag_, tport_, DDS::ParticipantBuiltinTopicData::user_data, and OpenDDS::RTPS::Spdp::SpdpTransport::write_i().
Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::handle_input().
00120 { 00121 if (shutdown_flag_.value()) { return; } 00122 00123 const ACE_Time_Value time = ACE_OS::gettimeofday(); 00124 SPDPdiscoveredParticipantData pdata; 00125 if (ParameterListConverter::from_param_list(plist, pdata) < 0) { 00126 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Spdp::data_received - ") 00127 ACE_TEXT("failed to convert from ParameterList to ") 00128 ACE_TEXT("SPDPdiscoveredParticipantData\n"))); 00129 return; 00130 } 00131 00132 DCPS::RepoId guid; 00133 std::memcpy(guid.guidPrefix, pdata.participantProxy.guidPrefix, 00134 sizeof(guid.guidPrefix)); 00135 guid.entityId = OpenDDS::DCPS::ENTITYID_PARTICIPANT; 00136 00137 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00138 if (sedp_.ignoring(guid)) { 00139 // Ignore, this is our domain participant or one that the user has 00140 // asked us to ignore. 00141 return; 00142 } 00143 00144 // Find the participant - iterator valid only as long as we hold the lock 00145 DiscoveredParticipantIter iter = participants_.find(guid); 00146 00147 // Must unlock when calling into part_bit() as it may call back into us 00148 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_); 00149 00150 if (iter == participants_.end()) { 00151 // copy guid prefix (octet[12]) into BIT key (long[3]) 00152 std::memcpy(pdata.ddsParticipantData.key.value, 00153 pdata.participantProxy.guidPrefix, 00154 sizeof(pdata.ddsParticipantData.key.value)); 00155 00156 if (DCPS::DCPS_debug_level) { 00157 DCPS::GuidConverter local(guid_), remote(guid); 00158 ACE_DEBUG((LM_DEBUG, 00159 ACE_TEXT("(%P|%t) Spdp::data_received - %C discovered %C lease %ds\n"), 00160 OPENDDS_STRING(local).c_str(), OPENDDS_STRING(remote).c_str(), 00161 pdata.leaseDuration.seconds)); 00162 } 00163 00164 // add a new participant 00165 participants_[guid] = DiscoveredParticipant(pdata, time); 00166 DDS::InstanceHandle_t bit_instance_handle = DDS::HANDLE_NIL; 00167 #ifndef DDS_HAS_MINIMUM_BIT 00168 DDS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit(); 00169 if (bit) { 00170 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock); 00171 bit_instance_handle = 00172 bit->store_synthetic_data(pdata.ddsParticipantData, 00173 DDS::NEW_VIEW_STATE); 00174 } 00175 #endif /* DDS_HAS_MINIMUM_BIT */ 00176 00177 // notify Sedp of association 00178 // Sedp may call has_discovered_participant. 00179 // This is what the participant must be added before this call to associate. 00180 sedp_.associate(pdata); 00181 00182 // Since we've just seen a new participant, let's send out our 00183 // own announcement, so they don't have to wait. 00184 this->tport_->write_i(); 00185 00186 // Iterator is no longer valid 00187 iter = participants_.find(guid); 00188 if (iter != participants_.end()) { 00189 iter->second.bit_ih_ = bit_instance_handle; 00190 } 00191 00192 } else if (data.inlineQos.length() && disposed(data.inlineQos)) { 00193 remove_discovered_participant(iter); 00194 00195 } else { 00196 // update an existing participant 00197 pdata.ddsParticipantData.key = iter->second.pdata_.ddsParticipantData.key; 00198 #ifndef OPENDDS_SAFETY_PROFILE 00199 using OpenDDS::DCPS::operator!=; 00200 #endif 00201 if (iter->second.pdata_.ddsParticipantData.user_data != 00202 pdata.ddsParticipantData.user_data) { 00203 iter->second.pdata_.ddsParticipantData.user_data = 00204 pdata.ddsParticipantData.user_data; 00205 #ifndef DDS_HAS_MINIMUM_BIT 00206 DDS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit(); 00207 if (bit) { 00208 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock); 00209 bit->store_synthetic_data(pdata.ddsParticipantData, 00210 DDS::NOT_NEW_VIEW_STATE); 00211 } 00212 #endif /* DDS_HAS_MINIMUM_BIT */ 00213 // Perform search again, so iterator becomes valid 00214 iter = participants_.find(guid); 00215 } 00216 // Participant may have been removed while lock released 00217 if (iter != participants_.end()) { 00218 iter->second.pdata_ = pdata; 00219 iter->second.last_seen_ = time; 00220 } 00221 } 00222 }
Sedp& OpenDDS::RTPS::Spdp::endpoint_manager | ( | ) | [inline, protected, virtual] |
Implements OpenDDS::DCPS::LocalParticipant< EndpointManagerType >.
Definition at line 68 of file Spdp.h.
00068 { return sedp_; }
void OpenDDS::RTPS::Spdp::fini_bit | ( | ) |
Definition at line 263 of file Spdp.cpp.
References OpenDDS::RTPS::Sedp::acknowledge(), OpenDDS::RTPS::Spdp::SpdpTransport::acknowledge(), OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber_, OpenDDS::RTPS::WaitForAcks::reset(), sedp_, tport_, OpenDDS::RTPS::WaitForAcks::wait_for_acks(), and wait_for_acks_.
00264 { 00265 bit_subscriber_ = 0; 00266 wait_for_acks_.reset(); 00267 // request for SpdpTransport(actually Reactor) thread and Sedp::Task 00268 // to acknowledge 00269 tport_->acknowledge(); 00270 sedp_.acknowledge(); 00271 // wait for the 2 acknowledgements 00272 wait_for_acks_.wait_for_acks(2); 00273 }
bool OpenDDS::RTPS::Spdp::get_default_locators | ( | const DCPS::RepoId & | part_id, | |
OpenDDS::DCPS::LocatorSeq & | target, | |||
bool & | inlineQos | |||
) |
Referenced by OpenDDS::RTPS::Sedp::populate_transport_locator_sequence().
void OpenDDS::RTPS::Spdp::get_discovered_participant_ids | ( | DCPS::RepoIdSet & | results | ) | const [private] |
Definition at line 821 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::participants_.
Referenced by remove_expired_participants(), and ~Spdp().
00822 { 00823 DiscoveredParticipantMap::const_iterator idx; 00824 for (idx = participants_.begin(); idx != participants_.end(); ++idx) 00825 { 00826 results.insert(idx->first); 00827 } 00828 }
bool OpenDDS::RTPS::Spdp::has_discovered_participant | ( | const DCPS::RepoId & | guid | ) |
Definition at line 814 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::participants_.
Referenced by OpenDDS::RTPS::Sedp::data_received(), and OpenDDS::RTPS::Sedp::disassociate().
00815 { 00816 return participants_.find(guid) != participants_.end(); 00817 }
void OpenDDS::RTPS::Spdp::init_bit | ( | const DDS::Subscriber_var & | bit_subscriber | ) |
Definition at line 256 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber_, OpenDDS::RTPS::Spdp::SpdpTransport::open(), and tport_.
00257 { 00258 bit_subscriber_ = bit_subscriber; 00259 tport_->open(); 00260 }
DDS::ParticipantBuiltinTopicDataDataReaderImpl * OpenDDS::RTPS::Spdp::part_bit | ( | ) | [private] |
Reimplemented from OpenDDS::DCPS::LocalParticipant< EndpointManagerType >.
Definition at line 277 of file Spdp.cpp.
References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber_, and OpenDDS::DCPS::BUILT_IN_PARTICIPANT_TOPIC.
Referenced by data_received().
00278 { 00279 if (!bit_subscriber_.in()) 00280 return 0; 00281 00282 DDS::DataReader_var d = 00283 bit_subscriber_->lookup_datareader(DCPS::BUILT_IN_PARTICIPANT_TOPIC); 00284 return dynamic_cast<DDS::ParticipantBuiltinTopicDataDataReaderImpl*>(d.in()); 00285 }
ACE_Reactor * OpenDDS::RTPS::Spdp::reactor | ( | ) | const [private] |
Definition at line 289 of file Spdp.cpp.
References disco_, and OpenDDS::DCPS::PeerDiscovery< Participant >::reactor().
Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::acknowledge(), OpenDDS::RTPS::Spdp::SpdpTransport::close(), and OpenDDS::RTPS::Spdp::SpdpTransport::open().
00290 { 00291 return disco_->reactor(); 00292 }
void OpenDDS::RTPS::Spdp::remove_expired_participants | ( | ) | [private] |
Definition at line 225 of file Spdp.cpp.
References OpenDDS::DCPS::DCPS_debug_level, get_discovered_participant_ids(), OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::lock_, OPENDDS_STRING, OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::participants_, and OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::remove_discovered_participant().
Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::handle_timeout().
00226 { 00227 // Find and remove any expired discovered participant 00228 ACE_GUARD (ACE_Thread_Mutex, g, lock_); 00229 // Iterate through a copy of the repo Ids, rather than the map 00230 // as it gets unlocked in remove_discovered_participant() 00231 DCPS::RepoIdSet participant_ids; 00232 get_discovered_participant_ids(participant_ids); 00233 for (DCPS::RepoIdSet::iterator participant_id = participant_ids.begin(); 00234 participant_id != participant_ids.end(); 00235 ++participant_id) 00236 { 00237 DiscoveredParticipantIter part = participants_.find(*participant_id); 00238 if (part != participants_.end()) { 00239 if (part->second.last_seen_ < 00240 ACE_OS::gettimeofday() - 00241 ACE_Time_Value(part->second.pdata_.leaseDuration.seconds)) { 00242 if (DCPS::DCPS_debug_level > 1) { 00243 DCPS::GuidConverter conv(part->first); 00244 ACE_DEBUG((LM_WARNING, 00245 ACE_TEXT("(%P|%t) Spdp::remove_expired_participants() - ") 00246 ACE_TEXT("participant %C exceeded lease duration, removing\n"), 00247 OPENDDS_STRING(conv).c_str())); 00248 } 00249 remove_discovered_participant(part); 00250 } 00251 } 00252 } 00253 }
bool OpenDDS::RTPS::Spdp::shutting_down | ( | ) | [inline] |
Definition at line 60 of file Spdp.h.
Referenced by OpenDDS::RTPS::Sedp::data_received(), OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::RTPS::Sedp::remove_entities_belonging_to(), OpenDDS::RTPS::Sedp::shutting_down(), and OpenDDS::RTPS::Sedp::Task::svc_i().
00060 { return shutdown_flag_.value(); }
void OpenDDS::RTPS::Spdp::signal_liveliness | ( | DDS::LivelinessQosPolicyKind | kind | ) |
Definition at line 726 of file Spdp.cpp.
References sedp_, and OpenDDS::RTPS::Sedp::signal_liveliness().
00727 { 00728 sedp_.signal_liveliness(kind); 00729 }
WaitForAcks & OpenDDS::RTPS::Spdp::wait_for_acks | ( | ) |
Definition at line 295 of file Spdp.cpp.
References wait_for_acks_.
Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::handle_exception(), and OpenDDS::RTPS::Sedp::Task::svc().
00296 { 00297 return wait_for_acks_; 00298 }
RtpsDiscovery* OpenDDS::RTPS::Spdp::disco_ [private] |
Definition at line 73 of file Spdp.h.
Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::open(), OpenDDS::RTPS::Spdp::SpdpTransport::open_unicast_socket(), reactor(), and OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport().
const DDS::DomainId_t OpenDDS::RTPS::Spdp::domain_ [private] |
Definition at line 76 of file Spdp.h.
Referenced by OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport().
ACE_Event_Handler_var OpenDDS::RTPS::Spdp::eh_ [private] |
bool OpenDDS::RTPS::Spdp::eh_shutdown_ [private] |
Definition at line 115 of file Spdp.h.
Referenced by ~Spdp(), and OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport().
DCPS::RepoId OpenDDS::RTPS::Spdp::guid_ [private] |
Definition at line 77 of file Spdp.h.
Referenced by data_received(), OpenDDS::RTPS::Spdp::SpdpTransport::dispose_unregister(), OpenDDS::RTPS::Spdp::SpdpTransport::SpdpTransport(), and OpenDDS::RTPS::Spdp::SpdpTransport::write_i().
Sedp OpenDDS::RTPS::Spdp::sedp_ [private] |
Definition at line 122 of file Spdp.h.
Referenced by data_received(), fini_bit(), signal_liveliness(), and ~Spdp().
ACE_Condition_Thread_Mutex OpenDDS::RTPS::Spdp::shutdown_cond_ [private] |
Definition at line 116 of file Spdp.h.
Referenced by ~Spdp(), and OpenDDS::RTPS::Spdp::SpdpTransport::~SpdpTransport().
ACE_Atomic_Op<ACE_Thread_Mutex, long> OpenDDS::RTPS::Spdp::shutdown_flag_ [private] |
Referenced by data_received(), fini_bit(), init_bit(), and ~Spdp().