#include <DiscoveryBase.h>
Definition at line 109 of file DiscoveryBase.h.
typedef DiscoveredParticipantData_ OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::DiscoveredParticipantData
Definition at line 164 of file DiscoveryBase.h.
typedef DiscoveredPublicationMap::iterator OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::DiscoveredPublicationIter [protected]
Definition at line 161 of file DiscoveryBase.h.
typedef DiscoveredSubscriptionMap::iterator OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::DiscoveredSubscriptionIter [protected]
Definition at line 136 of file DiscoveryBase.h.
typedef LocalPublicationMap::const_iterator OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::LocalPublicationCIter [protected]
Definition at line 600 of file DiscoveryBase.h.
typedef LocalPublicationMap::iterator OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::LocalPublicationIter [protected]
Definition at line 599 of file DiscoveryBase.h.
typedef LocalSubscriptionMap::const_iterator OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::LocalSubscriptionCIter [protected]
Definition at line 605 of file DiscoveryBase.h.
typedef LocalSubscriptionMap::iterator OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::LocalSubscriptionIter [protected]
Definition at line 604 of file DiscoveryBase.h.
OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::EndpointManager(
    const RepoId & participant_id,
    ACE_Thread_Mutex & lock
) [inline]
Definition at line 174 of file DiscoveryBase.h.
00175 : lock_(lock) 00176 , participant_id_(participant_id) 00177 , publication_counter_(0) 00178 , subscription_counter_(0) 00179 , topic_counter_(0) 00180 00181 #if defined(OPENDDS_SECURITY) 00182 , permissions_handle_(DDS::HANDLE_NIL) 00183 #endif 00184 00185 { 00186 00187 }
virtual OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::~EndpointManager() [inline, virtual]
Definition at line 189 of file DiscoveryBase.h.
DCPS::RepoId OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::add_publication(
    const DCPS::RepoId & topicId,
    DCPS::DataWriterCallbacks * publication,
    const DDS::DataWriterQos & qos,
    const DCPS::TransportLocatorSeq & transInfo,
    const DDS::PublisherQos & publisherQos
) [inline]
Definition at line 317 of file DiscoveryBase.h.
00322 { 00323 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId()); 00324 00325 RepoId rid = participant_id_; 00326 assign_publication_key(rid, topicId, qos); 00327 LocalPublication& pb = local_publications_[rid]; 00328 pb.topic_id_ = topicId; 00329 pb.publication_ = publication; 00330 pb.qos_ = qos; 00331 pb.trans_info_ = transInfo; 00332 pb.publisher_qos_ = publisherQos; 00333 00334 const std::string& topic_name = topic_names_[topicId]; 00335 00336 #if defined(OPENDDS_SECURITY) 00337 if (is_security_enabled()) { 00338 DDS::Security::SecurityException ex; 00339 00340 DDS::Security::TopicSecurityAttributes topic_sec_attr; 00341 if (!get_access_control()->get_topic_sec_attributes(get_permissions_handle(), topic_name.data(), topic_sec_attr, ex)) { 00342 ACE_ERROR((LM_ERROR, 00343 ACE_TEXT("(%P|%t) ERROR: ") 00344 ACE_TEXT("DomainParticipant::add_publication, ") 00345 ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"), 00346 topic_name.data(), ex.code, ex.minor_code, ex.message.in())); 00347 return RepoId(); 00348 } 00349 00350 if (topic_sec_attr.is_write_protected == true) { 00351 if (!get_access_control()->check_create_datawriter(get_permissions_handle(), get_domain_id(), topic_name.data(), qos, 00352 publisherQos.partition, DDS::Security::DataTagQosPolicy(), ex)) { 00353 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ") 00354 ACE_TEXT("EndpointManager::add_publication() - ") 00355 ACE_TEXT("Permissions check failed for local datawriter on topic '%C'. 
Security Exception[%d.%d]: %C\n"), topic_name.data(), 00356 ex.code, ex.minor_code, ex.message.in())); 00357 return RepoId(); 00358 } 00359 } 00360 00361 if (!get_access_control()->get_datawriter_sec_attributes(get_permissions_handle(), topic_name.data(), 00362 publisherQos.partition, DDS::Security::DataTagQosPolicy(), pb.security_attribs_, ex)) { 00363 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 00364 ACE_TEXT("EndpointManager::add_publication() - ") 00365 ACE_TEXT("Unable to get security attributes for local datawriter. Security Exception[%d.%d]: %C\n"), 00366 ex.code, ex.minor_code, ex.message.in())); 00367 return RepoId(); 00368 } 00369 00370 if (pb.security_attribs_.is_submessage_protected || pb.security_attribs_.is_payload_protected) { 00371 DDS::Security::DatawriterCryptoHandle handle = get_crypto_key_factory()->register_local_datawriter(crypto_handle_, DDS::PropertySeq(), pb.security_attribs_, ex); 00372 if (handle == DDS::HANDLE_NIL) { 00373 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 00374 ACE_TEXT("EndpointManager::add_publication() - ") 00375 ACE_TEXT("Unable to get local datawriter crypto handle. Security Exception[%d.%d]: %C\n"), 00376 ex.code, ex.minor_code, ex.message.in())); 00377 } 00378 00379 local_writer_crypto_handles_[rid] = handle; 00380 local_writer_security_attribs_[rid] = pb.security_attribs_; 00381 } 00382 } 00383 #endif 00384 00385 TopicDetails& td = topics_[topic_name]; 00386 td.endpoints_.insert(rid); 00387 00388 if (DDS::RETCODE_OK != add_publication_i(rid, pb)) { 00389 return RepoId(); 00390 } 00391 00392 if (DDS::RETCODE_OK != write_publication_data(rid, pb)) { 00393 return RepoId(); 00394 } 00395 00396 if (DCPS::DCPS_debug_level > 3) { 00397 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::add_publication - ") 00398 ACE_TEXT("calling match_endpoints\n"))); 00399 } 00400 match_endpoints(rid, td); 00401 00402 return rid; 00403 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::add_publication_i(
    const DCPS::RepoId &,
    LocalPublication &
) [inline, protected, virtual]
Reimplemented in OpenDDS::DCPS::StaticEndpointManager.
Definition at line 655 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication().
00656 { return DDS::RETCODE_OK; }
virtual DCPS::TransportLocatorSeq OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::add_security_info(
    const DCPS::TransportLocatorSeq & locators,
    const RepoId &,
    const RepoId &
) [inline, protected, virtual]
Definition at line 1107 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match().
DCPS::RepoId OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::add_subscription(
    const DCPS::RepoId & topicId,
    DCPS::DataReaderCallbacks * subscription,
    const DDS::DataReaderQos & qos,
    const DCPS::TransportLocatorSeq & transInfo,
    const DDS::SubscriberQos & subscriberQos,
    const char * filterClassName,
    const char * filterExpr,
    const DDS::StringSeq & params
) [inline]
Definition at line 432 of file DiscoveryBase.h.
00440 { 00441 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId()); 00442 00443 RepoId rid = participant_id_; 00444 assign_subscription_key(rid, topicId, qos); 00445 LocalSubscription& sb = local_subscriptions_[rid]; 00446 sb.topic_id_ = topicId; 00447 sb.subscription_ = subscription; 00448 sb.qos_ = qos; 00449 sb.trans_info_ = transInfo; 00450 sb.subscriber_qos_ = subscriberQos; 00451 sb.filterProperties.filterClassName = filterClassName; 00452 sb.filterProperties.filterExpression = filterExpr; 00453 sb.filterProperties.expressionParameters = params; 00454 00455 const std::string& topic_name = topic_names_[topicId]; 00456 00457 #if defined(OPENDDS_SECURITY) 00458 if (is_security_enabled()) { 00459 DDS::Security::SecurityException ex; 00460 00461 DDS::Security::TopicSecurityAttributes topic_sec_attr; 00462 if (!get_access_control()->get_topic_sec_attributes(get_permissions_handle(), topic_name.data(), topic_sec_attr, ex)) { 00463 ACE_ERROR((LM_ERROR, 00464 ACE_TEXT("(%P|%t) ERROR: ") 00465 ACE_TEXT("DomainParticipant::add_subscription, ") 00466 ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"), 00467 topic_name.data(), ex.code, ex.minor_code, ex.message.in())); 00468 return RepoId(); 00469 } 00470 00471 if (topic_sec_attr.is_read_protected == true) { 00472 if (!get_access_control()->check_create_datareader(get_permissions_handle(), get_domain_id(), topic_name.data(), qos, 00473 subscriberQos.partition, DDS::Security::DataTagQosPolicy(), ex)) { 00474 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ") 00475 ACE_TEXT("EndpointManager::add_subscription() - ") 00476 ACE_TEXT("Permissions check failed for local datareader on topic '%C'. 
Security Exception[%d.%d]: %C\n"), topic_name.data(), 00477 ex.code, ex.minor_code, ex.message.in())); 00478 return RepoId(); 00479 } 00480 } 00481 00482 if (!get_access_control()->get_datareader_sec_attributes(get_permissions_handle(), topic_name.data(), 00483 subscriberQos.partition, DDS::Security::DataTagQosPolicy(), sb.security_attribs_, ex)) { 00484 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 00485 ACE_TEXT("EndpointManager::add_subscription() - ") 00486 ACE_TEXT("Unable to get security attributes for local datareader. Security Exception[%d.%d]: %C\n"), 00487 ex.code, ex.minor_code, ex.message.in())); 00488 return RepoId(); 00489 } 00490 00491 if (sb.security_attribs_.is_submessage_protected || sb.security_attribs_.is_payload_protected) { 00492 DDS::Security::DatareaderCryptoHandle handle = get_crypto_key_factory()->register_local_datareader(crypto_handle_, DDS::PropertySeq(), sb.security_attribs_, ex); 00493 if (handle == DDS::HANDLE_NIL) { 00494 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 00495 ACE_TEXT("EndpointManager::add_subscription() - ") 00496 ACE_TEXT("Unable to get local datareader crypto handle. Security Exception[%d.%d]: %C\n"), 00497 ex.code, ex.minor_code, ex.message.in())); 00498 } 00499 00500 local_reader_crypto_handles_[rid] = handle; 00501 local_reader_security_attribs_[rid] = sb.security_attribs_; 00502 } 00503 } 00504 #endif 00505 00506 TopicDetails& td = topics_[topic_name]; 00507 td.endpoints_.insert(rid); 00508 00509 if (DDS::RETCODE_OK != add_subscription_i(rid, sb)) { 00510 return RepoId(); 00511 } 00512 00513 if (DDS::RETCODE_OK != write_subscription_data(rid, sb)) { 00514 return RepoId(); 00515 } 00516 00517 if (DCPS::DCPS_debug_level > 3) { 00518 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::add_subscription - ") 00519 ACE_TEXT("calling match_endpoints\n"))); 00520 } 00521 match_endpoints(rid, td); 00522 00523 return rid; 00524 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::add_subscription_i(
    const DCPS::RepoId &,
    LocalSubscription &
) [inline, protected, virtual]
Reimplemented in OpenDDS::DCPS::StaticEndpointManager.
Definition at line 662 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription().
00663 { return DDS::RETCODE_OK; };
DCPS::TopicStatus OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::assert_topic(
    DCPS::RepoId_out topicId,
    const char * topicName,
    const char * dataTypeName,
    const DDS::TopicQos & qos,
    bool hasDcpsKey
) [inline]
Definition at line 271 of file DiscoveryBase.h.
00274 { 00275 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR); 00276 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator iter = 00277 topics_.find(topicName); 00278 if (iter != topics_.end()) { // types must match, RtpsDiscovery checked for us 00279 iter->second.qos_ = qos; 00280 iter->second.has_dcps_key_ = hasDcpsKey; 00281 topicId = iter->second.repo_id_; 00282 topic_names_[iter->second.repo_id_] = topicName; 00283 return DCPS::FOUND; 00284 } 00285 00286 TopicDetails& td = topics_[topicName]; 00287 td.data_type_ = dataTypeName; 00288 td.qos_ = qos; 00289 td.has_dcps_key_ = hasDcpsKey; 00290 td.repo_id_ = make_topic_guid(); 00291 topicId = td.repo_id_; 00292 topic_names_[td.repo_id_] = topicName; 00293 00294 return DCPS::CREATED; 00295 }
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::assign_publication_key(
    RepoId & rid,
    const RepoId & topicId,
    const DDS::DataWriterQos &
) [inline, protected, virtual]
Reimplemented in OpenDDS::DCPS::StaticEndpointManager.
Definition at line 625 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication().
00627 { 00628 rid.entityId.entityKind = 00629 has_dcps_key(topicId) 00630 ? DCPS::ENTITYKIND_USER_WRITER_WITH_KEY 00631 : DCPS::ENTITYKIND_USER_WRITER_NO_KEY; 00632 assign(rid.entityId.entityKey, publication_counter_++); 00633 }
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::assign_subscription_key(
    RepoId & rid,
    const RepoId & topicId,
    const DDS::DataReaderQos &
) [inline, protected, virtual]
Reimplemented in OpenDDS::DCPS::StaticEndpointManager.
Definition at line 634 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription().
00636 { 00637 rid.entityId.entityKind = 00638 has_dcps_key(topicId) 00639 ? DCPS::ENTITYKIND_USER_READER_WITH_KEY 00640 : DCPS::ENTITYKIND_USER_READER_NO_KEY; 00641 assign(rid.entityId.entityKey, subscription_counter_++); 00642 }
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::assign_topic_key(RepoId & guid) [inline, protected, virtual]
Definition at line 643 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::make_topic_guid().
00643 { 00644 assign(guid.entityId.entityKey, topic_counter_++); 00645 00646 if (topic_counter_ == 0x1000000) { 00647 ACE_ERROR((LM_ERROR, 00648 ACE_TEXT("(%P|%t) ERROR: EndpointManager::make_topic_guid: ") 00649 ACE_TEXT("Exceeded Maximum number of topic entity keys!") 00650 ACE_TEXT("Next key will be a duplicate!\n"))); 00651 topic_counter_ = 0; 00652 } 00653 }
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::association_complete(
    const DCPS::RepoId & localId,
    const DCPS::RepoId & remoteId
) [pure virtual]
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
RepoId OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::bit_key_to_repo_id(
    const char * bit_topic_name,
    const DDS::BuiltinTopicKey_t & key
) [inline]
Definition at line 191 of file DiscoveryBase.h.
00193 { 00194 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId()); 00195 if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_PUBLICATION_TOPIC)) { 00196 return pub_key_to_id_[key]; 00197 } 00198 if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_SUBSCRIPTION_TOPIC)) { 00199 return sub_key_to_id_[key]; 00200 } 00201 return RepoId(); 00202 }
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::defer_reader(
    const RepoId & writer,
    const RepoId & writer_participant
) [protected, pure virtual]
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match().
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::defer_writer(
    const RepoId & writer,
    const RepoId & writer_participant
) [protected, pure virtual]
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match().
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::disassociate(const DiscoveredParticipantData & pdata) [pure virtual]
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
virtual DDS::DomainId_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_domain_id() const [inline, protected, virtual]
Definition at line 749 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication(), and OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription().
static DDS::BuiltinTopicKey_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_key(const DiscoveredSubscription & sub) [inline, static, protected]
Definition at line 618 of file DiscoveryBase.h.
static DDS::BuiltinTopicKey_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_key(const DiscoveredPublication & pub) [inline, static, protected]
Definition at line 615 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_from_bit().
static const char* OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_topic_name(const DiscoveredSubscription & sub) [inline, static, protected]
Definition at line 612 of file DiscoveryBase.h.
static const char* OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_topic_name(const DiscoveredPublication & pub) [inline, static, protected]
Definition at line 609 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::ignore().
bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::has_dcps_key(const DCPS::RepoId & topicId) const [inline, protected]
Definition at line 1138 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::assign_publication_key(), and OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::assign_subscription_key().
01139 { 01140 typedef OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan) TNMap; 01141 TNMap::const_iterator tn = topic_names_.find(topicId); 01142 if (tn == topic_names_.end()) return false; 01143 01144 typedef OPENDDS_MAP(OPENDDS_STRING, TopicDetails) TDMap; 01145 typename TDMap::const_iterator td = topics_.find(tn->second); 01146 if (td == topics_.end()) return false; 01147 01148 return td->second.has_dcps_key_; 01149 }
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignore(const DCPS::RepoId & to_ignore) [inline]
Definition at line 204 of file DiscoveryBase.h.
Referenced by OpenDDS::RTPS::Spdp::init().
00205 { 00206 // Locked prior to call from Spdp. 00207 ignored_guids_.insert(to_ignore); 00208 { 00209 const DiscoveredPublicationIter iter = 00210 discovered_publications_.find(to_ignore); 00211 if (iter != discovered_publications_.end()) { 00212 // clean up tracking info 00213 topics_[get_topic_name(iter->second)].endpoints_.erase(iter->first); 00214 remove_from_bit(iter->second); 00215 OPENDDS_STRING topic_name = get_topic_name(iter->second); 00216 discovered_publications_.erase(iter); 00217 // break associations 00218 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 00219 topics_.find(topic_name); 00220 if (top_it != topics_.end()) { 00221 match_endpoints(to_ignore, top_it->second, true /*remove*/); 00222 } 00223 return; 00224 } 00225 } 00226 { 00227 const DiscoveredSubscriptionIter iter = 00228 discovered_subscriptions_.find(to_ignore); 00229 if (iter != discovered_subscriptions_.end()) { 00230 // clean up tracking info 00231 topics_[get_topic_name(iter->second)].endpoints_.erase(iter->first); 00232 remove_from_bit(iter->second); 00233 OPENDDS_STRING topic_name = get_topic_name(iter->second); 00234 discovered_subscriptions_.erase(iter); 00235 // break associations 00236 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 00237 topics_.find(topic_name); 00238 if (top_it != topics_.end()) { 00239 match_endpoints(to_ignore, top_it->second, true /*remove*/); 00240 } 00241 return; 00242 } 00243 } 00244 { 00245 const OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan)::iterator 00246 iter = topic_names_.find(to_ignore); 00247 if (iter != topic_names_.end()) { 00248 ignored_topics_.insert(iter->second); 00249 // Remove all publications and subscriptions on this topic 00250 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 00251 topics_.find(iter->second); 00252 if (top_it != topics_.end()) { 00253 TopicDetails& td = top_it->second; 00254 RepoIdSet::iterator ep; 00255 for (ep = 
td.endpoints_.begin(); ep!= td.endpoints_.end(); ++ep) { 00256 match_endpoints(*ep, td, true /*remove*/); 00257 if (shutting_down()) { return; } 00258 } 00259 } 00260 } 00261 } 00262 }
bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring(const char * topic_name) const [inline]
Definition at line 267 of file DiscoveryBase.h.
bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring(const DCPS::RepoId & guid) const [inline]
Definition at line 264 of file DiscoveryBase.h.
Referenced by OpenDDS::RTPS::Spdp::handle_participant_data().
00264 { 00265 return ignored_guids_.count(guid); 00266 }
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::increment_key(DDS::BuiltinTopicKey_t & key) [inline, protected]
Definition at line 1152 of file DiscoveryBase.h.
01153 { 01154 for (int idx = 0; idx < 3; ++idx) { 01155 CORBA::ULong ukey = static_cast<CORBA::ULong>(key.value[idx]); 01156 if (ukey == 0xFFFFFFFF) { 01157 key.value[idx] = 0; 01158 } else { 01159 ++ukey; 01160 key.value[idx] = ukey; 01161 return; 01162 } 01163 } 01164 ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) EndpointManager::increment_key - ") 01165 ACE_TEXT("ran out of builtin topic keys\n"))); 01166 }
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::is_opendds(const GUID_t & endpoint) const [inline, protected, virtual]
Definition at line 1090 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match().
01091 { 01092 return !std::memcmp(endpoint.guidPrefix, DCPS::VENDORID_OCI, 01093 sizeof(DCPS::VENDORID_OCI)); 01094 }
RepoId OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::make_topic_guid() [inline, protected]
Definition at line 1129 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::assert_topic().
01130 { 01131 RepoId guid; 01132 guid = participant_id_; 01133 guid.entityId.entityKind = DCPS::ENTITYKIND_OPENDDS_TOPIC; 01134 assign_topic_key(guid); 01135 return guid; 01136 }
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match(
    const RepoId & writer,
    const RepoId & reader
) [inline, protected]
Definition at line 755 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match_endpoints().
00756 { 00757 // 0. For discovered endpoints, we'll have the QoS info in the form of the 00758 // publication or subscription BIT data which doesn't use the same structures 00759 // for QoS. In those cases we can copy the individual QoS policies to temp 00760 // QoS structs: 00761 DDS::DataWriterQos tempDwQos; 00762 DDS::PublisherQos tempPubQos; 00763 DDS::DataReaderQos tempDrQos; 00764 DDS::SubscriberQos tempSubQos; 00765 ContentFilterProperty_t tempCfp; 00766 00767 // 1. collect details about the writer, which may be local or discovered 00768 const DDS::DataWriterQos* dwQos = 0; 00769 const DDS::PublisherQos* pubQos = 0; 00770 DCPS::TransportLocatorSeq* wTls = 0; 00771 00772 const LocalPublicationIter lpi = local_publications_.find(writer); 00773 DiscoveredPublicationIter dpi; 00774 bool writer_local = false, already_matched = false; 00775 if (lpi != local_publications_.end()) { 00776 writer_local = true; 00777 dwQos = &lpi->second.qos_; 00778 pubQos = &lpi->second.publisher_qos_; 00779 wTls = &lpi->second.trans_info_; 00780 already_matched = lpi->second.matched_endpoints_.count(reader); 00781 } else if ((dpi = discovered_publications_.find(writer)) 00782 != discovered_publications_.end()) { 00783 wTls = &dpi->second.writer_data_.writerProxy.allLocators; 00784 } else { 00785 return; // Possible and ok, since lock is released 00786 } 00787 00788 // 2. 
collect details about the reader, which may be local or discovered 00789 const DDS::DataReaderQos* drQos = 0; 00790 const DDS::SubscriberQos* subQos = 0; 00791 DCPS::TransportLocatorSeq* rTls = 0; 00792 const ContentFilterProperty_t* cfProp = 0; 00793 00794 const LocalSubscriptionIter lsi = local_subscriptions_.find(reader); 00795 DiscoveredSubscriptionIter dsi; 00796 bool reader_local = false; 00797 if (lsi != local_subscriptions_.end()) { 00798 reader_local = true; 00799 drQos = &lsi->second.qos_; 00800 subQos = &lsi->second.subscriber_qos_; 00801 rTls = &lsi->second.trans_info_; 00802 if (lsi->second.filterProperties.filterExpression[0] != 0) { 00803 tempCfp.filterExpression = lsi->second.filterProperties.filterExpression; 00804 tempCfp.expressionParameters = lsi->second.filterProperties.expressionParameters; 00805 } 00806 cfProp = &tempCfp; 00807 if (!already_matched) { 00808 already_matched = lsi->second.matched_endpoints_.count(writer); 00809 } 00810 } else if ((dsi = discovered_subscriptions_.find(reader)) 00811 != discovered_subscriptions_.end()) { 00812 if (!writer_local) { 00813 // this is a discovered/discovered match, nothing for us to do 00814 return; 00815 } 00816 rTls = &dsi->second.reader_data_.readerProxy.allLocators; 00817 00818 populate_transport_locator_sequence(rTls, dsi, reader); 00819 00820 const DDS::SubscriptionBuiltinTopicData& bit = 00821 dsi->second.reader_data_.ddsSubscriptionData; 00822 tempDrQos.durability = bit.durability; 00823 tempDrQos.deadline = bit.deadline; 00824 tempDrQos.latency_budget = bit.latency_budget; 00825 tempDrQos.liveliness = bit.liveliness; 00826 tempDrQos.reliability = bit.reliability; 00827 tempDrQos.destination_order = bit.destination_order; 00828 tempDrQos.history = TheServiceParticipant->initial_HistoryQosPolicy(); 00829 tempDrQos.resource_limits = 00830 TheServiceParticipant->initial_ResourceLimitsQosPolicy(); 00831 tempDrQos.user_data = bit.user_data; 00832 tempDrQos.ownership = bit.ownership; 00833 
tempDrQos.time_based_filter = bit.time_based_filter; 00834 tempDrQos.reader_data_lifecycle = 00835 TheServiceParticipant->initial_ReaderDataLifecycleQosPolicy(); 00836 drQos = &tempDrQos; 00837 tempSubQos.presentation = bit.presentation; 00838 tempSubQos.partition = bit.partition; 00839 tempSubQos.group_data = bit.group_data; 00840 tempSubQos.entity_factory = 00841 TheServiceParticipant->initial_EntityFactoryQosPolicy(); 00842 subQos = &tempSubQos; 00843 cfProp = &dsi->second.reader_data_.contentFilterProperty; 00844 } else { 00845 return; // Possible and ok, since lock is released 00846 } 00847 00848 // This is really part of step 1, but we're doing it here just in case we 00849 // are in the discovered/discovered match and we don't need the QoS data. 00850 if (!writer_local) { 00851 const DDS::PublicationBuiltinTopicData& bit = 00852 dpi->second.writer_data_.ddsPublicationData; 00853 tempDwQos.durability = bit.durability; 00854 tempDwQos.durability_service = bit.durability_service; 00855 tempDwQos.deadline = bit.deadline; 00856 tempDwQos.latency_budget = bit.latency_budget; 00857 tempDwQos.liveliness = bit.liveliness; 00858 tempDwQos.reliability = bit.reliability; 00859 tempDwQos.destination_order = bit.destination_order; 00860 tempDwQos.history = TheServiceParticipant->initial_HistoryQosPolicy(); 00861 tempDwQos.resource_limits = 00862 TheServiceParticipant->initial_ResourceLimitsQosPolicy(); 00863 tempDwQos.transport_priority = 00864 TheServiceParticipant->initial_TransportPriorityQosPolicy(); 00865 tempDwQos.lifespan = bit.lifespan; 00866 tempDwQos.user_data = bit.user_data; 00867 tempDwQos.ownership = bit.ownership; 00868 tempDwQos.ownership_strength = bit.ownership_strength; 00869 tempDwQos.writer_data_lifecycle = 00870 TheServiceParticipant->initial_WriterDataLifecycleQosPolicy(); 00871 dwQos = &tempDwQos; 00872 tempPubQos.presentation = bit.presentation; 00873 tempPubQos.partition = bit.partition; 00874 tempPubQos.group_data = bit.group_data; 00875 
tempPubQos.entity_factory = 00876 TheServiceParticipant->initial_EntityFactoryQosPolicy(); 00877 pubQos = &tempPubQos; 00878 00879 populate_transport_locator_sequence(wTls, dpi, writer); 00880 } 00881 00882 // Need to release lock, below, for callbacks into DCPS which could 00883 // call into Spdp/Sedp. Note that this doesn't unlock, it just constructs 00884 // an ACE object which will be used below for unlocking. 00885 ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_); 00886 00887 // 3. check transport and QoS compatibility 00888 00889 // Copy entries from local publication and local subscription maps 00890 // prior to releasing lock 00891 DCPS::DataWriterCallbacks* dwr = 0; 00892 DCPS::DataReaderCallbacks* drr = 0; 00893 if (writer_local) { 00894 dwr = lpi->second.publication_; 00895 } 00896 if (reader_local) { 00897 drr = lsi->second.subscription_; 00898 } 00899 00900 DCPS::IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()}; 00901 DCPS::IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()}; 00902 00903 if (DCPS::compatibleQOS(&writerStatus, &readerStatus, *wTls, *rTls, 00904 dwQos, drQos, pubQos, subQos)) { 00905 if (!writer_local) { 00906 RepoId writer_participant = writer; 00907 writer_participant.entityId = ENTITYID_PARTICIPANT; 00908 if (defer_writer(writer, writer_participant)) { 00909 return; 00910 } 00911 } 00912 if (!reader_local) { 00913 RepoId reader_participant = reader; 00914 reader_participant.entityId = ENTITYID_PARTICIPANT; 00915 if (defer_reader(reader, reader_participant)) { 00916 return; 00917 } 00918 } 00919 00920 bool call_writer = false, call_reader = false; 00921 if (writer_local) { 00922 call_writer = lpi->second.matched_endpoints_.insert(reader).second; 00923 } 00924 if (reader_local) { 00925 call_reader = lsi->second.matched_endpoints_.insert(writer).second; 00926 } 00927 if (!call_writer && !call_reader) { 00928 return; // nothing more to do 00929 } 00930 00931 #if defined(OPENDDS_SECURITY) 
00932 if (is_security_enabled()) { 00933 if (call_reader) { 00934 RepoId writer_participant = writer; 00935 writer_participant.entityId = ENTITYID_PARTICIPANT; 00936 DatareaderCryptoHandleMap::const_iterator iter = local_reader_crypto_handles_.find(reader); 00937 if (iter != local_reader_crypto_handles_.end()) { // It might not exist due to security attributes, and that's OK 00938 DDS::Security::DatareaderCryptoHandle drch = iter->second; 00939 remote_writer_crypto_handles_[writer] = generate_remote_matched_writer_crypto_handle(writer_participant, drch); 00940 DatawriterCryptoTokenSeqMap::iterator t_iter = pending_remote_writer_crypto_tokens_.find(writer); 00941 if (t_iter != pending_remote_writer_crypto_tokens_.end()) { 00942 DDS::Security::SecurityException se; 00943 if (get_crypto_key_exchange()->set_remote_datawriter_crypto_tokens(iter->second, remote_writer_crypto_handles_[writer], t_iter->second, se) == false) { 00944 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 00945 ACE_TEXT("(%P|%t) ERROR: DiscoveryBase::match() - ") 00946 ACE_TEXT("Unable to set pending remote datawriter crypto tokens with crypto key exchange plugin. 
Security Exception[%d.%d]: %C\n"), 00947 se.code, se.minor_code, se.message.in())); 00948 } 00949 pending_remote_writer_crypto_tokens_.erase(t_iter); 00950 } 00951 EndpointSecurityAttributesMap::const_iterator s_iter = local_reader_security_attribs_.find(reader); 00952 if (s_iter != local_reader_security_attribs_.end() && s_iter->second.is_submessage_protected) { // Yes, this is different for remote datawriters than readers (see 8.8.9.3 vs 8.8.9.2) 00953 create_and_send_datareader_crypto_tokens(drch, reader, remote_writer_crypto_handles_[writer], writer); 00954 } 00955 } 00956 } 00957 if (call_writer) { 00958 RepoId reader_participant = reader; 00959 reader_participant.entityId = ENTITYID_PARTICIPANT; 00960 DatawriterCryptoHandleMap::const_iterator iter = local_writer_crypto_handles_.find(writer); 00961 if (iter != local_writer_crypto_handles_.end()) { // It might not exist due to security attributes, and that's OK 00962 DDS::Security::DatawriterCryptoHandle dwch = iter->second; 00963 remote_reader_crypto_handles_[reader] = generate_remote_matched_reader_crypto_handle(reader_participant, dwch, (relay_only_readers_.count(reader) != 0)); 00964 DatareaderCryptoTokenSeqMap::iterator t_iter = pending_remote_reader_crypto_tokens_.find(reader); 00965 if (t_iter != pending_remote_reader_crypto_tokens_.end()) { 00966 DDS::Security::SecurityException se; 00967 if (get_crypto_key_exchange()->set_remote_datareader_crypto_tokens(iter->second, remote_reader_crypto_handles_[reader], t_iter->second, se) == false) { 00968 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ") 00969 ACE_TEXT("(%P|%t) ERROR: DiscoveryBase::match() - ") 00970 ACE_TEXT("Unable to set pending remote datareader crypto tokens with crypto key exchange plugin. 
Security Exception[%d.%d]: %C\n"), 00971 se.code, se.minor_code, se.message.in())); 00972 } 00973 pending_remote_reader_crypto_tokens_.erase(t_iter); 00974 } 00975 EndpointSecurityAttributesMap::const_iterator s_iter = local_writer_security_attribs_.find(writer); 00976 if (s_iter != local_writer_security_attribs_.end() && (s_iter->second.is_submessage_protected || s_iter->second.is_payload_protected)) { 00977 create_and_send_datawriter_crypto_tokens(dwch, writer, remote_reader_crypto_handles_[reader], reader); 00978 } 00979 } 00980 } 00981 } 00982 #endif 00983 00984 // Copy reader and writer association data prior to releasing lock 00985 #ifdef __SUNPRO_CC 00986 DCPS::ReaderAssociation ra; 00987 ra.readerTransInfo = *rTls; 00988 ra.readerId = reader; 00989 ra.subQos = *subQos; 00990 ra.readerQos = *drQos; 00991 ra.filterClassName = cfProp->filterClassName; 00992 ra.filterExpression = cfProp->filterExpression; 00993 ra.exprParams = cfProp->expressionParameters; 00994 DCPS::WriterAssociation wa; 00995 wa.writerTransInfo = *wTls; 00996 wa.writerId = writer; 00997 wa.pubQos = *pubQos; 00998 wa.writerQos = *dwQos; 00999 #else 01000 const DCPS::ReaderAssociation ra = 01001 {add_security_info(*rTls, writer, reader), reader, *subQos, *drQos, 01002 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC 01003 cfProp->filterClassName, cfProp->filterExpression, 01004 #else 01005 "", "", 01006 #endif 01007 cfProp->expressionParameters}; 01008 01009 const DCPS::WriterAssociation wa = 01010 {add_security_info(*wTls, writer, reader), writer, *pubQos, *dwQos}; 01011 #endif 01012 01013 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock); 01014 static const bool writer_active = true; 01015 01016 if (call_writer) { 01017 if (DCPS::DCPS_debug_level > 3) { 01018 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ") 01019 ACE_TEXT("adding writer association\n"))); 01020 } 01021 DcpsUpcalls thr(drr, reader, wa, !writer_active, dwr); 01022 if (call_reader) { 01023 thr.activate(); 
01024 } 01025 dwr->add_association(writer, ra, writer_active); 01026 if (call_reader) { 01027 thr.writer_done(); 01028 } 01029 01030 } else if (call_reader) { 01031 if (DCPS::DCPS_debug_level > 3) { 01032 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ") 01033 ACE_TEXT("adding reader association\n"))); 01034 } 01035 drr->add_association(reader, wa, !writer_active); 01036 } 01037 01038 // change this if 'writer_active' (above) changes 01039 if (call_writer && !call_reader && !is_opendds(reader)) { 01040 if (DCPS::DCPS_debug_level > 3) { 01041 ACE_DEBUG((LM_DEBUG, 01042 ACE_TEXT("(%P|%t) EndpointManager::match - ") 01043 ACE_TEXT("calling writer association_complete\n"))); 01044 } 01045 dwr->association_complete(reader); 01046 } 01047 01048 } else if (already_matched) { // break an existing associtaion 01049 if (writer_local) { 01050 lpi->second.matched_endpoints_.erase(reader); 01051 lpi->second.remote_opendds_associations_.erase(reader); 01052 } 01053 if (reader_local) { 01054 lsi->second.matched_endpoints_.erase(writer); 01055 lsi->second.remote_opendds_associations_.erase(writer); 01056 } 01057 ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock); 01058 if (writer_local) { 01059 DCPS::ReaderIdSeq reader_seq(1); 01060 reader_seq.length(1); 01061 reader_seq[0] = reader; 01062 dwr->remove_associations(reader_seq, false /*notify_lost*/); 01063 } 01064 if (reader_local) { 01065 DCPS::WriterIdSeq writer_seq(1); 01066 writer_seq.length(1); 01067 writer_seq[0] = writer; 01068 drr->remove_associations(writer_seq, false /*notify_lost*/); 01069 } 01070 01071 } else { // something was incompatible 01072 ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock); 01073 if (writer_local && writerStatus.count_since_last_send) { 01074 if (DCPS::DCPS_debug_level > 3) { 01075 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ") 01076 ACE_TEXT("writer incompatible\n"))); 01077 } 01078 dwr->update_incompatible_qos(writerStatus); 01079 } 01080 
if (reader_local && readerStatus.count_since_last_send) { 01081 if (DCPS::DCPS_debug_level > 3) { 01082 ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ") 01083 ACE_TEXT("reader incompatible\n"))); 01084 } 01085 drr->update_incompatible_qos(readerStatus); 01086 } 01087 } 01088 }
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match_endpoints | ( | DCPS::RepoId | repoId, | |
const TopicDetails & | td, | |||
bool | remove = false | |||
) | [inline, protected] |
Definition at line 669 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::ignore(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_publication(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_subscription(), and OpenDDS::RTPS::Sedp::Task::svc_i().
00671 { 00672 const bool reader = repoId.entityId.entityKind & 4; 00673 // Copy the endpoint set - lock can be released in match() 00674 RepoIdSet endpoints_copy = td.endpoints_; 00675 00676 for (RepoIdSet::const_iterator iter = endpoints_copy.begin(); 00677 iter != endpoints_copy.end(); ++iter) { 00678 // check to make sure it's a Reader/Writer or Writer/Reader match 00679 if (bool(iter->entityId.entityKind & 4) != reader) { 00680 if (remove) { 00681 remove_assoc(*iter, repoId); 00682 } else { 00683 match(reader ? *iter : repoId, reader ? repoId : *iter); 00684 } 00685 } 00686 } 00687 }
OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP | ( | OPENDDS_STRING | , | |
TopicDetails | ||||
) | [protected] |
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::assert_topic(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::has_dcps_key(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::ignore(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_publication(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_subscription(), and OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_topic().
typedef OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP_CMP | ( | DCPS::RepoId | , | |
OPENDDS_STRING | , | |||
DCPS::GUID_tKeyLessThan | ||||
) | [protected] |
typedef OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP_CMP | ( | DCPS::RepoId | , | |
LocalSubscription | , | |||
DCPS::GUID_tKeyLessThan | ||||
) | [protected] |
typedef OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP_CMP | ( | DCPS::RepoId | , | |
LocalPublication | , | |||
DCPS::GUID_tKeyLessThan | ||||
) | [protected] |
typedef OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP_CMP | ( | DDS::BuiltinTopicKey_t | , | |
DCPS::RepoId | , | |||
DCPS::BuiltinTopicKeyLess | ||||
) | [protected] |
typedef OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP_CMP | ( | DCPS::RepoId | , | |
DiscoveredPublication | , | |||
DCPS::GUID_tKeyLessThan | ||||
) | [protected] |
typedef OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP_CMP | ( | DCPS::RepoId | , | |
DiscoveredSubscription | , | |||
DCPS::GUID_tKeyLessThan | ||||
) | [protected] |
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::has_dcps_key(), and OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::ignore().
OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_SET | ( | OPENDDS_STRING | ) | [protected] |
OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_SET_CMP | ( | DCPS::RepoId | , | |
DCPS::GUID_tKeyLessThan | ||||
) | [protected] |
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::populate_transport_locator_sequence | ( | DCPS::TransportLocatorSeq *& | tls, | |
DiscoveredPublicationIter & | iter, | |||
const RepoId & | reader | |||
) | [protected, pure virtual] |
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::populate_transport_locator_sequence | ( | DCPS::TransportLocatorSeq *& | tls, | |
DiscoveredSubscriptionIter & | iter, | |||
const RepoId & | reader | |||
) | [protected, pure virtual] |
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match().
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_assoc | ( | const RepoId & | remove_from, | |
const RepoId & | removing | |||
) | [inline, protected] |
Definition at line 690 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match_endpoints().
00692 { 00693 const bool reader = remove_from.entityId.entityKind & 4; 00694 if (reader) { 00695 const LocalSubscriptionIter lsi = local_subscriptions_.find(remove_from); 00696 if (lsi != local_subscriptions_.end()) { 00697 lsi->second.matched_endpoints_.erase(removing); 00698 DCPS::WriterIdSeq writer_seq(1); 00699 writer_seq.length(1); 00700 writer_seq[0] = removing; 00701 lsi->second.remote_opendds_associations_.erase(removing); 00702 lsi->second.subscription_->remove_associations(writer_seq, 00703 false /*notify_lost*/); 00704 // Update writer 00705 write_subscription_data(remove_from, lsi->second); 00706 } 00707 00708 } else { 00709 const LocalPublicationIter lpi = local_publications_.find(remove_from); 00710 if (lpi != local_publications_.end()) { 00711 lpi->second.matched_endpoints_.erase(removing); 00712 DCPS::ReaderIdSeq reader_seq(1); 00713 reader_seq.length(1); 00714 reader_seq[0] = removing; 00715 lpi->second.remote_opendds_associations_.erase(removing); 00716 lpi->second.publication_->remove_associations(reader_seq, 00717 false /*notify_lost*/); 00718 } 00719 } 00720 }
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit | ( | const DiscoveredSubscription & | sub | ) | [inline, protected] |
Definition at line 1123 of file DiscoveryBase.h.
01124 { 01125 sub_key_to_id_.erase(get_key(sub)); 01126 remove_from_bit_i(sub); 01127 }
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit | ( | const DiscoveredPublication & | pub | ) | [inline, protected] |
Definition at line 1117 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::ignore().
01118 { 01119 pub_key_to_id_.erase(get_key(pub)); 01120 remove_from_bit_i(pub); 01121 }
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit_i | ( | const DiscoveredSubscription & | ) | [inline, protected, virtual] |
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit_i | ( | const DiscoveredPublication & | ) | [inline, protected, virtual] |
Reimplemented in OpenDDS::RTPS::Sedp.
Definition at line 622 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_from_bit().
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_publication | ( | const DCPS::RepoId & | publicationId | ) | [inline] |
Definition at line 405 of file DiscoveryBase.h.
00406 { 00407 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00408 LocalPublicationIter iter = local_publications_.find(publicationId); 00409 if (iter != local_publications_.end()) { 00410 if (DDS::RETCODE_OK == remove_publication_i(publicationId)) 00411 { 00412 OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_]; 00413 local_publications_.erase(publicationId); 00414 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 00415 topics_.find(topic_name); 00416 if (top_it != topics_.end()) { 00417 match_endpoints(publicationId, top_it->second, true /*remove*/); 00418 top_it->second.endpoints_.erase(publicationId); 00419 } 00420 } else { 00421 ACE_ERROR((LM_ERROR, 00422 ACE_TEXT("(%P|%t) ERROR: EndpointManager::remove_publication - ") 00423 ACE_TEXT("Failed to publish dispose msg\n"))); 00424 } 00425 } 00426 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_publication_i | ( | const RepoId & | publicationId | ) | [protected, pure virtual] |
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_publication().
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_subscription | ( | const DCPS::RepoId & | subscriptionId | ) | [inline] |
Definition at line 526 of file DiscoveryBase.h.
00527 { 00528 ACE_GUARD(ACE_Thread_Mutex, g, lock_); 00529 LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId); 00530 if (iter != local_subscriptions_.end()) { 00531 if (DDS::RETCODE_OK == remove_subscription_i(subscriptionId) 00532 /*subscriptions_writer_.write_unregister_dispose(subscriptionId)*/) { 00533 OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_]; 00534 local_subscriptions_.erase(subscriptionId); 00535 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 00536 topics_.find(topic_name); 00537 if (top_it != topics_.end()) { 00538 match_endpoints(subscriptionId, top_it->second, true /*remove*/); 00539 top_it->second.endpoints_.erase(subscriptionId); 00540 } 00541 } else { 00542 ACE_ERROR((LM_ERROR, 00543 ACE_TEXT("(%P|%t) ERROR: EndpointManager::remove_subscription - ") 00544 ACE_TEXT("Failed to publish dispose msg\n"))); 00545 } 00546 } 00547 }
virtual DDS::ReturnCode_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_subscription_i | ( | const RepoId & | subscriptionId | ) | [protected, pure virtual] |
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_subscription().
DCPS::TopicStatus OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_topic | ( | const RepoId & | topicId, | |
OPENDDS_STRING & | name | |||
) | [inline] |
Definition at line 297 of file DiscoveryBase.h.
00298 { 00299 ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR); 00300 name = topic_names_[topicId]; 00301 typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it = 00302 topics_.find(name); 00303 if (top_it != topics_.end()) { 00304 TopicDetails& td = top_it->second; 00305 if (td.endpoints_.empty()) { 00306 topics_.erase(name); 00307 } 00308 } 00309 00310 topic_names_.erase(topicId); 00311 return DCPS::REMOVED; 00312 }
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::shutting_down | ( | ) | const [protected, pure virtual] |
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::ignore().
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::update_publication_qos | ( | const DCPS::RepoId & | publicationId, | |
const DDS::DataWriterQos & | qos, | |||
const DDS::PublisherQos & | publisherQos | |||
) | [pure virtual] |
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::update_subscription_params | ( | const DCPS::RepoId & | subId, | |
const DDS::StringSeq & | params | |||
) | [pure virtual] |
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::update_subscription_qos | ( | const DCPS::RepoId & | subscriptionId, | |
const DDS::DataReaderQos & | qos, | |||
const DDS::SubscriberQos & | subscriberQos | |||
) | [pure virtual] |
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::update_topic_qos | ( | const DCPS::RepoId & | topicId, | |
const DDS::TopicQos & | qos, | |||
OPENDDS_STRING & | name | |||
) | [pure virtual] |
Implemented in OpenDDS::RTPS::Sedp, and OpenDDS::DCPS::StaticEndpointManager.
virtual DDS::ReturnCode_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::write_publication_data | ( | const DCPS::RepoId & | , | |
LocalPublication & | , | |||
const DCPS::RepoId & | reader = DCPS::GUID_UNKNOWN | |||
) | [inline, protected, virtual] |
Reimplemented in OpenDDS::RTPS::Sedp.
Definition at line 657 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication().
00659 { ACE_UNUSED_ARG(reader); return DDS::RETCODE_OK; }
virtual DDS::ReturnCode_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::write_subscription_data | ( | const DCPS::RepoId & | , | |
LocalSubscription & | , | |||
const DCPS::RepoId & | reader = DCPS::GUID_UNKNOWN | |||
) | [inline, protected, virtual] |
Reimplemented in OpenDDS::RTPS::Sedp.
Definition at line 664 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription(), and OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_assoc().
00666 { ACE_UNUSED_ARG(reader); return DDS::RETCODE_OK; }
DiscoveredPublicationMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_publications_ [protected] |
DiscoveredSubscriptionMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_subscriptions_ [protected] |
RepoIdSet OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignored_guids_ [protected] |
Definition at line 1218 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::ignore(), and OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::ignoring().
LocalPublicationMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_ [protected] |
Definition at line 1220 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_assoc(), and OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_publication().
LocalSubscriptionMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_ [protected] |
Definition at line 1221 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_assoc(), and OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_subscription().
ACE_Thread_Mutex& OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_ [protected] |
Definition at line 1215 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::assert_topic(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::bit_key_to_repo_id(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_publication(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_subscription(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_topic(), and OpenDDS::RTPS::Sedp::Task::svc_i().
DCPS::RepoId OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_ [protected] |
Definition at line 1216 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription(), and OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::make_topic_guid().
DDS::BuiltinTopicKey_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_bit_key_ [protected] |
Definition at line 1228 of file DiscoveryBase.h.
BitKeyMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_key_to_id_ [protected] |
unsigned int OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::publication_counter_ [protected] |
Definition at line 1219 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::assign_publication_key().
DDS::BuiltinTopicKey_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_bit_key_ [protected] |
Definition at line 1228 of file DiscoveryBase.h.
BitKeyMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_key_to_id_ [protected] |
unsigned int OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::subscription_counter_ [protected] |
Definition at line 1219 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::assign_subscription_key().
unsigned int OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::topic_counter_ [protected] |
Definition at line 1219 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::assign_topic_key().
TopicNameMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::topic_names_ [protected] |
Definition at line 1225 of file DiscoveryBase.h.
Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::assert_topic(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::has_dcps_key(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::ignore(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_publication(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_subscription(), and OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_topic().