#include <Sedp.h>
Inheritance diagram for OpenDDS::RTPS::Sedp:
Definition at line 54 of file Sedp.h.
typedef LocalParticipantMessageMap::const_iterator OpenDDS::RTPS::Sedp::LocalParticipantMessageCIter [private]
typedef LocalParticipantMessageMap::iterator OpenDDS::RTPS::Sedp::LocalParticipantMessageIter [private]
typedef std::pair<DCPS::MessageId, OpenDDS::DCPS::DiscoveredReaderData> OpenDDS::RTPS::Sedp::MsgIdRdrDataPair [private]
typedef std::pair<DCPS::MessageId, OpenDDS::DCPS::DiscoveredWriterData> OpenDDS::RTPS::Sedp::MsgIdWtrDataPair [private]
typedef DCPS::RcHandle<Reader> OpenDDS::RTPS::Sedp::Reader_rch [private]
OpenDDS::RTPS::Sedp::Sedp(const DCPS::RepoId& participant_id, Spdp& owner, ACE_Thread_Mutex& lock)
void OpenDDS::RTPS::Sedp::acknowledge()
Requests acknowledgement from all Sedp threads (Task).
Definition at line 1962 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Task::acknowledge(), and task_.
Referenced by OpenDDS::RTPS::Spdp::fini_bit().
01963 { 01964 task_.acknowledge(); 01965 }
void OpenDDS::RTPS::Sedp::assign_bit_key(DiscoveredSubscription& sub) [private]
Definition at line 364 of file Sedp.cpp.
References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::increment_key(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_bit_key_, and OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_key_to_id_.
00365 { 00366 increment_key(sub_bit_key_); 00367 sub_key_to_id_[sub_bit_key_] = sub.reader_data_.readerProxy.remoteReaderGuid; 00368 sub.reader_data_.ddsSubscriptionData.key = sub_bit_key_; 00369 }
void OpenDDS::RTPS::Sedp::assign_bit_key(DiscoveredPublication& pub) [private]
Definition at line 356 of file Sedp.cpp.
References OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::increment_key(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_bit_key_, and OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_key_to_id_.
Referenced by data_received().
00357 { 00358 increment_key(pub_bit_key_); 00359 pub_key_to_id_[pub_bit_key_] = pub.writer_data_.writerProxy.remoteWriterGuid; 00360 pub.writer_data_.ddsPublicationData.key = pub_bit_key_; 00361 }
void OpenDDS::RTPS::Sedp::associate(const SPDPdiscoveredParticipantData& pdata)
Definition at line 405 of file Sedp.cpp.
References OpenDDS::RTPS::ParticipantProxy_t::availableBuiltinEndpoints, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER, OpenDDS::RTPS::create_association_data_proto(), OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER, OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER, participant_message_reader_, OpenDDS::RTPS::SPDPdiscoveredParticipantData::participantProxy, publications_reader_, OpenDDS::DCPS::AssociationData::remote_id_, subscriptions_reader_, and task_.
Referenced by OpenDDS::RTPS::Spdp::data_received().
00406 { 00407 // First create a 'prototypical' instance of AssociationData. It will 00408 // be copied and modified for each of the (up to) four SEDP Endpoints. 00409 DCPS::AssociationData proto; 00410 create_association_data_proto(proto, pdata); 00411 00412 const BuiltinEndpointSet_t& avail = 00413 pdata.participantProxy.availableBuiltinEndpoints; 00414 00415 // See RTPS v2.1 section 8.5.5.1 00416 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER) { 00417 DCPS::AssociationData peer = proto; 00418 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER; 00419 publications_reader_->assoc(peer); 00420 } 00421 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER) { 00422 DCPS::AssociationData peer = proto; 00423 peer.remote_id_.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER; 00424 subscriptions_reader_->assoc(peer); 00425 } 00426 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER) { 00427 DCPS::AssociationData peer = proto; 00428 peer.remote_id_.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER; 00429 participant_message_reader_->assoc(peer); 00430 } 00431 00432 SPDPdiscoveredParticipantData* dpd = 00433 new SPDPdiscoveredParticipantData(pdata); 00434 task_.enqueue(dpd); 00435 }
void OpenDDS::RTPS::Sedp::association_complete(const DCPS::RepoId& localId, const DCPS::RepoId& remoteId) [virtual]
void OpenDDS::RTPS::Sedp::data_received(DCPS::MessageId message_id, const ParticipantMessageData& data) [private]
Definition at line 1284 of file Sedp.cpp.
References OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::RTPS::Spdp::has_discovered_participant(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::RTPS::ParticipantMessageData::participantGuid, OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
01286 { 01287 if (spdp_.shutting_down()) { return; } 01288 01289 const RepoId& guid = data.participantGuid; 01290 RepoId guid_participant = guid; 01291 guid_participant.entityId = ENTITYID_PARTICIPANT; 01292 RepoId prefix = data.participantGuid; 01293 prefix.entityId = EntityId_t(); // Clear the entityId so lower bound will work. 01294 01295 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01296 01297 if (ignoring(guid) 01298 || ignoring(guid_participant)) { 01299 return; 01300 } 01301 01302 if (!spdp_.has_discovered_participant (guid_participant)) { 01303 return; 01304 } 01305 01306 for (LocalSubscriptionMap::const_iterator sub_pos = local_subscriptions_.begin(), 01307 sub_limit = local_subscriptions_.end(); 01308 sub_pos != sub_limit; ++sub_pos) { 01309 const DCPS::RepoIdSet::const_iterator pos = 01310 sub_pos->second.matched_endpoints_.lower_bound(prefix); 01311 if (pos != sub_pos->second.matched_endpoints_.end() && 01312 OpenDDS::DCPS::GuidPrefixEqual()(pos->guidPrefix, prefix.guidPrefix)) { 01313 sub_pos->second.subscription_->signal_liveliness(guid_participant); 01314 } 01315 } 01316 }
void OpenDDS::RTPS::Sedp::data_received(DCPS::MessageId message_id, const OpenDDS::DCPS::DiscoveredReaderData& rdata) [private]
Definition at line 1085 of file Sedp.cpp.
References assign_bit_key(), OpenDDS::DCPS::DiscoveredReaderData::contentFilterProperty, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DiscoveredReaderData::ddsSubscriptionData, deferred_subscriptions_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_subscriptions_, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_topic_name(), DDS::HANDLE_NIL, OpenDDS::RTPS::Spdp::has_discovered_participant(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring(), inconsistent_topic(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::make_topic_guid(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match_endpoints(), DDS::NEW_VIEW_STATE, DDS::NOT_NEW_VIEW_STATE, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP(), OPENDDS_STRING, paramsChanged(), qosChanged(), OpenDDS::DCPS::DiscoveredReaderData::readerProxy, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::RTPS::Spdp::shutting_down(), spdp_, sub_bit(), DDS::SubscriptionBuiltinTopicData::topic_data, DDS::SubscriptionBuiltinTopicData::topic_name, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::topic_names_, DDS::SubscriptionBuiltinTopicData::type_name, and OpenDDS::DCPS::UNREGISTER_INSTANCE.
01087 { 01088 if (spdp_.shutting_down()) { return; } 01089 01090 const RepoId& guid = rdata.readerProxy.remoteReaderGuid; 01091 RepoId guid_participant = guid; 01092 guid_participant.entityId = ENTITYID_PARTICIPANT; 01093 01094 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 01095 01096 if (ignoring(guid) 01097 || ignoring(guid_participant) 01098 || ignoring(rdata.ddsSubscriptionData.topic_name)) { 01099 return; 01100 } 01101 01102 if (!spdp_.has_discovered_participant (guid_participant)) { 01103 deferred_subscriptions_[guid] = std::make_pair (message_id, rdata); 01104 return; 01105 } 01106 01107 OPENDDS_STRING topic_name; 01108 // Find the publication - iterator valid only as long as we hold the lock 01109 DiscoveredSubscriptionIter iter = discovered_subscriptions_.find(guid); 01110 01111 // Must unlock when calling into sub_bit() as it may call back into us 01112 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_); 01113 01114 if (message_id == DCPS::SAMPLE_DATA) { 01115 OpenDDS::DCPS::DiscoveredReaderData rdata_copy; 01116 01117 if (iter == discovered_subscriptions_.end()) { // add new 01118 { // Reduce scope of sub and td 01119 DiscoveredSubscription& sub = 01120 discovered_subscriptions_[guid] = DiscoveredSubscription(rdata); 01121 01122 topic_name = get_topic_name(sub); 01123 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 01124 topics_.find(topic_name); 01125 if (top_it == topics_.end()) { 01126 top_it = 01127 topics_.insert(std::make_pair(topic_name, TopicDetails())).first; 01128 top_it->second.data_type_ = rdata.ddsSubscriptionData.type_name; 01129 top_it->second.qos_.topic_data = rdata.ddsSubscriptionData.topic_data; 01130 top_it->second.repo_id_ = make_topic_guid(); 01131 01132 } else if (top_it->second.data_type_ != 01133 rdata.ddsSubscriptionData.type_name.in()) { 01134 inconsistent_topic(top_it->second.endpoints_); 01135 if (DCPS::DCPS_debug_level) { 01136 ACE_DEBUG((LM_WARNING, 01137 ACE_TEXT("(%P|%t) Sedp::data_received(drd) - WARNING ") 01138 
ACE_TEXT("topic %C discovered data type %C doesn't ") 01139 ACE_TEXT("match known data type %C, ignoring ") 01140 ACE_TEXT("discovered subcription.\n"), 01141 topic_name.c_str(), 01142 rdata.ddsSubscriptionData.type_name.in(), 01143 top_it->second.data_type_.c_str())); 01144 } 01145 return; 01146 } 01147 01148 TopicDetails& td = top_it->second; 01149 topic_names_[td.repo_id_] = topic_name; 01150 td.endpoints_.insert(guid); 01151 01152 std::memcpy(sub.reader_data_.ddsSubscriptionData.participant_key.value, 01153 guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t)); 01154 assign_bit_key(sub); 01155 rdata_copy = sub.reader_data_; 01156 } 01157 01158 // Iter no longer valid once lock released 01159 iter = discovered_subscriptions_.end(); 01160 01161 DDS::InstanceHandle_t instance_handle = DDS::HANDLE_NIL; 01162 #ifndef DDS_HAS_MINIMUM_BIT 01163 { 01164 // Release lock for call into sub_bit 01165 ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock); 01166 DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit(); 01167 if (bit) { // bit may be null if the DomainParticipant is shutting down 01168 instance_handle = 01169 bit->store_synthetic_data(rdata_copy.ddsSubscriptionData, 01170 DDS::NEW_VIEW_STATE); 01171 } 01172 } 01173 #endif /* DDS_HAS_MINIMUM_BIT */ 01174 01175 if (spdp_.shutting_down()) { return; } 01176 // Subscription may have been removed while lock released 01177 iter = discovered_subscriptions_.find(guid); 01178 if (iter != discovered_subscriptions_.end()) { 01179 iter->second.bit_ih_ = instance_handle; 01180 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 01181 topics_.find(topic_name); 01182 if (top_it != topics_.end()) { 01183 if (DCPS::DCPS_debug_level > 3) { 01184 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ") 01185 ACE_TEXT("calling match_endpoints new\n"))); 01186 } 01187 match_endpoints(guid, top_it->second); 01188 } 01189 } 01190 01191 } else { // update existing 01192 if 
(qosChanged(iter->second.reader_data_.ddsSubscriptionData, 01193 rdata.ddsSubscriptionData)) { 01194 #ifndef DDS_HAS_MINIMUM_BIT 01195 DDS::SubscriptionBuiltinTopicDataDataReaderImpl* bit = sub_bit(); 01196 if (bit) { // bit may be null if the DomainParticipant is shutting down 01197 bit->store_synthetic_data( 01198 iter->second.reader_data_.ddsSubscriptionData, 01199 DDS::NOT_NEW_VIEW_STATE); 01200 } 01201 #endif /* DDS_HAS_MINIMUM_BIT */ 01202 01203 // Match/unmatch local publication(s) 01204 topic_name = get_topic_name(iter->second); 01205 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 01206 topics_.find(topic_name); 01207 if (top_it != topics_.end()) { 01208 if (DCPS::DCPS_debug_level > 3) { 01209 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ") 01210 ACE_TEXT("calling match_endpoints update\n"))); 01211 } 01212 match_endpoints(guid, top_it->second); 01213 } 01214 } 01215 01216 if (paramsChanged(iter->second.reader_data_.contentFilterProperty, 01217 rdata.contentFilterProperty)) { 01218 // Let any associated local publications know about the change 01219 topic_name = get_topic_name(iter->second); 01220 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 01221 topics_.find(topic_name); 01222 using DCPS::RepoIdSet; 01223 const RepoIdSet& assoc = 01224 (top_it == topics_.end()) ? 
RepoIdSet() : top_it->second.endpoints_; 01225 for (RepoIdSet::const_iterator i = assoc.begin(); i != assoc.end(); ++i) { 01226 if (i->entityId.entityKind & 4) continue; // subscription 01227 const LocalPublicationIter lpi = local_publications_.find(*i); 01228 if (lpi != local_publications_.end()) { 01229 lpi->second.publication_->update_subscription_params(guid, 01230 rdata.contentFilterProperty.expressionParameters); 01231 } 01232 } 01233 } 01234 } 01235 // For each associated opendds writer to this reader 01236 CORBA::ULong len = rdata.readerProxy.associatedWriters.length(); 01237 for (CORBA::ULong writerIndex = 0; writerIndex < len; ++writerIndex) 01238 { 01239 GUID_t writerGuid = rdata.readerProxy.associatedWriters[writerIndex]; 01240 01241 // If the associated writer is in this participant 01242 LocalPublicationIter lp = local_publications_.find(writerGuid); 01243 if (lp != local_publications_.end()) { 01244 // If the local writer is not fully associated with the reader 01245 if (lp->second.remote_opendds_associations_.insert(guid).second) { 01246 // This is a new association 01247 lp->second.publication_->association_complete(guid); 01248 } 01249 } 01250 } 01251 01252 } else if (message_id == DCPS::UNREGISTER_INSTANCE || 01253 message_id == DCPS::DISPOSE_INSTANCE || 01254 message_id == DCPS::DISPOSE_UNREGISTER_INSTANCE) { 01255 if (iter != discovered_subscriptions_.end()) { 01256 // Unmatch local publication(s) 01257 topic_name = get_topic_name(iter->second); 01258 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 01259 topics_.find(topic_name); 01260 if (top_it != topics_.end()) { 01261 top_it->second.endpoints_.erase(guid); 01262 if (DCPS::DCPS_debug_level > 3) { 01263 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(drd) - ") 01264 ACE_TEXT("calling match_endpoints disp/unreg\n"))); 01265 } 01266 match_endpoints(guid, top_it->second, true /*remove*/); 01267 if (spdp_.shutting_down()) { return; } 01268 } 01269 
remove_from_bit(iter->second); 01270 discovered_subscriptions_.erase(iter); 01271 } 01272 } 01273 }
void OpenDDS::RTPS::Sedp::data_received(DCPS::MessageId message_id, const OpenDDS::DCPS::DiscoveredWriterData& wdata) [private]
Definition at line 925 of file Sedp.cpp.
References assign_bit_key(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DiscoveredWriterData::ddsPublicationData, deferred_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_publications_, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_topic_name(), DDS::HANDLE_NIL, OpenDDS::RTPS::Spdp::has_discovered_participant(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring(), inconsistent_topic(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::make_topic_guid(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match_endpoints(), DDS::NEW_VIEW_STATE, DDS::NOT_NEW_VIEW_STATE, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP(), OPENDDS_STRING, pub_bit(), qosChanged(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::RTPS::Spdp::shutting_down(), spdp_, DDS::PublicationBuiltinTopicData::topic_data, DDS::PublicationBuiltinTopicData::topic_name, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::topic_names_, DDS::PublicationBuiltinTopicData::type_name, OpenDDS::DCPS::UNREGISTER_INSTANCE, and OpenDDS::DCPS::DiscoveredWriterData::writerProxy.
Referenced by OpenDDS::RTPS::Sedp::Task::svc_i().
00927 { 00928 if (spdp_.shutting_down()) { return; } 00929 00930 const RepoId& guid = wdata.writerProxy.remoteWriterGuid; 00931 RepoId guid_participant = guid; 00932 guid_participant.entityId = ENTITYID_PARTICIPANT; 00933 00934 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00935 if (ignoring(guid) 00936 || ignoring(guid_participant) 00937 || ignoring(wdata.ddsPublicationData.topic_name)) { 00938 return; 00939 } 00940 00941 if (!spdp_.has_discovered_participant (guid_participant)) { 00942 deferred_publications_[guid] = std::make_pair (message_id, wdata); 00943 return; 00944 } 00945 00946 OPENDDS_STRING topic_name; 00947 // Find the publication - iterator valid only as long as we hold the lock 00948 DiscoveredPublicationIter iter = discovered_publications_.find(guid); 00949 00950 if (message_id == DCPS::SAMPLE_DATA) { 00951 OpenDDS::DCPS::DiscoveredWriterData wdata_copy; 00952 00953 if (iter == discovered_publications_.end()) { // add new 00954 // Must unlock when calling into pub_bit() as it may call back into us 00955 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_); 00956 00957 { // Reduce scope of pub and td 00958 DiscoveredPublication& pub = 00959 discovered_publications_[guid] = DiscoveredPublication(wdata); 00960 00961 topic_name = get_topic_name(pub); 00962 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 00963 topics_.find(topic_name); 00964 if (top_it == topics_.end()) { 00965 top_it = 00966 topics_.insert(std::make_pair(topic_name, TopicDetails())).first; 00967 top_it->second.data_type_ = wdata.ddsPublicationData.type_name; 00968 top_it->second.qos_.topic_data = wdata.ddsPublicationData.topic_data; 00969 top_it->second.repo_id_ = make_topic_guid(); 00970 00971 } else if (top_it->second.data_type_ != 00972 wdata.ddsPublicationData.type_name.in()) { 00973 inconsistent_topic(top_it->second.endpoints_); 00974 if (DCPS::DCPS_debug_level) { 00975 ACE_DEBUG((LM_WARNING, 00976 ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - WARNING ") 00977 ACE_TEXT("topic %C 
discovered data type %C doesn't ") 00978 ACE_TEXT("match known data type %C, ignoring ") 00979 ACE_TEXT("discovered publication.\n"), 00980 topic_name.c_str(), 00981 wdata.ddsPublicationData.type_name.in(), 00982 top_it->second.data_type_.c_str())); 00983 } 00984 return; 00985 } 00986 00987 TopicDetails& td = top_it->second; 00988 topic_names_[td.repo_id_] = topic_name; 00989 td.endpoints_.insert(guid); 00990 00991 std::memcpy(pub.writer_data_.ddsPublicationData.participant_key.value, 00992 guid.guidPrefix, sizeof(DDS::BuiltinTopicKey_t)); 00993 assign_bit_key(pub); 00994 wdata_copy = pub.writer_data_; 00995 } 00996 00997 // Iter no longer valid once lock released 00998 iter = discovered_publications_.end(); 00999 01000 DDS::InstanceHandle_t instance_handle = DDS::HANDLE_NIL; 01001 #ifndef DDS_HAS_MINIMUM_BIT 01002 { 01003 // Release lock for call into pub_bit 01004 ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock); 01005 DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit(); 01006 if (bit) { // bit may be null if the DomainParticipant is shutting down 01007 instance_handle = 01008 bit->store_synthetic_data(wdata_copy.ddsPublicationData, 01009 DDS::NEW_VIEW_STATE); 01010 } 01011 } 01012 #endif /* DDS_HAS_MINIMUM_BIT */ 01013 01014 if (spdp_.shutting_down()) { return; } 01015 // Publication may have been removed while lock released 01016 iter = discovered_publications_.find(guid); 01017 if (iter != discovered_publications_.end()) { 01018 iter->second.bit_ih_ = instance_handle; 01019 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 01020 topics_.find(topic_name); 01021 if (top_it != topics_.end()) { 01022 if (DCPS::DCPS_debug_level > 3) { 01023 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ") 01024 ACE_TEXT("calling match_endpoints new\n"))); 01025 } 01026 match_endpoints(guid, top_it->second); 01027 } 01028 } 01029 01030 } else if (qosChanged(iter->second.writer_data_.ddsPublicationData, 01031 
wdata.ddsPublicationData)) { // update existing 01032 #ifndef DDS_HAS_MINIMUM_BIT 01033 DDS::PublicationBuiltinTopicDataDataReaderImpl* bit = pub_bit(); 01034 if (bit) { // bit may be null if the DomainParticipant is shutting down 01035 bit->store_synthetic_data(iter->second.writer_data_.ddsPublicationData, 01036 DDS::NOT_NEW_VIEW_STATE); 01037 } 01038 #endif /* DDS_HAS_MINIMUM_BIT */ 01039 01040 // Match/unmatch local subscription(s) 01041 topic_name = get_topic_name(iter->second); 01042 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 01043 topics_.find(topic_name); 01044 if (top_it != topics_.end()) { 01045 if (DCPS::DCPS_debug_level > 3) { 01046 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ") 01047 ACE_TEXT("calling match_endpoints update\n"))); 01048 } 01049 match_endpoints(guid, top_it->second); 01050 } 01051 } 01052 01053 } else if (message_id == DCPS::UNREGISTER_INSTANCE || 01054 message_id == DCPS::DISPOSE_INSTANCE || 01055 message_id == DCPS::DISPOSE_UNREGISTER_INSTANCE) { 01056 if (iter != discovered_publications_.end()) { 01057 // Unmatch local subscription(s) 01058 topic_name = get_topic_name(iter->second); 01059 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 01060 topics_.find(topic_name); 01061 if (top_it != topics_.end()) { 01062 top_it->second.endpoints_.erase(guid); 01063 match_endpoints(guid, top_it->second, true /*remove*/); 01064 if (spdp_.shutting_down()) { return; } 01065 } 01066 remove_from_bit(iter->second); 01067 if (DCPS::DCPS_debug_level > 3) { 01068 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) Sedp::data_received(dwd) - ") 01069 ACE_TEXT("calling match_endpoints disp/unreg\n"))); 01070 } 01071 discovered_publications_.erase(iter); 01072 } 01073 } 01074 }
virtual bool OpenDDS::RTPS::Sedp::defer_reader(const DCPS::RepoId& reader, const DCPS::RepoId& reader_participant) [private, virtual]
virtual bool OpenDDS::RTPS::Sedp::defer_writer(const DCPS::RepoId& writer, const DCPS::RepoId& writer_participant) [private, virtual]
bool OpenDDS::RTPS::Sedp::disassociate(const SPDPdiscoveredParticipantData& pdata)
Definition at line 550 of file Sedp.cpp.
References associated_participants_, OpenDDS::RTPS::ParticipantProxy_t::availableBuiltinEndpoints, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER, OpenDDS::RTPS::BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER, OpenDDS::DCPS::TransportClient::disassociate(), OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER, OpenDDS::RTPS::DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_subscriptions_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER, OpenDDS::DCPS::ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER, OpenDDS::DCPS::ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER, OpenDDS::RTPS::ParticipantProxy_t::guidPrefix, OpenDDS::RTPS::Spdp::has_discovered_participant(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_, participant_message_reader_, participant_message_writer_, OpenDDS::RTPS::SPDPdiscoveredParticipantData::participantProxy, publications_reader_, publications_writer_, remove_entities_belonging_to(), spdp_, subscriptions_reader_, and subscriptions_writer_.
00551 { 00552 RepoId part; 00553 std::memcpy(part.guidPrefix, pdata.participantProxy.guidPrefix, 00554 sizeof(GuidPrefix_t)); 00555 part.entityId = ENTITYID_PARTICIPANT; 00556 associated_participants_.erase(part); 00557 const BuiltinEndpointSet_t avail = 00558 pdata.participantProxy.availableBuiltinEndpoints; 00559 00560 { // Release lock, so we can call into transport 00561 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_); 00562 ACE_GUARD_RETURN(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock, false); 00563 00564 // See RTPS v2.1 section 8.5.5.2 00565 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_DETECTOR) { 00566 RepoId id = part; 00567 id.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_READER; 00568 publications_writer_.disassociate(id); 00569 } 00570 if (avail & DISC_BUILTIN_ENDPOINT_PUBLICATION_ANNOUNCER) { 00571 RepoId id = part; 00572 id.entityId = ENTITYID_SEDP_BUILTIN_PUBLICATIONS_WRITER; 00573 publications_reader_->disassociate(id); 00574 } 00575 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_DETECTOR) { 00576 RepoId id = part; 00577 id.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_READER; 00578 subscriptions_writer_.disassociate(id); 00579 } 00580 if (avail & DISC_BUILTIN_ENDPOINT_SUBSCRIPTION_ANNOUNCER) { 00581 RepoId id = part; 00582 id.entityId = ENTITYID_SEDP_BUILTIN_SUBSCRIPTIONS_WRITER; 00583 subscriptions_reader_->disassociate(id); 00584 } 00585 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_READER) { 00586 RepoId id = part; 00587 id.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_READER; 00588 participant_message_writer_.disassociate(id); 00589 } 00590 if (avail & BUILTIN_ENDPOINT_PARTICIPANT_MESSAGE_DATA_WRITER) { 00591 RepoId id = part; 00592 id.entityId = ENTITYID_P2P_BUILTIN_PARTICIPANT_MESSAGE_WRITER; 00593 participant_message_reader_->disassociate(id); 00594 } 00595 //FUTURE: if/when topic propagation is supported, add it here 00596 } 00597 if (spdp_.has_discovered_participant(part)) { 00598 
remove_entities_belonging_to(discovered_publications_, part); 00599 remove_entities_belonging_to(discovered_subscriptions_, part); 00600 return true; 00601 } else { 00602 return false; 00603 } 00604 }
void OpenDDS::RTPS::Sedp::inconsistent_topic(const DCPS::RepoIdSet& endpoints) const [private]
Definition at line 769 of file Sedp.cpp.
References OpenDDS::DCPS::GUID_t::guidPrefix, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_, and OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_.
Referenced by data_received().
00770 { 00771 using DCPS::RepoIdSet; 00772 for (RepoIdSet::const_iterator iter(eps.begin()); iter != eps.end(); ++iter) { 00773 if (0 == std::memcmp(participant_id_.guidPrefix, iter->guidPrefix, 00774 sizeof(GuidPrefix_t))) { 00775 const bool reader = iter->entityId.entityKind & 4; 00776 if (reader) { 00777 const LocalSubscriptionCIter lsi = local_subscriptions_.find(*iter); 00778 if (lsi != local_subscriptions_.end()) { 00779 lsi->second.subscription_->inconsistent_topic(); 00780 // Only make one callback per inconsistent topic, even if we have 00781 // more than one reader/writer on the topic -- it's the Topic object 00782 // that will actually see the InconsistentTopicStatus change. 00783 return; 00784 } 00785 } else { 00786 const LocalPublicationCIter lpi = local_publications_.find(*iter); 00787 if (lpi != local_publications_.end()) { 00788 lpi->second.publication_->inconsistent_topic(); 00789 return; // see comment above 00790 } 00791 } 00792 } 00793 } 00794 }
DDS::ReturnCode_t OpenDDS::RTPS::Sedp::init(const DCPS::RepoId& guid, const RtpsDiscovery& disco, DDS::DomainId_t domainId)
const ACE_INET_Addr& OpenDDS::RTPS::Sedp::local_address() const
Definition at line 331 of file Sedp.cpp.
References transport_inst_.
Referenced by map_ipv4_to_ipv6().
00332 { 00333 DCPS::RtpsUdpInst_rch rtps_inst = 00334 DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_); 00335 return rtps_inst->local_address_; 00336 }
static DCPS::RepoId OpenDDS::RTPS::Sedp::make_id(const DCPS::RepoId& participant_id, const EntityId_t& entity) [static, private]
bool OpenDDS::RTPS::Sedp::map_ipv4_to_ipv6() const
Definition at line 346 of file Sedp.cpp.
References local_address().
00347 { 00348 bool map = false; 00349 if (local_address().get_type() != AF_INET) { 00350 map = true; 00351 } 00352 return map; 00353 }
const ACE_INET_Addr& OpenDDS::RTPS::Sedp::multicast_group() const
Definition at line 339 of file Sedp.cpp.
References transport_inst_.
00340 { 00341 DCPS::RtpsUdpInst_rch rtps_inst = 00342 DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_); 00343 return rtps_inst->multicast_group_address_; 00344 }
typedef OpenDDS::RTPS::Sedp::OPENDDS_MAP_CMP(DCPS::RepoId, MsgIdRdrDataPair, DCPS::GUID_tKeyLessThan) [private]
typedef OpenDDS::RTPS::Sedp::OPENDDS_MAP_CMP(DCPS::RepoId, MsgIdWtrDataPair, DCPS::GUID_tKeyLessThan) [private]
typedef OpenDDS::RTPS::Sedp::OPENDDS_MAP_CMP(DCPS::RepoId, LocalParticipantMessage, DCPS::GUID_tKeyLessThan) [private]
void OpenDDS::RTPS::Sedp::populate_discovered_reader_msg(OpenDDS::DCPS::DiscoveredReaderData& drd, const DCPS::RepoId& subscription_id, const LocalSubscription& sub) [private]
void OpenDDS::RTPS::Sedp::populate_discovered_writer_msg(OpenDDS::DCPS::DiscoveredWriterData& dwd, const DCPS::RepoId& publication_id, const LocalPublication& pub) [private]
void OpenDDS::RTPS::Sedp::populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls, DiscoveredPublicationIter& iter, const DCPS::RepoId& writer) [private, virtual]
Definition at line 2114 of file Sedp.cpp.
References OpenDDS::DCPS::TransportLocator::data, defer_match_endpoints_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::gen_find_size(), OpenDDS::RTPS::Spdp::get_default_locators(), OpenDDS::RTPS::message_block_to_sequence(), spdp_, and OpenDDS::DCPS::TransportLocator::transport_type.
02117 { 02118 OpenDDS::DCPS::LocatorSeq locs; 02119 bool participantExpectsInlineQos = false; 02120 RepoId remote_participant = writer; 02121 remote_participant.entityId = ENTITYID_PARTICIPANT; 02122 const bool participant_found = 02123 spdp_.get_default_locators(remote_participant, locs, 02124 participantExpectsInlineQos); 02125 if (!wTls->length()) { // if no locators provided, add the default 02126 if (!participant_found) { 02127 defer_match_endpoints_.insert(writer); 02128 return; 02129 } else if (locs.length()) { 02130 size_t size = 0, padding = 0; 02131 DCPS::gen_find_size(locs, size, padding); 02132 02133 ACE_Message_Block mb_locator(size + 1); // Add space for boolean 02134 using DCPS::Serializer; 02135 Serializer ser_loc(&mb_locator, 02136 ACE_CDR_BYTE_ORDER, 02137 Serializer::ALIGN_CDR); 02138 ser_loc << locs; 02139 ser_loc << ACE_OutputCDR::from_boolean(participantExpectsInlineQos); 02140 02141 DCPS::TransportLocator tl; 02142 tl.transport_type = "rtps_udp"; 02143 message_block_to_sequence (mb_locator, tl.data); 02144 wTls->length(1); 02145 (*wTls)[0] = tl; 02146 } else { 02147 ACE_DEBUG((LM_WARNING, 02148 ACE_TEXT("(%P|%t) Sedp::match - ") 02149 ACE_TEXT("remote writer found with no locators ") 02150 ACE_TEXT("and no default locators\n"))); 02151 } 02152 } 02153 }
void OpenDDS::RTPS::Sedp::populate_transport_locator_sequence | ( | DCPS::TransportLocatorSeq *& | tls, | |
DiscoveredSubscriptionIter & | iter, | |||
const DCPS::RepoId & | reader | |||
) | [private, virtual] |
Definition at line 2069 of file Sedp.cpp.
References OpenDDS::DCPS::TransportLocator::data, defer_match_endpoints_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::ENTITYID_PARTICIPANT, OpenDDS::DCPS::gen_find_size(), OpenDDS::RTPS::Spdp::get_default_locators(), OpenDDS::RTPS::message_block_to_sequence(), spdp_, and OpenDDS::DCPS::TransportLocator::transport_type.
02072 { 02073 OpenDDS::DCPS::LocatorSeq locs; 02074 bool participantExpectsInlineQos = false; 02075 RepoId remote_participant = reader; 02076 remote_participant.entityId = ENTITYID_PARTICIPANT; 02077 const bool participant_found = 02078 spdp_.get_default_locators(remote_participant, locs, 02079 participantExpectsInlineQos); 02080 if (!rTls->length()) { // if no locators provided, add the default 02081 if (!participant_found) { 02082 defer_match_endpoints_.insert(reader); 02083 return; 02084 } else if (locs.length()) { 02085 size_t size = 0, padding = 0; 02086 DCPS::gen_find_size(locs, size, padding); 02087 02088 ACE_Message_Block mb_locator(size + 1); // Add space for boolean 02089 using DCPS::Serializer; 02090 Serializer ser_loc(&mb_locator, 02091 ACE_CDR_BYTE_ORDER, 02092 Serializer::ALIGN_CDR); 02093 ser_loc << locs; 02094 const bool readerExpectsInlineQos = 02095 dsi->second.reader_data_.readerProxy.expectsInlineQos; 02096 ser_loc << ACE_OutputCDR::from_boolean(participantExpectsInlineQos 02097 || readerExpectsInlineQos); 02098 02099 DCPS::TransportLocator tl; 02100 tl.transport_type = "rtps_udp"; 02101 message_block_to_sequence (mb_locator, tl.data); 02102 rTls->length(1); 02103 (*rTls)[0] = tl; 02104 } else { 02105 ACE_DEBUG((LM_WARNING, 02106 ACE_TEXT("(%P|%t) Sedp::match - ") 02107 ACE_TEXT("remote reader found with no locators ") 02108 ACE_TEXT("and no default locators\n"))); 02109 } 02110 } 02111 }
DDS::PublicationBuiltinTopicDataDataReaderImpl * OpenDDS::RTPS::Sedp::pub_bit | ( | ) | [private] |
Definition at line 702 of file Sedp.cpp.
References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber(), OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, and spdp_.
Referenced by data_received(), and OpenDDS::RTPS::Sedp::Task::svc_i().
00703 { 00704 DDS::Subscriber_var sub = spdp_.bit_subscriber(); 00705 if (!sub.in()) 00706 return 0; 00707 00708 DDS::DataReader_var d = 00709 sub->lookup_datareader(DCPS::BUILT_IN_PUBLICATION_TOPIC); 00710 return dynamic_cast<DDS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in()); 00711 }
void OpenDDS::RTPS::Sedp::remove_entities_belonging_to | ( | Map & | m, | |
RepoId | participant | |||
) |
Definition at line 608 of file Sedp.cpp.
References OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_topic_name(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match_endpoints(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP(), OPENDDS_STRING, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit(), OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
00609 { 00610 participant.entityId.entityKey[0] = 0; 00611 participant.entityId.entityKey[1] = 0; 00612 participant.entityId.entityKey[2] = 0; 00613 participant.entityId.entityKind = 0; 00614 for (typename Map::iterator i = m.lower_bound(participant); 00615 i != m.end() && 0 == std::memcmp(i->first.guidPrefix, 00616 participant.guidPrefix, 00617 sizeof(GuidPrefix_t));) { 00618 OPENDDS_STRING topic_name = get_topic_name(i->second); 00619 OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 00620 topics_.find(topic_name); 00621 if (top_it != topics_.end()) { 00622 top_it->second.endpoints_.erase(i->first); 00623 if (DCPS::DCPS_debug_level > 3) { 00624 ACE_DEBUG((LM_DEBUG, 00625 ACE_TEXT("(%P|%t) Sedp::remove_entities_belonging_to - ") 00626 ACE_TEXT("calling match_endpoints remove\n"))); 00627 } 00628 match_endpoints(i->first, top_it->second, true /*remove*/); 00629 if (spdp_.shutting_down()) { return; } 00630 } 00631 remove_from_bit(i->second); 00632 m.erase(i++); 00633 } 00634 }
void OpenDDS::RTPS::Sedp::remove_entities_belonging_to | ( | Map & | m, | |
DCPS::RepoId | participant | |||
) | [private] |
Referenced by disassociate().
void OpenDDS::RTPS::Sedp::remove_from_bit_i | ( | const DiscoveredSubscription & | sub | ) | [private] |
Definition at line 647 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_SUB_BIT, and task_.
00648 { 00649 #ifndef DDS_HAS_MINIMUM_BIT 00650 task_.enqueue(Msg::MSG_REMOVE_FROM_SUB_BIT, sub.bit_ih_); 00651 #else 00652 ACE_UNUSED_ARG(sub); 00653 #endif /* DDS_HAS_MINIMUM_BIT */ 00654 }
void OpenDDS::RTPS::Sedp::remove_from_bit_i | ( | const DiscoveredPublication & | pub | ) | [private] |
Definition at line 637 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Task::enqueue(), OpenDDS::RTPS::Sedp::Msg::MSG_REMOVE_FROM_PUB_BIT, and task_.
00638 { 00639 #ifndef DDS_HAS_MINIMUM_BIT 00640 task_.enqueue(Msg::MSG_REMOVE_FROM_PUB_BIT, pub.bit_ih_); 00641 #else 00642 ACE_UNUSED_ARG(pub); 00643 #endif /* DDS_HAS_MINIMUM_BIT */ 00644 }
virtual DDS::ReturnCode_t OpenDDS::RTPS::Sedp::remove_publication_i | ( | const DCPS::RepoId & | publicationId | ) | [private, virtual] |
virtual DDS::ReturnCode_t OpenDDS::RTPS::Sedp::remove_subscription_i | ( | const DCPS::RepoId & | subscriptionId | ) | [private, virtual] |
void OpenDDS::RTPS::Sedp::set_inline_qos | ( | DCPS::TransportLocatorSeq & | locators | ) | [static, private] |
Definition at line 1949 of file Sedp.cpp.
References OPENDDS_STRING.
Referenced by OpenDDS::RTPS::Sedp::Reader::data_received().
01950 { 01951 const OPENDDS_STRING rtps_udp = "rtps_udp"; 01952 for (CORBA::ULong i = 0; i < locators.length(); ++i) { 01953 if (locators[i].transport_type.in() == rtps_udp) { 01954 const CORBA::ULong len = locators[i].data.length(); 01955 locators[i].data.length(len + 1); 01956 locators[i].data[len] = CORBA::Octet(1); 01957 } 01958 } 01959 }
void OpenDDS::RTPS::Sedp::shutdown | ( | ) |
Definition at line 891 of file Sedp.cpp.
References participant_message_reader_, publications_reader_, OpenDDS::RTPS::Sedp::Task::shutdown(), subscriptions_reader_, and task_.
Referenced by OpenDDS::RTPS::Spdp::~Spdp().
00892 { 00893 task_.shutdown(); 00894 publications_reader_->shutting_down_ = 1; 00895 subscriptions_reader_->shutting_down_ = 1; 00896 participant_message_reader_->shutting_down_ = 1; 00897 }
bool OpenDDS::RTPS::Sedp::shutting_down | ( | ) | const [private, virtual] |
Implements OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >.
Definition at line 2063 of file Sedp.cpp.
References OpenDDS::RTPS::Spdp::shutting_down(), and spdp_.
02064 { 02065 return spdp_.shutting_down(); 02066 }
void OpenDDS::RTPS::Sedp::signal_liveliness | ( | DDS::LivelinessQosPolicyKind | kind | ) |
Definition at line 1341 of file Sedp.cpp.
References DDS::AUTOMATIC_LIVELINESS_QOS, automatic_liveliness_seq_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::GUID_UNKNOWN, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, manual_liveliness_seq_, OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_, OpenDDS::RTPS::PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE, OpenDDS::RTPS::PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE, participant_message_writer_, OpenDDS::RTPS::ParticipantMessageData::participantGuid, and OpenDDS::RTPS::Sedp::Writer::write_sample().
Referenced by OpenDDS::RTPS::Spdp::signal_liveliness().
01342 { 01343 ParticipantMessageData pmd; 01344 pmd.participantGuid = participant_id_; 01345 switch (kind) { 01346 case DDS::AUTOMATIC_LIVELINESS_QOS: 01347 pmd.participantGuid.entityId = OpenDDS::DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_AUTOMATIC_LIVELINESS_UPDATE); 01348 participant_message_writer_.write_sample(pmd, GUID_UNKNOWN, automatic_liveliness_seq_); 01349 break; 01350 case DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS: 01351 pmd.participantGuid.entityId = OpenDDS::DCPS::EntityIdConverter(PARTICIPANT_MESSAGE_DATA_KIND_MANUAL_LIVELINESS_UPDATE); 01352 participant_message_writer_.write_sample(pmd, GUID_UNKNOWN, manual_liveliness_seq_); 01353 break; 01354 case DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS: 01355 // Do nothing. 01356 break; 01357 } 01358 }
DDS::SubscriptionBuiltinTopicDataDataReaderImpl * OpenDDS::RTPS::Sedp::sub_bit | ( | ) | [private] |
Definition at line 714 of file Sedp.cpp.
References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber(), OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, and spdp_.
Referenced by data_received(), and OpenDDS::RTPS::Sedp::Task::svc_i().
00715 { 00716 DDS::Subscriber_var sub = spdp_.bit_subscriber(); 00717 if (!sub.in()) 00718 return 0; 00719 00720 DDS::DataReader_var d = 00721 sub->lookup_datareader(DCPS::BUILT_IN_SUBSCRIPTION_TOPIC); 00722 return dynamic_cast<DDS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in()); 00723 }
DDS::TopicBuiltinTopicDataDataReaderImpl * OpenDDS::RTPS::Sedp::topic_bit | ( | ) | [private] |
Definition at line 690 of file Sedp.cpp.
References OpenDDS::DCPS::LocalParticipant< EndpointManagerType >::bit_subscriber(), OpenDDS::DCPS::BUILT_IN_TOPIC_TOPIC, and spdp_.
00691 { 00692 DDS::Subscriber_var sub = spdp_.bit_subscriber(); 00693 if (!sub.in()) 00694 return 0; 00695 00696 DDS::DataReader_var d = 00697 sub->lookup_datareader(DCPS::BUILT_IN_TOPIC_TOPIC); 00698 return dynamic_cast<DDS::TopicBuiltinTopicDataDataReaderImpl*>(d.in()); 00699 }
void OpenDDS::RTPS::Sedp::unicast_locators | ( | OpenDDS::DCPS::LocatorSeq & | locators | ) | const |
Definition at line 283 of file Sedp.cpp.
References OpenDDS::RTPS::address_to_bytes(), OpenDDS::RTPS::address_to_kind(), OpenDDS::DCPS::get_interface_addrs(), OPENDDS_VECTOR, TheServiceParticipant, and transport_inst_.
00284 { 00285 DCPS::RtpsUdpInst_rch rtps_inst = 00286 DCPS::static_rchandle_cast<DCPS::RtpsUdpInst>(transport_inst_); 00287 using namespace OpenDDS::RTPS; 00288 00289 CORBA::ULong idx = 0; 00290 00291 // multicast first so it's preferred by remote peers 00292 if (rtps_inst->use_multicast_ && rtps_inst->multicast_group_address_ != ACE_INET_Addr()) { 00293 idx = locators.length(); 00294 locators.length(idx + 1); 00295 locators[idx].kind = address_to_kind(rtps_inst->multicast_group_address_); 00296 locators[idx].port = rtps_inst->multicast_group_address_.get_port_number(); 00297 RTPS::address_to_bytes(locators[idx].address, 00298 rtps_inst->multicast_group_address_); 00299 } 00300 00301 //if local_address_string is empty, or only the port has been set 00302 //need to get interface addresses to populate into the locator 00303 if (rtps_inst->local_address_config_str_.empty() || 00304 rtps_inst->local_address_config_str_.rfind(':') == 0) { 00305 typedef OPENDDS_VECTOR(ACE_INET_Addr) AddrVector; 00306 AddrVector addrs; 00307 if (TheServiceParticipant->default_address ().empty ()) { 00308 OpenDDS::DCPS::get_interface_addrs(addrs); 00309 } else { 00310 addrs.push_back (ACE_INET_Addr (static_cast<u_short> (0), TheServiceParticipant->default_address ().c_str ())); 00311 } 00312 for (AddrVector::iterator adr_it = addrs.begin(); adr_it != addrs.end(); ++adr_it) { 00313 idx = locators.length(); 00314 locators.length(idx + 1); 00315 locators[idx].kind = address_to_kind(*adr_it); 00316 locators[idx].port = rtps_inst->local_address_.get_port_number(); 00317 RTPS::address_to_bytes(locators[idx].address, 00318 *adr_it); 00319 } 00320 } else { 00321 idx = locators.length(); 00322 locators.length(idx + 1); 00323 locators[idx].kind = address_to_kind(rtps_inst->local_address_); 00324 locators[idx].port = rtps_inst->local_address_.get_port_number(); 00325 RTPS::address_to_bytes(locators[idx].address, 00326 rtps_inst->local_address_); 00327 } 00328 }
bool OpenDDS::RTPS::Sedp::update_publication_qos | ( | const DCPS::RepoId & | publicationId, | |
const DDS::DataWriterQos & | qos, | |||
const DDS::PublisherQos & | publisherQos | |||
) | [virtual] |
bool OpenDDS::RTPS::Sedp::update_subscription_params | ( | const DCPS::RepoId & | subId, | |
const DDS::StringSeq & | params | |||
) | [virtual] |
bool OpenDDS::RTPS::Sedp::update_subscription_qos | ( | const DCPS::RepoId & | subscriptionId, | |
const DDS::DataReaderQos & | qos, | |||
const DDS::SubscriberQos & | subscriberQos | |||
) | [virtual] |
bool OpenDDS::RTPS::Sedp::update_topic_qos | ( | const DCPS::RepoId & | topicId, | |
const DDS::TopicQos & | qos, | |||
OPENDDS_STRING & | name | |||
) | [virtual] |
void OpenDDS::RTPS::Sedp::write_durable_participant_message_data | ( | const DCPS::RepoId & | reader | ) | [private] |
Definition at line 1858 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Writer::end_historic_samples(), local_participant_messages_, participant_message_writer_, and write_participant_message_data().
Referenced by OpenDDS::RTPS::Sedp::Task::svc_i().
01859 { 01860 LocalParticipantMessageIter part, end = local_participant_messages_.end(); 01861 for (part = local_participant_messages_.begin(); part != end; ++part) { 01862 write_participant_message_data(part->first, part->second, reader); 01863 } 01864 participant_message_writer_.end_historic_samples(reader); 01865 }
void OpenDDS::RTPS::Sedp::write_durable_publication_data | ( | const DCPS::RepoId & | reader | ) | [private] |
Definition at line 1838 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Writer::end_historic_samples(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_, publications_writer_, and write_publication_data().
Referenced by OpenDDS::RTPS::Sedp::Task::svc_i().
01839 { 01840 LocalPublicationIter pub, end = local_publications_.end(); 01841 for (pub = local_publications_.begin(); pub != end; ++pub) { 01842 write_publication_data(pub->first, pub->second, reader); 01843 } 01844 publications_writer_.end_historic_samples(reader); 01845 }
void OpenDDS::RTPS::Sedp::write_durable_subscription_data | ( | const DCPS::RepoId & | reader | ) | [private] |
Definition at line 1848 of file Sedp.cpp.
References OpenDDS::RTPS::Sedp::Writer::end_historic_samples(), OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_, subscriptions_writer_, and write_subscription_data().
Referenced by OpenDDS::RTPS::Sedp::Task::svc_i().
01849 { 01850 LocalSubscriptionIter sub, end = local_subscriptions_.end(); 01851 for (sub = local_subscriptions_.begin(); sub != end; ++sub) { 01852 write_subscription_data(sub->first, sub->second, reader); 01853 } 01854 subscriptions_writer_.end_historic_samples(reader); 01855 }
DDS::ReturnCode_t OpenDDS::RTPS::Sedp::write_participant_message_data | ( | const DCPS::RepoId & | rid, | |
LocalParticipantMessage & | part, | |||
const DCPS::RepoId & | reader = DCPS::GUID_UNKNOWN | |||
) | [private] |
Referenced by write_durable_participant_message_data().
DDS::ReturnCode_t OpenDDS::RTPS::Sedp::write_publication_data | ( | const DCPS::RepoId & | rid, | |
LocalPublication & | pub, | |||
const DCPS::RepoId & | reader = DCPS::GUID_UNKNOWN | |||
) | [private] |
Referenced by write_durable_publication_data().
DDS::ReturnCode_t OpenDDS::RTPS::Sedp::write_subscription_data | ( | const DCPS::RepoId & | rid, | |
LocalSubscription & | pub, | |||
const DCPS::RepoId & | reader = DCPS::GUID_UNKNOWN | |||
) | [private] |
Referenced by write_durable_subscription_data().
DCPS::RepoIdSet OpenDDS::RTPS::Sedp::associated_participants_ [private] |
Definition at line 335 of file Sedp.h.
Referenced by disassociate(), and OpenDDS::RTPS::Sedp::Task::svc_i().
DCPS::RepoIdSet OpenDDS::RTPS::Sedp::defer_match_endpoints_ [private] |
Definition at line 335 of file Sedp.h.
Referenced by populate_transport_locator_sequence(), and OpenDDS::RTPS::Sedp::Task::svc_i().
DeferredPublicationMap OpenDDS::RTPS::Sedp::deferred_publications_ [private] |
Definition at line 314 of file Sedp.h.
Referenced by data_received(), and OpenDDS::RTPS::Sedp::Task::svc_i().
DeferredSubscriptionMap OpenDDS::RTPS::Sedp::deferred_subscriptions_ [private] |
Definition at line 319 of file Sedp.h.
Referenced by data_received(), and OpenDDS::RTPS::Sedp::Task::svc_i().
const bool OpenDDS::RTPS::Sedp::host_is_bigendian_ [static] |
LocalParticipantMessageMap OpenDDS::RTPS::Sedp::local_participant_messages_ [private] |
OpenDDS::RTPS::Sedp::publications_writer_
Referenced by disassociate(), OpenDDS::RTPS::Sedp::Task::svc_i(), and write_durable_publication_data().
Spdp& OpenDDS::RTPS::Sedp::spdp_ [private] |
Definition at line 103 of file Sedp.h.
Referenced by data_received(), disassociate(), populate_transport_locator_sequence(), pub_bit(), remove_entities_belonging_to(), shutting_down(), sub_bit(), and topic_bit().
OpenDDS::RTPS::Sedp::subscriptions_writer_
Referenced by disassociate(), OpenDDS::RTPS::Sedp::Task::svc_i(), and write_durable_subscription_data().
OpenDDS::RTPS::Sedp::task_
Referenced by acknowledge(), associate(), OpenDDS::RTPS::Sedp::Reader::data_received(), remove_from_bit_i(), and shutdown().
OpenDDS::RTPS::Sedp::transport_inst_
Definition at line 278 of file Sedp.h.
Referenced by local_address(), multicast_group(), and unicast_locators().