OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ > Class Template Reference

#include <DiscoveryBase.h>

Collaboration diagram for OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >:
Collaboration graph
[legend]

List of all members.

Classes

struct  DiscoveredPublication
struct  DiscoveredSubscription
struct  LocalEndpoint
struct  LocalPublication
struct  LocalSubscription
struct  TopicDetails

Public Types

typedef DiscoveredParticipantData_ DiscoveredParticipantData

Public Member Functions

 EndpointManager (const RepoId &participant_id, ACE_Thread_Mutex &lock)
virtual ~EndpointManager ()
RepoId bit_key_to_repo_id (const char *bit_topic_name, const DDS::BuiltinTopicKey_t &key)
void ignore (const DCPS::RepoId &to_ignore)
bool ignoring (const DCPS::RepoId &guid) const
bool ignoring (const char *topic_name) const
DCPS::TopicStatus assert_topic (DCPS::RepoId_out topicId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey)
DCPS::TopicStatus remove_topic (const RepoId &topicId, OPENDDS_STRING &name)
virtual bool update_topic_qos (const DCPS::RepoId &topicId, const DDS::TopicQos &qos, OPENDDS_STRING &name)=0
DCPS::RepoId add_publication (const DCPS::RepoId &topicId, DCPS::DataWriterCallbacks *publication, const DDS::DataWriterQos &qos, const DCPS::TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos)
void remove_publication (const DCPS::RepoId &publicationId)
virtual bool update_publication_qos (const DCPS::RepoId &publicationId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)=0
DCPS::RepoId add_subscription (const DCPS::RepoId &topicId, DCPS::DataReaderCallbacks *subscription, const DDS::DataReaderQos &qos, const DCPS::TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpr, const DDS::StringSeq &params)
void remove_subscription (const DCPS::RepoId &subscriptionId)
virtual bool update_subscription_qos (const DCPS::RepoId &subscriptionId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)=0
virtual bool update_subscription_params (const DCPS::RepoId &subId, const DDS::StringSeq &params)=0
virtual void association_complete (const DCPS::RepoId &localId, const DCPS::RepoId &remoteId)=0
virtual bool disassociate (const DiscoveredParticipantData &pdata)=0

Protected Types

typedef
DiscoveredSubscriptionMap::iterator 
DiscoveredSubscriptionIter
typedef
DiscoveredPublicationMap::iterator 
DiscoveredPublicationIter
typedef
LocalPublicationMap::iterator 
LocalPublicationIter
typedef
LocalPublicationMap::const_iterator 
LocalPublicationCIter
typedef
LocalSubscriptionMap::iterator 
LocalSubscriptionIter
typedef
LocalSubscriptionMap::const_iterator 
LocalSubscriptionCIter

Protected Member Functions

typedef OPENDDS_MAP_CMP (DCPS::RepoId, DiscoveredSubscription, DCPS::GUID_tKeyLessThan) DiscoveredSubscriptionMap
typedef OPENDDS_MAP_CMP (DCPS::RepoId, DiscoveredPublication, DCPS::GUID_tKeyLessThan) DiscoveredPublicationMap
typedef OPENDDS_MAP_CMP (DDS::BuiltinTopicKey_t, DCPS::RepoId, DCPS::BuiltinTopicKeyLess) BitKeyMap
typedef OPENDDS_MAP_CMP (DCPS::RepoId, LocalPublication, DCPS::GUID_tKeyLessThan) LocalPublicationMap
typedef OPENDDS_MAP_CMP (DCPS::RepoId, LocalSubscription, DCPS::GUID_tKeyLessThan) LocalSubscriptionMap
typedef OPENDDS_MAP_CMP (DCPS::RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan) TopicNameMap
virtual void remove_from_bit_i (const DiscoveredPublication &)
virtual void remove_from_bit_i (const DiscoveredSubscription &)
virtual void assign_publication_key (RepoId &rid, const RepoId &topicId, const DDS::DataWriterQos &)
virtual void assign_subscription_key (RepoId &rid, const RepoId &topicId, const DDS::DataReaderQos &)
virtual void assign_topic_key (RepoId &guid)
virtual DDS::ReturnCode_t add_publication_i (const DCPS::RepoId &, LocalPublication &)
virtual DDS::ReturnCode_t write_publication_data (const DCPS::RepoId &, LocalPublication &, const DCPS::RepoId &reader=DCPS::GUID_UNKNOWN)
virtual DDS::ReturnCode_t remove_publication_i (const RepoId &publicationId)=0
virtual DDS::ReturnCode_t add_subscription_i (const DCPS::RepoId &, LocalSubscription &)
virtual DDS::ReturnCode_t write_subscription_data (const DCPS::RepoId &, LocalSubscription &, const DCPS::RepoId &reader=DCPS::GUID_UNKNOWN)
virtual DDS::ReturnCode_t remove_subscription_i (const RepoId &subscriptionId)=0
void match_endpoints (DCPS::RepoId repoId, const TopicDetails &td, bool remove=false)
void remove_assoc (const RepoId &remove_from, const RepoId &removing)
virtual DDS::DomainId_t get_domain_id () const
void match (const RepoId &writer, const RepoId &reader)
virtual bool is_opendds (const GUID_t &endpoint) const
virtual bool shutting_down () const =0
virtual void populate_transport_locator_sequence (DCPS::TransportLocatorSeq *&tls, DiscoveredSubscriptionIter &iter, const RepoId &reader)=0
virtual void populate_transport_locator_sequence (DCPS::TransportLocatorSeq *&tls, DiscoveredPublicationIter &iter, const RepoId &reader)=0
virtual DCPS::TransportLocatorSeq add_security_info (const DCPS::TransportLocatorSeq &locators, const RepoId &, const RepoId &)
virtual bool defer_writer (const RepoId &writer, const RepoId &writer_participant)=0
virtual bool defer_reader (const RepoId &writer, const RepoId &writer_participant)=0
void remove_from_bit (const DiscoveredPublication &pub)
void remove_from_bit (const DiscoveredSubscription &sub)
RepoId make_topic_guid ()
bool has_dcps_key (const DCPS::RepoId &topicId) const
void increment_key (DDS::BuiltinTopicKey_t &key)
 OPENDDS_MAP (OPENDDS_STRING, TopicDetails) topics_
 OPENDDS_SET (OPENDDS_STRING) ignored_topics_
 OPENDDS_SET_CMP (DCPS::RepoId, DCPS::GUID_tKeyLessThan) relay_only_readers_

Static Protected Member Functions

static const char * get_topic_name (const DiscoveredPublication &pub)
static const char * get_topic_name (const DiscoveredSubscription &sub)
static DDS::BuiltinTopicKey_t get_key (const DiscoveredPublication &pub)
static DDS::BuiltinTopicKey_t get_key (const DiscoveredSubscription &sub)

Protected Attributes

ACE_Thread_Mutexlock_
DCPS::RepoId participant_id_
BitKeyMap pub_key_to_id_
BitKeyMap sub_key_to_id_
RepoIdSet ignored_guids_
unsigned int publication_counter_
unsigned int subscription_counter_
unsigned int topic_counter_
LocalPublicationMap local_publications_
LocalSubscriptionMap local_subscriptions_
DiscoveredPublicationMap discovered_publications_
DiscoveredSubscriptionMap discovered_subscriptions_
TopicNameMap topic_names_
DDS::BuiltinTopicKey_t pub_bit_key_
DDS::BuiltinTopicKey_t sub_bit_key_

Detailed Description

template<typename DiscoveredParticipantData_>
class OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >

Definition at line 109 of file DiscoveryBase.h.


Member Typedef Documentation

template<typename DiscoveredParticipantData_>
typedef DiscoveredParticipantData_ OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::DiscoveredParticipantData

Definition at line 164 of file DiscoveryBase.h.

template<typename DiscoveredParticipantData_>
typedef DiscoveredPublicationMap::iterator OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::DiscoveredPublicationIter [protected]

Definition at line 161 of file DiscoveryBase.h.

template<typename DiscoveredParticipantData_>
typedef DiscoveredSubscriptionMap::iterator OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::DiscoveredSubscriptionIter [protected]

Definition at line 136 of file DiscoveryBase.h.

template<typename DiscoveredParticipantData_>
typedef LocalPublicationMap::const_iterator OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::LocalPublicationCIter [protected]

Definition at line 600 of file DiscoveryBase.h.

template<typename DiscoveredParticipantData_>
typedef LocalPublicationMap::iterator OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::LocalPublicationIter [protected]

Definition at line 599 of file DiscoveryBase.h.

template<typename DiscoveredParticipantData_>
typedef LocalSubscriptionMap::const_iterator OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::LocalSubscriptionCIter [protected]

Definition at line 605 of file DiscoveryBase.h.

template<typename DiscoveredParticipantData_>
typedef LocalSubscriptionMap::iterator OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::LocalSubscriptionIter [protected]

Definition at line 604 of file DiscoveryBase.h.


Constructor & Destructor Documentation

template<typename DiscoveredParticipantData_>
OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::EndpointManager ( const RepoId participant_id,
ACE_Thread_Mutex lock 
) [inline]

Definition at line 174 of file DiscoveryBase.h.

00175         : lock_(lock)
00176         , participant_id_(participant_id)
00177         , publication_counter_(0)
00178         , subscription_counter_(0)
00179         , topic_counter_(0)
00180 
00181 #if defined(OPENDDS_SECURITY)
00182         , permissions_handle_(DDS::HANDLE_NIL)
00183 #endif
00184 
00185       {
00186 
00187       }

template<typename DiscoveredParticipantData_>
virtual OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::~EndpointManager (  )  [inline, virtual]

Definition at line 189 of file DiscoveryBase.h.

00189 { }


Member Function Documentation

template<typename DiscoveredParticipantData_>
DCPS::RepoId OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::add_publication ( const DCPS::RepoId topicId,
DCPS::DataWriterCallbacks publication,
const DDS::DataWriterQos qos,
const DCPS::TransportLocatorSeq transInfo,
const DDS::PublisherQos publisherQos 
) [inline]

Definition at line 317 of file DiscoveryBase.h.

00322       {
00323         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00324 
00325         RepoId rid = participant_id_;
00326         assign_publication_key(rid, topicId, qos);
00327         LocalPublication& pb = local_publications_[rid];
00328         pb.topic_id_ = topicId;
00329         pb.publication_ = publication;
00330         pb.qos_ = qos;
00331         pb.trans_info_ = transInfo;
00332         pb.publisher_qos_ = publisherQos;
00333 
00334         const std::string& topic_name = topic_names_[topicId];
00335 
00336 #if defined(OPENDDS_SECURITY)
00337         if (is_security_enabled()) {
00338           DDS::Security::SecurityException ex;
00339 
00340           DDS::Security::TopicSecurityAttributes topic_sec_attr;
00341           if (!get_access_control()->get_topic_sec_attributes(get_permissions_handle(), topic_name.data(), topic_sec_attr, ex)) {
00342             ACE_ERROR((LM_ERROR,
00343               ACE_TEXT("(%P|%t) ERROR: ")
00344               ACE_TEXT("DomainParticipant::add_publication, ")
00345               ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"),
00346                 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
00347             return RepoId();
00348           }
00349 
00350           if (topic_sec_attr.is_write_protected == true) {
00351             if (!get_access_control()->check_create_datawriter(get_permissions_handle(), get_domain_id(), topic_name.data(), qos,
00352                                                                publisherQos.partition, DDS::Security::DataTagQosPolicy(), ex)) {
00353               ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00354                 ACE_TEXT("EndpointManager::add_publication() - ")
00355                 ACE_TEXT("Permissions check failed for local datawriter on topic '%C'. Security Exception[%d.%d]: %C\n"), topic_name.data(),
00356                   ex.code, ex.minor_code, ex.message.in()));
00357               return RepoId();
00358             }
00359           }
00360 
00361           if (!get_access_control()->get_datawriter_sec_attributes(get_permissions_handle(), topic_name.data(),
00362               publisherQos.partition, DDS::Security::DataTagQosPolicy(), pb.security_attribs_, ex)) {
00363             ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00364                        ACE_TEXT("EndpointManager::add_publication() - ")
00365                        ACE_TEXT("Unable to get security attributes for local datawriter. Security Exception[%d.%d]: %C\n"),
00366                        ex.code, ex.minor_code, ex.message.in()));
00367             return RepoId();
00368           }
00369 
00370           if (pb.security_attribs_.is_submessage_protected || pb.security_attribs_.is_payload_protected) {
00371             DDS::Security::DatawriterCryptoHandle handle = get_crypto_key_factory()->register_local_datawriter(crypto_handle_, DDS::PropertySeq(), pb.security_attribs_, ex);
00372             if (handle == DDS::HANDLE_NIL) {
00373               ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00374                          ACE_TEXT("EndpointManager::add_publication() - ")
00375                          ACE_TEXT("Unable to get local datawriter crypto handle. Security Exception[%d.%d]: %C\n"),
00376                          ex.code, ex.minor_code, ex.message.in()));
00377             }
00378 
00379             local_writer_crypto_handles_[rid] = handle;
00380             local_writer_security_attribs_[rid] = pb.security_attribs_;
00381           }
00382         }
00383 #endif
00384 
00385         TopicDetails& td = topics_[topic_name];
00386         td.endpoints_.insert(rid);
00387 
00388         if (DDS::RETCODE_OK != add_publication_i(rid, pb)) {
00389           return RepoId();
00390         }
00391 
00392         if (DDS::RETCODE_OK != write_publication_data(rid, pb)) {
00393           return RepoId();
00394         }
00395 
00396         if (DCPS::DCPS_debug_level > 3) {
00397           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::add_publication - ")
00398                      ACE_TEXT("calling match_endpoints\n")));
00399         }
00400         match_endpoints(rid, td);
00401 
00402         return rid;
00403       }

template<typename DiscoveredParticipantData_>
virtual DDS::ReturnCode_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::add_publication_i ( const DCPS::RepoId ,
LocalPublication  
) [inline, protected, virtual]

Reimplemented in OpenDDS::DCPS::StaticEndpointManager.

Definition at line 655 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication().

00656                                                                              { return DDS::RETCODE_OK; }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
virtual DCPS::TransportLocatorSeq OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::add_security_info ( const DCPS::TransportLocatorSeq locators,
const RepoId ,
const RepoId  
) [inline, protected, virtual]

Definition at line 1107 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match().

01109       { return locators; }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
DCPS::RepoId OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::add_subscription ( const DCPS::RepoId topicId,
DCPS::DataReaderCallbacks subscription,
const DDS::DataReaderQos qos,
const DCPS::TransportLocatorSeq transInfo,
const DDS::SubscriberQos subscriberQos,
const char *  filterClassName,
const char *  filterExpr,
const DDS::StringSeq params 
) [inline]

Definition at line 432 of file DiscoveryBase.h.

00440       {
00441         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00442 
00443         RepoId rid = participant_id_;
00444         assign_subscription_key(rid, topicId, qos);
00445         LocalSubscription& sb = local_subscriptions_[rid];
00446         sb.topic_id_ = topicId;
00447         sb.subscription_ = subscription;
00448         sb.qos_ = qos;
00449         sb.trans_info_ = transInfo;
00450         sb.subscriber_qos_ = subscriberQos;
00451         sb.filterProperties.filterClassName = filterClassName;
00452         sb.filterProperties.filterExpression = filterExpr;
00453         sb.filterProperties.expressionParameters = params;
00454 
00455         const std::string& topic_name = topic_names_[topicId];
00456 
00457 #if defined(OPENDDS_SECURITY)
00458         if (is_security_enabled()) {
00459           DDS::Security::SecurityException ex;
00460 
00461           DDS::Security::TopicSecurityAttributes topic_sec_attr;
00462           if (!get_access_control()->get_topic_sec_attributes(get_permissions_handle(), topic_name.data(), topic_sec_attr, ex)) {
00463             ACE_ERROR((LM_ERROR,
00464               ACE_TEXT("(%P|%t) ERROR: ")
00465               ACE_TEXT("DomainParticipant::add_subscription, ")
00466               ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"),
00467                 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
00468             return RepoId();
00469           }
00470 
00471           if (topic_sec_attr.is_read_protected == true) {
00472             if (!get_access_control()->check_create_datareader(get_permissions_handle(), get_domain_id(), topic_name.data(), qos,
00473                                                                subscriberQos.partition, DDS::Security::DataTagQosPolicy(), ex)) {
00474               ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00475                 ACE_TEXT("EndpointManager::add_subscription() - ")
00476                 ACE_TEXT("Permissions check failed for local datareader on topic '%C'. Security Exception[%d.%d]: %C\n"), topic_name.data(),
00477                   ex.code, ex.minor_code, ex.message.in()));
00478               return RepoId();
00479             }
00480           }
00481 
00482           if (!get_access_control()->get_datareader_sec_attributes(get_permissions_handle(), topic_name.data(),
00483               subscriberQos.partition, DDS::Security::DataTagQosPolicy(), sb.security_attribs_, ex)) {
00484             ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00485                        ACE_TEXT("EndpointManager::add_subscription() - ")
00486                        ACE_TEXT("Unable to get security attributes for local datareader. Security Exception[%d.%d]: %C\n"),
00487                        ex.code, ex.minor_code, ex.message.in()));
00488             return RepoId();
00489           }
00490 
00491           if (sb.security_attribs_.is_submessage_protected || sb.security_attribs_.is_payload_protected) {
00492             DDS::Security::DatareaderCryptoHandle handle = get_crypto_key_factory()->register_local_datareader(crypto_handle_, DDS::PropertySeq(), sb.security_attribs_, ex);
00493             if (handle == DDS::HANDLE_NIL) {
00494               ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00495                          ACE_TEXT("EndpointManager::add_subscription() - ")
00496                          ACE_TEXT("Unable to get local datareader crypto handle. Security Exception[%d.%d]: %C\n"),
00497                          ex.code, ex.minor_code, ex.message.in()));
00498             }
00499 
00500             local_reader_crypto_handles_[rid] = handle;
00501             local_reader_security_attribs_[rid] = sb.security_attribs_;
00502           }
00503         }
00504 #endif
00505 
00506         TopicDetails& td = topics_[topic_name];
00507         td.endpoints_.insert(rid);
00508 
00509         if (DDS::RETCODE_OK != add_subscription_i(rid, sb)) {
00510           return RepoId();
00511         }
00512 
00513         if (DDS::RETCODE_OK != write_subscription_data(rid, sb)) {
00514           return RepoId();
00515         }
00516 
00517         if (DCPS::DCPS_debug_level > 3) {
00518           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::add_subscription - ")
00519                      ACE_TEXT("calling match_endpoints\n")));
00520         }
00521         match_endpoints(rid, td);
00522 
00523         return rid;
00524       }

template<typename DiscoveredParticipantData_>
virtual DDS::ReturnCode_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::add_subscription_i ( const DCPS::RepoId ,
LocalSubscription  
) [inline, protected, virtual]

Reimplemented in OpenDDS::DCPS::StaticEndpointManager.

Definition at line 662 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription().

00663                                                                                { return DDS::RETCODE_OK; };

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
DCPS::TopicStatus OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::assert_topic ( DCPS::RepoId_out  topicId,
const char *  topicName,
const char *  dataTypeName,
const DDS::TopicQos qos,
bool  hasDcpsKey 
) [inline]

Definition at line 271 of file DiscoveryBase.h.

00274       {
00275         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
00276         typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator iter =
00277           topics_.find(topicName);
00278         if (iter != topics_.end()) { // types must match, RtpsDiscovery checked for us
00279           iter->second.qos_ = qos;
00280           iter->second.has_dcps_key_ = hasDcpsKey;
00281           topicId = iter->second.repo_id_;
00282           topic_names_[iter->second.repo_id_] = topicName;
00283           return DCPS::FOUND;
00284         }
00285 
00286         TopicDetails& td = topics_[topicName];
00287         td.data_type_ = dataTypeName;
00288         td.qos_ = qos;
00289         td.has_dcps_key_ = hasDcpsKey;
00290         td.repo_id_ = make_topic_guid();
00291         topicId = td.repo_id_;
00292         topic_names_[td.repo_id_] = topicName;
00293 
00294         return DCPS::CREATED;
00295       }

template<typename DiscoveredParticipantData_>
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::assign_publication_key ( RepoId rid,
const RepoId topicId,
const DDS::DataWriterQos  
) [inline, protected, virtual]

Reimplemented in OpenDDS::DCPS::StaticEndpointManager.

Definition at line 625 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication().

00627                                                                            {
00628         rid.entityId.entityKind =
00629           has_dcps_key(topicId)
00630           ? DCPS::ENTITYKIND_USER_WRITER_WITH_KEY
00631           : DCPS::ENTITYKIND_USER_WRITER_NO_KEY;
00632         assign(rid.entityId.entityKey, publication_counter_++);
00633       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::assign_subscription_key ( RepoId rid,
const RepoId topicId,
const DDS::DataReaderQos  
) [inline, protected, virtual]

Reimplemented in OpenDDS::DCPS::StaticEndpointManager.

Definition at line 634 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription().

00636                                                                             {
00637         rid.entityId.entityKind =
00638           has_dcps_key(topicId)
00639           ? DCPS::ENTITYKIND_USER_READER_WITH_KEY
00640           : DCPS::ENTITYKIND_USER_READER_NO_KEY;
00641         assign(rid.entityId.entityKey, subscription_counter_++);
00642       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::assign_topic_key ( RepoId guid  )  [inline, protected, virtual]

Definition at line 643 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::make_topic_guid().

00643                                                   {
00644         assign(guid.entityId.entityKey, topic_counter_++);
00645 
00646         if (topic_counter_ == 0x1000000) {
00647           ACE_ERROR((LM_ERROR,
00648                      ACE_TEXT("(%P|%t) ERROR: EndpointManager::make_topic_guid: ")
00649                      ACE_TEXT("Exceeded Maximum number of topic entity keys!")
00650                      ACE_TEXT("Next key will be a duplicate!\n")));
00651           topic_counter_ = 0;
00652         }
00653       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::association_complete ( const DCPS::RepoId localId,
const DCPS::RepoId remoteId 
) [pure virtual]
template<typename DiscoveredParticipantData_>
RepoId OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::bit_key_to_repo_id ( const char *  bit_topic_name,
const DDS::BuiltinTopicKey_t key 
) [inline]

Definition at line 191 of file DiscoveryBase.h.

00193       {
00194         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00195         if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_PUBLICATION_TOPIC)) {
00196           return pub_key_to_id_[key];
00197         }
00198         if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_SUBSCRIPTION_TOPIC)) {
00199           return sub_key_to_id_[key];
00200         }
00201         return RepoId();
00202       }

template<typename DiscoveredParticipantData_>
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::defer_reader ( const RepoId writer,
const RepoId writer_participant 
) [protected, pure virtual]
template<typename DiscoveredParticipantData_>
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::defer_writer ( const RepoId writer,
const RepoId writer_participant 
) [protected, pure virtual]
template<typename DiscoveredParticipantData_>
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::disassociate ( const DiscoveredParticipantData pdata  )  [pure virtual]
template<typename DiscoveredParticipantData_>
virtual DDS::DomainId_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_domain_id (  )  const [inline, protected, virtual]
template<typename DiscoveredParticipantData_>
static DDS::BuiltinTopicKey_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_key ( const DiscoveredSubscription sub  )  [inline, static, protected]

Definition at line 618 of file DiscoveryBase.h.

00618                                                                              {
00619         return sub.reader_data_.ddsSubscriptionData.key;
00620       }

template<typename DiscoveredParticipantData_>
static DDS::BuiltinTopicKey_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_key ( const DiscoveredPublication pub  )  [inline, static, protected]

Definition at line 615 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_from_bit().

00615                                                                             {
00616         return pub.writer_data_.ddsPublicationData.key;
00617       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
static const char* OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_topic_name ( const DiscoveredSubscription sub  )  [inline, static, protected]

Definition at line 612 of file DiscoveryBase.h.

00612                                                                            {
00613         return sub.reader_data_.ddsSubscriptionData.topic_name;
00614       }

template<typename DiscoveredParticipantData_>
static const char* OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::get_topic_name ( const DiscoveredPublication pub  )  [inline, static, protected]

Definition at line 609 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::ignore().

00609                                                                           {
00610         return pub.writer_data_.ddsPublicationData.topic_name;
00611       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::has_dcps_key ( const DCPS::RepoId topicId  )  const [inline, protected]

Definition at line 1138 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::assign_publication_key(), and OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::assign_subscription_key().

01139       {
01140         typedef OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan) TNMap;
01141         TNMap::const_iterator tn = topic_names_.find(topicId);
01142         if (tn == topic_names_.end()) return false;
01143 
01144         typedef OPENDDS_MAP(OPENDDS_STRING, TopicDetails) TDMap;
01145         typename TDMap::const_iterator td = topics_.find(tn->second);
01146         if (td == topics_.end()) return false;
01147 
01148         return td->second.has_dcps_key_;
01149       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignore ( const DCPS::RepoId to_ignore  )  [inline]

Definition at line 204 of file DiscoveryBase.h.

Referenced by OpenDDS::RTPS::Spdp::init().

00205       {
00206         // Locked prior to call from Spdp.
00207         ignored_guids_.insert(to_ignore);
00208         {
00209           const DiscoveredPublicationIter iter =
00210             discovered_publications_.find(to_ignore);
00211           if (iter != discovered_publications_.end()) {
00212             // clean up tracking info
00213             topics_[get_topic_name(iter->second)].endpoints_.erase(iter->first);
00214             remove_from_bit(iter->second);
00215             OPENDDS_STRING topic_name = get_topic_name(iter->second);
00216             discovered_publications_.erase(iter);
00217             // break associations
00218             typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00219               topics_.find(topic_name);
00220             if (top_it != topics_.end()) {
00221               match_endpoints(to_ignore, top_it->second, true /*remove*/);
00222             }
00223             return;
00224           }
00225         }
00226         {
00227           const DiscoveredSubscriptionIter iter =
00228             discovered_subscriptions_.find(to_ignore);
00229           if (iter != discovered_subscriptions_.end()) {
00230             // clean up tracking info
00231             topics_[get_topic_name(iter->second)].endpoints_.erase(iter->first);
00232             remove_from_bit(iter->second);
00233             OPENDDS_STRING topic_name = get_topic_name(iter->second);
00234             discovered_subscriptions_.erase(iter);
00235             // break associations
00236             typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00237               topics_.find(topic_name);
00238             if (top_it != topics_.end()) {
00239               match_endpoints(to_ignore, top_it->second, true /*remove*/);
00240             }
00241             return;
00242           }
00243         }
00244         {
00245           const OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan)::iterator
00246             iter = topic_names_.find(to_ignore);
00247           if (iter != topic_names_.end()) {
00248             ignored_topics_.insert(iter->second);
00249             // Remove all publications and subscriptions on this topic
00250             typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00251               topics_.find(iter->second);
00252             if (top_it != topics_.end()) {
00253               TopicDetails& td = top_it->second;
00254               RepoIdSet::iterator ep;
00255               for (ep = td.endpoints_.begin(); ep!= td.endpoints_.end(); ++ep) {
00256                 match_endpoints(*ep, td, true /*remove*/);
00257                 if (shutting_down()) { return; }
00258               }
00259             }
00260           }
00261         }
00262       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring ( const char *  topic_name  )  const [inline]

Definition at line 267 of file DiscoveryBase.h.

00267                                                   {
00268         return ignored_topics_.count(topic_name);
00269       }

template<typename DiscoveredParticipantData_>
bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignoring ( const DCPS::RepoId guid  )  const [inline]

Definition at line 264 of file DiscoveryBase.h.

Referenced by OpenDDS::RTPS::Spdp::handle_participant_data().

00264                                                   {
00265         return ignored_guids_.count(guid);
00266       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::increment_key ( DDS::BuiltinTopicKey_t key  )  [inline, protected]

Definition at line 1152 of file DiscoveryBase.h.

01153       {
01154         for (int idx = 0; idx < 3; ++idx) {
01155           CORBA::ULong ukey = static_cast<CORBA::ULong>(key.value[idx]);
01156           if (ukey == 0xFFFFFFFF) {
01157             key.value[idx] = 0;
01158           } else {
01159             ++ukey;
01160             key.value[idx] = ukey;
01161             return;
01162           }
01163         }
01164         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) EndpointManager::increment_key - ")
01165                    ACE_TEXT("ran out of builtin topic keys\n")));
01166       }

template<typename DiscoveredParticipantData_>
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::is_opendds ( const GUID_t endpoint  )  const [inline, protected, virtual]

Definition at line 1090 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match().

01091       {
01092         return !std::memcmp(endpoint.guidPrefix, DCPS::VENDORID_OCI,
01093                             sizeof(DCPS::VENDORID_OCI));
01094       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
RepoId OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::make_topic_guid (  )  [inline, protected]

Definition at line 1129 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::assert_topic().

01130       {
01131         RepoId guid;
01132         guid = participant_id_;
01133         guid.entityId.entityKind = DCPS::ENTITYKIND_OPENDDS_TOPIC;
01134         assign_topic_key(guid);
01135         return guid;
01136       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match ( const RepoId writer,
const RepoId reader 
) [inline, protected]

Definition at line 755 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match_endpoints().

00756       {
00757         // 0. For discovered endpoints, we'll have the QoS info in the form of the
00758         // publication or subscription BIT data which doesn't use the same structures
00759         // for QoS.  In those cases we can copy the individual QoS policies to temp
00760         // QoS structs:
00761         DDS::DataWriterQos tempDwQos;
00762         DDS::PublisherQos tempPubQos;
00763         DDS::DataReaderQos tempDrQos;
00764         DDS::SubscriberQos tempSubQos;
00765         ContentFilterProperty_t tempCfp;
00766 
00767         // 1. collect details about the writer, which may be local or discovered
00768         const DDS::DataWriterQos* dwQos = 0;
00769         const DDS::PublisherQos* pubQos = 0;
00770         DCPS::TransportLocatorSeq* wTls = 0;
00771 
00772         const LocalPublicationIter lpi = local_publications_.find(writer);
00773         DiscoveredPublicationIter dpi;
00774         bool writer_local = false, already_matched = false;
00775         if (lpi != local_publications_.end()) {
00776           writer_local = true;
00777           dwQos = &lpi->second.qos_;
00778           pubQos = &lpi->second.publisher_qos_;
00779           wTls = &lpi->second.trans_info_;
00780           already_matched = lpi->second.matched_endpoints_.count(reader);
00781         } else if ((dpi = discovered_publications_.find(writer))
00782                    != discovered_publications_.end()) {
00783           wTls = &dpi->second.writer_data_.writerProxy.allLocators;
00784         } else {
00785           return; // Possible and ok, since lock is released
00786         }
00787 
00788         // 2. collect details about the reader, which may be local or discovered
00789         const DDS::DataReaderQos* drQos = 0;
00790         const DDS::SubscriberQos* subQos = 0;
00791         DCPS::TransportLocatorSeq* rTls = 0;
00792         const ContentFilterProperty_t* cfProp = 0;
00793 
00794         const LocalSubscriptionIter lsi = local_subscriptions_.find(reader);
00795         DiscoveredSubscriptionIter dsi;
00796         bool reader_local = false;
00797         if (lsi != local_subscriptions_.end()) {
00798           reader_local = true;
00799           drQos = &lsi->second.qos_;
00800           subQos = &lsi->second.subscriber_qos_;
00801           rTls = &lsi->second.trans_info_;
00802           if (lsi->second.filterProperties.filterExpression[0] != 0) {
00803             tempCfp.filterExpression = lsi->second.filterProperties.filterExpression;
00804             tempCfp.expressionParameters = lsi->second.filterProperties.expressionParameters;
00805           }
00806           cfProp = &tempCfp;
00807           if (!already_matched) {
00808             already_matched = lsi->second.matched_endpoints_.count(writer);
00809           }
00810         } else if ((dsi = discovered_subscriptions_.find(reader))
00811                    != discovered_subscriptions_.end()) {
00812           if (!writer_local) {
00813             // this is a discovered/discovered match, nothing for us to do
00814             return;
00815           }
00816           rTls = &dsi->second.reader_data_.readerProxy.allLocators;
00817 
00818           populate_transport_locator_sequence(rTls, dsi, reader);
00819 
00820           const DDS::SubscriptionBuiltinTopicData& bit =
00821             dsi->second.reader_data_.ddsSubscriptionData;
00822           tempDrQos.durability = bit.durability;
00823           tempDrQos.deadline = bit.deadline;
00824           tempDrQos.latency_budget = bit.latency_budget;
00825           tempDrQos.liveliness = bit.liveliness;
00826           tempDrQos.reliability = bit.reliability;
00827           tempDrQos.destination_order = bit.destination_order;
00828           tempDrQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
00829           tempDrQos.resource_limits =
00830             TheServiceParticipant->initial_ResourceLimitsQosPolicy();
00831           tempDrQos.user_data = bit.user_data;
00832           tempDrQos.ownership = bit.ownership;
00833           tempDrQos.time_based_filter = bit.time_based_filter;
00834           tempDrQos.reader_data_lifecycle =
00835             TheServiceParticipant->initial_ReaderDataLifecycleQosPolicy();
00836           drQos = &tempDrQos;
00837           tempSubQos.presentation = bit.presentation;
00838           tempSubQos.partition = bit.partition;
00839           tempSubQos.group_data = bit.group_data;
00840           tempSubQos.entity_factory =
00841             TheServiceParticipant->initial_EntityFactoryQosPolicy();
00842           subQos = &tempSubQos;
00843           cfProp = &dsi->second.reader_data_.contentFilterProperty;
00844         } else {
00845           return; // Possible and ok, since lock is released
00846         }
00847 
00848         // This is really part of step 1, but we're doing it here just in case we
00849         // are in the discovered/discovered match and we don't need the QoS data.
00850         if (!writer_local) {
00851           const DDS::PublicationBuiltinTopicData& bit =
00852             dpi->second.writer_data_.ddsPublicationData;
00853           tempDwQos.durability = bit.durability;
00854           tempDwQos.durability_service = bit.durability_service;
00855           tempDwQos.deadline = bit.deadline;
00856           tempDwQos.latency_budget = bit.latency_budget;
00857           tempDwQos.liveliness = bit.liveliness;
00858           tempDwQos.reliability = bit.reliability;
00859           tempDwQos.destination_order = bit.destination_order;
00860           tempDwQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
00861           tempDwQos.resource_limits =
00862             TheServiceParticipant->initial_ResourceLimitsQosPolicy();
00863           tempDwQos.transport_priority =
00864             TheServiceParticipant->initial_TransportPriorityQosPolicy();
00865           tempDwQos.lifespan = bit.lifespan;
00866           tempDwQos.user_data = bit.user_data;
00867           tempDwQos.ownership = bit.ownership;
00868           tempDwQos.ownership_strength = bit.ownership_strength;
00869           tempDwQos.writer_data_lifecycle =
00870             TheServiceParticipant->initial_WriterDataLifecycleQosPolicy();
00871           dwQos = &tempDwQos;
00872           tempPubQos.presentation = bit.presentation;
00873           tempPubQos.partition = bit.partition;
00874           tempPubQos.group_data = bit.group_data;
00875           tempPubQos.entity_factory =
00876             TheServiceParticipant->initial_EntityFactoryQosPolicy();
00877           pubQos = &tempPubQos;
00878 
00879           populate_transport_locator_sequence(wTls, dpi, writer);
00880         }
00881 
00882         // Need to release lock, below, for callbacks into DCPS which could
00883         // call into Spdp/Sedp.  Note that this doesn't unlock, it just constructs
00884         // an ACE object which will be used below for unlocking.
00885         ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00886 
00887         // 3. check transport and QoS compatibility
00888 
00889         // Copy entries from local publication and local subscription maps
00890         // prior to releasing lock
00891         DCPS::DataWriterCallbacks* dwr = 0;
00892         DCPS::DataReaderCallbacks* drr = 0;
00893         if (writer_local) {
00894           dwr = lpi->second.publication_;
00895         }
00896         if (reader_local) {
00897           drr = lsi->second.subscription_;
00898         }
00899 
00900         DCPS::IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00901         DCPS::IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00902 
00903         if (DCPS::compatibleQOS(&writerStatus, &readerStatus, *wTls, *rTls,
00904                                 dwQos, drQos, pubQos, subQos)) {
00905           if (!writer_local) {
00906             RepoId writer_participant = writer;
00907             writer_participant.entityId = ENTITYID_PARTICIPANT;
00908             if (defer_writer(writer, writer_participant)) {
00909               return;
00910             }
00911           }
00912           if (!reader_local) {
00913             RepoId reader_participant = reader;
00914             reader_participant.entityId = ENTITYID_PARTICIPANT;
00915             if (defer_reader(reader, reader_participant)) {
00916               return;
00917             }
00918           }
00919 
00920           bool call_writer = false, call_reader = false;
00921           if (writer_local) {
00922             call_writer = lpi->second.matched_endpoints_.insert(reader).second;
00923           }
00924           if (reader_local) {
00925             call_reader = lsi->second.matched_endpoints_.insert(writer).second;
00926           }
00927           if (!call_writer && !call_reader) {
00928             return; // nothing more to do
00929           }
00930 
00931 #if defined(OPENDDS_SECURITY)
00932           if (is_security_enabled()) {
00933             if (call_reader) {
00934               RepoId writer_participant = writer;
00935               writer_participant.entityId = ENTITYID_PARTICIPANT;
00936               DatareaderCryptoHandleMap::const_iterator iter = local_reader_crypto_handles_.find(reader);
00937               if (iter != local_reader_crypto_handles_.end()) { // It might not exist due to security attributes, and that's OK
00938                 DDS::Security::DatareaderCryptoHandle drch = iter->second;
00939                 remote_writer_crypto_handles_[writer] = generate_remote_matched_writer_crypto_handle(writer_participant, drch);
00940                 DatawriterCryptoTokenSeqMap::iterator t_iter = pending_remote_writer_crypto_tokens_.find(writer);
00941                 if (t_iter != pending_remote_writer_crypto_tokens_.end()) {
00942                   DDS::Security::SecurityException se;
00943                   if (get_crypto_key_exchange()->set_remote_datawriter_crypto_tokens(iter->second, remote_writer_crypto_handles_[writer], t_iter->second, se) == false) {
00944                     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00945                       ACE_TEXT("(%P|%t) ERROR: DiscoveryBase::match() - ")
00946                       ACE_TEXT("Unable to set pending remote datawriter crypto tokens with crypto key exchange plugin. Security Exception[%d.%d]: %C\n"),
00947                         se.code, se.minor_code, se.message.in()));
00948                   }
00949                   pending_remote_writer_crypto_tokens_.erase(t_iter);
00950                 }
00951                 EndpointSecurityAttributesMap::const_iterator s_iter = local_reader_security_attribs_.find(reader);
00952                 if (s_iter != local_reader_security_attribs_.end() && s_iter->second.is_submessage_protected) { // Yes, this is different for remote datawriters than readers (see 8.8.9.3 vs 8.8.9.2)
00953                   create_and_send_datareader_crypto_tokens(drch, reader, remote_writer_crypto_handles_[writer], writer);
00954                 }
00955               }
00956             }
00957             if (call_writer) {
00958               RepoId reader_participant = reader;
00959               reader_participant.entityId = ENTITYID_PARTICIPANT;
00960               DatawriterCryptoHandleMap::const_iterator iter = local_writer_crypto_handles_.find(writer);
00961               if (iter != local_writer_crypto_handles_.end()) { // It might not exist due to security attributes, and that's OK
00962                 DDS::Security::DatawriterCryptoHandle dwch = iter->second;
00963                 remote_reader_crypto_handles_[reader] = generate_remote_matched_reader_crypto_handle(reader_participant, dwch, (relay_only_readers_.count(reader) != 0));
00964                 DatareaderCryptoTokenSeqMap::iterator t_iter = pending_remote_reader_crypto_tokens_.find(reader);
00965                 if (t_iter != pending_remote_reader_crypto_tokens_.end()) {
00966                   DDS::Security::SecurityException se;
00967                   if (get_crypto_key_exchange()->set_remote_datareader_crypto_tokens(iter->second, remote_reader_crypto_handles_[reader], t_iter->second, se) == false) {
00968                     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00969                       ACE_TEXT("(%P|%t) ERROR: DiscoveryBase::match() - ")
00970                       ACE_TEXT("Unable to set pending remote datareader crypto tokens with crypto key exchange plugin. Security Exception[%d.%d]: %C\n"),
00971                         se.code, se.minor_code, se.message.in()));
00972                   }
00973                   pending_remote_reader_crypto_tokens_.erase(t_iter);
00974                 }
00975                 EndpointSecurityAttributesMap::const_iterator s_iter = local_writer_security_attribs_.find(writer);
00976                 if (s_iter != local_writer_security_attribs_.end() && (s_iter->second.is_submessage_protected || s_iter->second.is_payload_protected)) {
00977                   create_and_send_datawriter_crypto_tokens(dwch, writer, remote_reader_crypto_handles_[reader], reader);
00978                 }
00979               }
00980             }
00981           }
00982 #endif
00983 
00984           // Copy reader and writer association data prior to releasing lock
00985 #ifdef __SUNPRO_CC
00986           DCPS::ReaderAssociation ra;
00987           ra.readerTransInfo = *rTls;
00988           ra.readerId = reader;
00989           ra.subQos = *subQos;
00990           ra.readerQos = *drQos;
00991           ra.filterClassName = cfProp->filterClassName;
00992           ra.filterExpression = cfProp->filterExpression;
00993           ra.exprParams = cfProp->expressionParameters;
00994           DCPS::WriterAssociation wa;
00995           wa.writerTransInfo = *wTls;
00996           wa.writerId = writer;
00997           wa.pubQos = *pubQos;
00998           wa.writerQos = *dwQos;
00999 #else
01000           const DCPS::ReaderAssociation ra =
01001             {add_security_info(*rTls, writer, reader), reader, *subQos, *drQos,
01002 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01003              cfProp->filterClassName, cfProp->filterExpression,
01004 #else
01005              "", "",
01006 #endif
01007              cfProp->expressionParameters};
01008 
01009           const DCPS::WriterAssociation wa =
01010             {add_security_info(*wTls, writer, reader), writer, *pubQos, *dwQos};
01011 #endif
01012 
01013           ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
01014           static const bool writer_active = true;
01015 
01016           if (call_writer) {
01017             if (DCPS::DCPS_debug_level > 3) {
01018               ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
01019                          ACE_TEXT("adding writer association\n")));
01020             }
01021             DcpsUpcalls thr(drr, reader, wa, !writer_active, dwr);
01022             if (call_reader) {
01023               thr.activate();
01024             }
01025             dwr->add_association(writer, ra, writer_active);
01026             if (call_reader) {
01027               thr.writer_done();
01028             }
01029 
01030           } else if (call_reader) {
01031             if (DCPS::DCPS_debug_level > 3) {
01032               ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
01033                          ACE_TEXT("adding reader association\n")));
01034             }
01035             drr->add_association(reader, wa, !writer_active);
01036           }
01037 
01038           // change this if 'writer_active' (above) changes
01039           if (call_writer && !call_reader && !is_opendds(reader)) {
01040             if (DCPS::DCPS_debug_level > 3) {
01041               ACE_DEBUG((LM_DEBUG,
01042                          ACE_TEXT("(%P|%t) EndpointManager::match - ")
01043                          ACE_TEXT("calling writer association_complete\n")));
01044             }
01045             dwr->association_complete(reader);
01046           }
01047 
01048         } else if (already_matched) { // break an existing associtaion
01049           if (writer_local) {
01050             lpi->second.matched_endpoints_.erase(reader);
01051             lpi->second.remote_opendds_associations_.erase(reader);
01052           }
01053           if (reader_local) {
01054             lsi->second.matched_endpoints_.erase(writer);
01055             lsi->second.remote_opendds_associations_.erase(writer);
01056           }
01057           ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
01058           if (writer_local) {
01059             DCPS::ReaderIdSeq reader_seq(1);
01060             reader_seq.length(1);
01061             reader_seq[0] = reader;
01062             dwr->remove_associations(reader_seq, false /*notify_lost*/);
01063           }
01064           if (reader_local) {
01065             DCPS::WriterIdSeq writer_seq(1);
01066             writer_seq.length(1);
01067             writer_seq[0] = writer;
01068             drr->remove_associations(writer_seq, false /*notify_lost*/);
01069           }
01070 
01071         } else { // something was incompatible
01072           ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01073           if (writer_local && writerStatus.count_since_last_send) {
01074             if (DCPS::DCPS_debug_level > 3) {
01075               ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
01076                          ACE_TEXT("writer incompatible\n")));
01077             }
01078             dwr->update_incompatible_qos(writerStatus);
01079           }
01080           if (reader_local && readerStatus.count_since_last_send) {
01081             if (DCPS::DCPS_debug_level > 3) {
01082               ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
01083                          ACE_TEXT("reader incompatible\n")));
01084             }
01085             drr->update_incompatible_qos(readerStatus);
01086           }
01087         }
01088       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::match_endpoints ( DCPS::RepoId  repoId,
const TopicDetails td,
bool  remove = false 
) [inline, protected]

Definition at line 669 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::ignore(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_publication(), OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_subscription(), and OpenDDS::RTPS::Sedp::Task::svc_i().

00671       {
00672         const bool reader = repoId.entityId.entityKind & 4;
00673         // Copy the endpoint set - lock can be released in match()
00674         RepoIdSet endpoints_copy = td.endpoints_;
00675 
00676         for (RepoIdSet::const_iterator iter = endpoints_copy.begin();
00677              iter != endpoints_copy.end(); ++iter) {
00678           // check to make sure it's a Reader/Writer or Writer/Reader match
00679           if (bool(iter->entityId.entityKind & 4) != reader) {
00680             if (remove) {
00681               remove_assoc(*iter, repoId);
00682             } else {
00683               match(reader ? *iter : repoId, reader ? repoId : *iter);
00684             }
00685           }
00686         }
00687       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP ( OPENDDS_STRING  ,
TopicDetails   
) [protected]
template<typename DiscoveredParticipantData_>
typedef OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP_CMP ( DCPS::RepoId  ,
OPENDDS_STRING  ,
DCPS::GUID_tKeyLessThan   
) [protected]
template<typename DiscoveredParticipantData_>
typedef OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP_CMP ( DCPS::RepoId  ,
LocalSubscription  ,
DCPS::GUID_tKeyLessThan   
) [protected]
template<typename DiscoveredParticipantData_>
typedef OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP_CMP ( DCPS::RepoId  ,
LocalPublication  ,
DCPS::GUID_tKeyLessThan   
) [protected]
template<typename DiscoveredParticipantData_>
typedef OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP_CMP ( DDS::BuiltinTopicKey_t  ,
DCPS::RepoId  ,
DCPS::BuiltinTopicKeyLess   
) [protected]
template<typename DiscoveredParticipantData_>
typedef OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP_CMP ( DCPS::RepoId  ,
DiscoveredPublication  ,
DCPS::GUID_tKeyLessThan   
) [protected]
template<typename DiscoveredParticipantData_>
typedef OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_MAP_CMP ( DCPS::RepoId  ,
DiscoveredSubscription  ,
DCPS::GUID_tKeyLessThan   
) [protected]
template<typename DiscoveredParticipantData_>
OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_SET ( OPENDDS_STRING   )  [protected]
template<typename DiscoveredParticipantData_>
OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::OPENDDS_SET_CMP ( DCPS::RepoId  ,
DCPS::GUID_tKeyLessThan   
) [protected]
template<typename DiscoveredParticipantData_>
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::populate_transport_locator_sequence ( DCPS::TransportLocatorSeq *&  tls,
DiscoveredPublicationIter iter,
const RepoId reader 
) [protected, pure virtual]
template<typename DiscoveredParticipantData_>
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::populate_transport_locator_sequence ( DCPS::TransportLocatorSeq *&  tls,
DiscoveredSubscriptionIter iter,
const RepoId reader 
) [protected, pure virtual]
template<typename DiscoveredParticipantData_>
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_assoc ( const RepoId remove_from,
const RepoId removing 
) [inline, protected]

Definition at line 690 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::match_endpoints().

00692       {
00693         const bool reader = remove_from.entityId.entityKind & 4;
00694         if (reader) {
00695           const LocalSubscriptionIter lsi = local_subscriptions_.find(remove_from);
00696           if (lsi != local_subscriptions_.end()) {
00697             lsi->second.matched_endpoints_.erase(removing);
00698             DCPS::WriterIdSeq writer_seq(1);
00699             writer_seq.length(1);
00700             writer_seq[0] = removing;
00701             lsi->second.remote_opendds_associations_.erase(removing);
00702             lsi->second.subscription_->remove_associations(writer_seq,
00703                                                            false /*notify_lost*/);
00704             // Update writer
00705             write_subscription_data(remove_from, lsi->second);
00706           }
00707 
00708         } else {
00709           const LocalPublicationIter lpi = local_publications_.find(remove_from);
00710           if (lpi != local_publications_.end()) {
00711             lpi->second.matched_endpoints_.erase(removing);
00712             DCPS::ReaderIdSeq reader_seq(1);
00713             reader_seq.length(1);
00714             reader_seq[0] = removing;
00715             lpi->second.remote_opendds_associations_.erase(removing);
00716             lpi->second.publication_->remove_associations(reader_seq,
00717                                                           false /*notify_lost*/);
00718           }
00719         }
00720       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit ( const DiscoveredSubscription sub  )  [inline, protected]

Definition at line 1123 of file DiscoveryBase.h.

01124       {
01125         sub_key_to_id_.erase(get_key(sub));
01126         remove_from_bit_i(sub);
01127       }

template<typename DiscoveredParticipantData_>
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit ( const DiscoveredPublication pub  )  [inline, protected]

Definition at line 1117 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::ignore().

01118       {
01119         pub_key_to_id_.erase(get_key(pub));
01120         remove_from_bit_i(pub);
01121       }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit_i ( const DiscoveredSubscription  )  [inline, protected, virtual]

Reimplemented in OpenDDS::RTPS::Sedp.

Definition at line 623 of file DiscoveryBase.h.

00623 { }

template<typename DiscoveredParticipantData_>
virtual void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_from_bit_i ( const DiscoveredPublication  )  [inline, protected, virtual]

Reimplemented in OpenDDS::RTPS::Sedp.

Definition at line 622 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_from_bit().

00622 { }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_publication ( const DCPS::RepoId publicationId  )  [inline]

Definition at line 405 of file DiscoveryBase.h.

00406       {
00407         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00408         LocalPublicationIter iter = local_publications_.find(publicationId);
00409         if (iter != local_publications_.end()) {
00410           if (DDS::RETCODE_OK == remove_publication_i(publicationId))
00411             {
00412               OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
00413               local_publications_.erase(publicationId);
00414               typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00415                 topics_.find(topic_name);
00416               if (top_it != topics_.end()) {
00417                 match_endpoints(publicationId, top_it->second, true /*remove*/);
00418                 top_it->second.endpoints_.erase(publicationId);
00419               }
00420             } else {
00421             ACE_ERROR((LM_ERROR,
00422                        ACE_TEXT("(%P|%t) ERROR: EndpointManager::remove_publication - ")
00423                        ACE_TEXT("Failed to publish dispose msg\n")));
00424           }
00425         }
00426       }

template<typename DiscoveredParticipantData_>
virtual DDS::ReturnCode_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_publication_i ( const RepoId publicationId  )  [protected, pure virtual]
template<typename DiscoveredParticipantData_>
void OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_subscription ( const DCPS::RepoId subscriptionId  )  [inline]

Definition at line 526 of file DiscoveryBase.h.

00527       {
00528         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00529         LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
00530         if (iter != local_subscriptions_.end()) {
00531           if (DDS::RETCODE_OK == remove_subscription_i(subscriptionId)
00532               /*subscriptions_writer_.write_unregister_dispose(subscriptionId)*/) {
00533             OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
00534             local_subscriptions_.erase(subscriptionId);
00535             typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00536               topics_.find(topic_name);
00537             if (top_it != topics_.end()) {
00538               match_endpoints(subscriptionId, top_it->second, true /*remove*/);
00539               top_it->second.endpoints_.erase(subscriptionId);
00540             }
00541           } else {
00542             ACE_ERROR((LM_ERROR,
00543                        ACE_TEXT("(%P|%t) ERROR: EndpointManager::remove_subscription - ")
00544                        ACE_TEXT("Failed to publish dispose msg\n")));
00545           }
00546         }
00547       }

template<typename DiscoveredParticipantData_>
virtual DDS::ReturnCode_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_subscription_i ( const RepoId subscriptionId  )  [protected, pure virtual]
template<typename DiscoveredParticipantData_>
DCPS::TopicStatus OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::remove_topic ( const RepoId topicId,
OPENDDS_STRING &  name 
) [inline]

Definition at line 297 of file DiscoveryBase.h.

00298       {
00299         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
00300         name = topic_names_[topicId];
00301         typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00302           topics_.find(name);
00303         if (top_it != topics_.end()) {
00304           TopicDetails& td = top_it->second;
00305           if (td.endpoints_.empty()) {
00306             topics_.erase(name);
00307           }
00308         }
00309 
00310         topic_names_.erase(topicId);
00311         return DCPS::REMOVED;
00312       }

template<typename DiscoveredParticipantData_>
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::shutting_down (  )  const [protected, pure virtual]
template<typename DiscoveredParticipantData_>
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::update_publication_qos ( const DCPS::RepoId publicationId,
const DDS::DataWriterQos qos,
const DDS::PublisherQos publisherQos 
) [pure virtual]
template<typename DiscoveredParticipantData_>
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::update_subscription_params ( const DCPS::RepoId subId,
const DDS::StringSeq params 
) [pure virtual]
template<typename DiscoveredParticipantData_>
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::update_subscription_qos ( const DCPS::RepoId subscriptionId,
const DDS::DataReaderQos qos,
const DDS::SubscriberQos subscriberQos 
) [pure virtual]
template<typename DiscoveredParticipantData_>
virtual bool OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::update_topic_qos ( const DCPS::RepoId topicId,
const DDS::TopicQos qos,
OPENDDS_STRING &  name 
) [pure virtual]
template<typename DiscoveredParticipantData_>
virtual DDS::ReturnCode_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::write_publication_data ( const DCPS::RepoId ,
LocalPublication ,
const DCPS::RepoId reader = DCPS::GUID_UNKNOWN 
) [inline, protected, virtual]

Reimplemented in OpenDDS::RTPS::Sedp.

Definition at line 657 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_publication().

00659                                                                                                     { ACE_UNUSED_ARG(reader); return DDS::RETCODE_OK; }

Here is the caller graph for this function:

template<typename DiscoveredParticipantData_>
virtual DDS::ReturnCode_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::write_subscription_data ( const DCPS::RepoId ,
LocalSubscription ,
const DCPS::RepoId reader = DCPS::GUID_UNKNOWN 
) [inline, protected, virtual]

Reimplemented in OpenDDS::RTPS::Sedp.

Definition at line 664 of file DiscoveryBase.h.

Referenced by OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::add_subscription(), and OpenDDS::DCPS::EndpointManager< Security::SPDPdiscoveredParticipantData >::remove_assoc().

00666                                                                                                      { ACE_UNUSED_ARG(reader); return DDS::RETCODE_OK; }

Here is the caller graph for this function:


Member Data Documentation

template<typename DiscoveredParticipantData_>
DiscoveredPublicationMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_publications_ [protected]
template<typename DiscoveredParticipantData_>
DiscoveredSubscriptionMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::discovered_subscriptions_ [protected]
template<typename DiscoveredParticipantData_>
RepoIdSet OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::ignored_guids_ [protected]
template<typename DiscoveredParticipantData_>
LocalPublicationMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_publications_ [protected]
template<typename DiscoveredParticipantData_>
LocalSubscriptionMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::local_subscriptions_ [protected]
template<typename DiscoveredParticipantData_>
ACE_Thread_Mutex& OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::lock_ [protected]
template<typename DiscoveredParticipantData_>
DCPS::RepoId OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::participant_id_ [protected]
template<typename DiscoveredParticipantData_>
DDS::BuiltinTopicKey_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_bit_key_ [protected]

Definition at line 1228 of file DiscoveryBase.h.

template<typename DiscoveredParticipantData_>
BitKeyMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::pub_key_to_id_ [protected]
template<typename DiscoveredParticipantData_>
unsigned int OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::publication_counter_ [protected]
template<typename DiscoveredParticipantData_>
DDS::BuiltinTopicKey_t OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_bit_key_ [protected]

Definition at line 1228 of file DiscoveryBase.h.

template<typename DiscoveredParticipantData_>
BitKeyMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::sub_key_to_id_ [protected]
template<typename DiscoveredParticipantData_>
unsigned int OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::subscription_counter_ [protected]
template<typename DiscoveredParticipantData_>
unsigned int OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::topic_counter_ [protected]
template<typename DiscoveredParticipantData_>
TopicNameMap OpenDDS::DCPS::EndpointManager< DiscoveredParticipantData_ >::topic_names_ [protected]

The documentation for this class was generated from the following file:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1