DiscoveryBase.h

Go to the documentation of this file.
00001 /*
00002  * Distributed under the OpenDDS License.
00003  * See: http://www.opendds.org/license.html
00004  */
00005 
00006 #ifndef OPENDDS_DDS_DCPS_DISCOVERYBASE_H
00007 #define OPENDDS_DDS_DCPS_DISCOVERYBASE_H
00008 
00009 #include "dds/DCPS/Discovery.h"
00010 #include "dds/DCPS/GuidUtils.h"
00011 #include "dds/DCPS/DCPS_Utils.h"
00012 #include "dds/DCPS/DomainParticipantImpl.h"
00013 #include "dds/DCPS/Marked_Default_Qos.h"
00014 #include "dds/DCPS/SubscriberImpl.h"
00015 #include "dds/DCPS/BuiltInTopicUtils.h"
00016 #include "dds/DCPS/Registered_Data_Types.h"
00017 #include "dds/DCPS/DataReaderImpl_T.h"
00018 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00019 #include "dds/DdsSecurityCoreC.h"
00020 
00021 #include "ace/Select_Reactor.h"
00022 #include "ace/Condition_Thread_Mutex.h"
00023 
00024 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00025 #pragma once
00026 #endif /* ACE_LACKS_PRAGMA_ONCE */
00027 
00028 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00029 
00030 namespace OpenDDS {
00031   namespace DCPS {
00032     typedef DataReaderImpl_T<DDS::ParticipantBuiltinTopicData> ParticipantBuiltinTopicDataDataReaderImpl;
00033     typedef DataReaderImpl_T<DDS::PublicationBuiltinTopicData> PublicationBuiltinTopicDataDataReaderImpl;
00034     typedef DataReaderImpl_T<DDS::SubscriptionBuiltinTopicData> SubscriptionBuiltinTopicDataDataReaderImpl;
00035     typedef DataReaderImpl_T<DDS::TopicBuiltinTopicData> TopicBuiltinTopicDataDataReaderImpl;
00036 
00037 #if defined(OPENDDS_SECURITY)
00038     typedef OPENDDS_MAP_CMP(DCPS::RepoId, DDS::Security::DatareaderCryptoHandle, DCPS::GUID_tKeyLessThan) DatareaderCryptoHandleMap;
00039     typedef OPENDDS_MAP_CMP(DCPS::RepoId, DDS::Security::DatawriterCryptoHandle, DCPS::GUID_tKeyLessThan) DatawriterCryptoHandleMap;
00040     typedef OPENDDS_MAP_CMP(DCPS::RepoId, DDS::Security::DatareaderCryptoTokenSeq, DCPS::GUID_tKeyLessThan) DatareaderCryptoTokenSeqMap;
00041     typedef OPENDDS_MAP_CMP(DCPS::RepoId, DDS::Security::DatawriterCryptoTokenSeq, DCPS::GUID_tKeyLessThan) DatawriterCryptoTokenSeqMap;
00042     typedef OPENDDS_MAP_CMP(DCPS::RepoId, DDS::Security::EndpointSecurityAttributes, DCPS::GUID_tKeyLessThan) EndpointSecurityAttributesMap;
00043 
00044     typedef enum {
00045       AS_UNKNOWN,
00046       AS_VALIDATING_REMOTE,
00047       AS_HANDSHAKE_REQUEST,
00048       AS_HANDSHAKE_REQUEST_SENT,
00049       AS_HANDSHAKE_REPLY,
00050       AS_HANDSHAKE_REPLY_SENT,
00051       AS_AUTHENTICATED,
00052       AS_UNAUTHENTICATED
00053     } AuthState;
00054 #endif
00055 
00056     inline void assign(DCPS::EntityKey_t& lhs, unsigned int rhs)
00057     {
00058       lhs[0] = static_cast<CORBA::Octet>(rhs);
00059       lhs[1] = static_cast<CORBA::Octet>(rhs >> 8);
00060       lhs[2] = static_cast<CORBA::Octet>(rhs >> 16);
00061     }
00062 
00063     struct DcpsUpcalls : ACE_Task_Base {
00064       DcpsUpcalls(DCPS::DataReaderCallbacks* drr,
00065                   const RepoId& reader,
00066                   const DCPS::WriterAssociation& wa,
00067                   bool active,
00068                   DCPS::DataWriterCallbacks* dwr)
00069         : drr_(drr), reader_(reader), wa_(wa), active_(active), dwr_(dwr)
00070         , reader_done_(false), writer_done_(false), cnd_(mtx_)
00071       {}
00072 
00073       int svc()
00074       {
00075         drr_->add_association(reader_, wa_, active_);
00076         {
00077           ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mtx_, -1);
00078           reader_done_ = true;
00079           cnd_.signal();
00080           while (!writer_done_) {
00081             cnd_.wait();
00082           }
00083         }
00084         dwr_->association_complete(reader_);
00085         return 0;
00086       }
00087 
00088       void writer_done()
00089       {
00090         {
00091           ACE_GUARD(ACE_Thread_Mutex, g, mtx_);
00092           writer_done_ = true;
00093           cnd_.signal();
00094         }
00095         wait();
00096       }
00097 
00098       DCPS::DataReaderCallbacks* const drr_;
00099       const RepoId& reader_;
00100       const DCPS::WriterAssociation& wa_;
00101       bool active_;
00102       DCPS::DataWriterCallbacks* const dwr_;
00103       bool reader_done_, writer_done_;
00104       ACE_Thread_Mutex mtx_;
00105       ACE_Condition_Thread_Mutex cnd_;
00106     };
00107 
00108     template <typename DiscoveredParticipantData_>
00109     class EndpointManager {
00110     protected:
00111 
00112       struct DiscoveredSubscription {
00113         DiscoveredSubscription()
00114         : bit_ih_(DDS::HANDLE_NIL)
00115         {
00116         }
00117 
00118         explicit DiscoveredSubscription(const OpenDDS::DCPS::DiscoveredReaderData& r)
00119         : reader_data_(r)
00120         , bit_ih_(DDS::HANDLE_NIL)
00121         {
00122         }
00123 
00124         OpenDDS::DCPS::DiscoveredReaderData reader_data_;
00125         DDS::InstanceHandle_t bit_ih_;
00126 
00127 #if defined(OPENDDS_SECURITY)
00128         DDS::Security::EndpointSecurityAttributes security_attribs_;
00129 #endif
00130 
00131       };
00132 
00133       typedef OPENDDS_MAP_CMP(DCPS::RepoId, DiscoveredSubscription,
00134                               DCPS::GUID_tKeyLessThan) DiscoveredSubscriptionMap;
00135 
00136       typedef typename DiscoveredSubscriptionMap::iterator DiscoveredSubscriptionIter;
00137 
00138       struct DiscoveredPublication {
00139         DiscoveredPublication()
00140         : bit_ih_(DDS::HANDLE_NIL)
00141         {
00142         }
00143 
00144         explicit DiscoveredPublication(const OpenDDS::DCPS::DiscoveredWriterData& w)
00145         : writer_data_(w)
00146         , bit_ih_(DDS::HANDLE_NIL)
00147         {
00148         }
00149 
00150         OpenDDS::DCPS::DiscoveredWriterData writer_data_;
00151         DDS::InstanceHandle_t bit_ih_;
00152 
00153 #if defined(OPENDDS_SECURITY)
00154         DDS::Security::EndpointSecurityAttributes security_attribs_;
00155 #endif
00156 
00157       };
00158 
00159       typedef OPENDDS_MAP_CMP(DCPS::RepoId, DiscoveredPublication,
00160                               DCPS::GUID_tKeyLessThan) DiscoveredPublicationMap;
00161       typedef typename DiscoveredPublicationMap::iterator DiscoveredPublicationIter;
00162 
00163     public:
00164       typedef DiscoveredParticipantData_ DiscoveredParticipantData;
00165 
00166       struct TopicDetails {
00167         OPENDDS_STRING data_type_;
00168         DDS::TopicQos qos_;
00169         DCPS::RepoId repo_id_;
00170         bool has_dcps_key_;
00171         RepoIdSet endpoints_;
00172       };
00173 
00174       EndpointManager(const RepoId& participant_id, ACE_Thread_Mutex& lock)
00175         : lock_(lock)
00176         , participant_id_(participant_id)
00177         , publication_counter_(0)
00178         , subscription_counter_(0)
00179         , topic_counter_(0)
00180 
00181 #if defined(OPENDDS_SECURITY)
00182         , permissions_handle_(DDS::HANDLE_NIL)
00183 #endif
00184 
00185       {
00186 
00187       }
00188 
00189       virtual ~EndpointManager() { }
00190 
00191       RepoId bit_key_to_repo_id(const char* bit_topic_name,
00192                                 const DDS::BuiltinTopicKey_t& key)
00193       {
00194         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00195         if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_PUBLICATION_TOPIC)) {
00196           return pub_key_to_id_[key];
00197         }
00198         if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_SUBSCRIPTION_TOPIC)) {
00199           return sub_key_to_id_[key];
00200         }
00201         return RepoId();
00202       }
00203 
00204       void ignore(const DCPS::RepoId& to_ignore)
00205       {
00206         // Locked prior to call from Spdp.
00207         ignored_guids_.insert(to_ignore);
00208         {
00209           const DiscoveredPublicationIter iter =
00210             discovered_publications_.find(to_ignore);
00211           if (iter != discovered_publications_.end()) {
00212             // clean up tracking info
00213             topics_[get_topic_name(iter->second)].endpoints_.erase(iter->first);
00214             remove_from_bit(iter->second);
00215             OPENDDS_STRING topic_name = get_topic_name(iter->second);
00216             discovered_publications_.erase(iter);
00217             // break associations
00218             typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00219               topics_.find(topic_name);
00220             if (top_it != topics_.end()) {
00221               match_endpoints(to_ignore, top_it->second, true /*remove*/);
00222             }
00223             return;
00224           }
00225         }
00226         {
00227           const DiscoveredSubscriptionIter iter =
00228             discovered_subscriptions_.find(to_ignore);
00229           if (iter != discovered_subscriptions_.end()) {
00230             // clean up tracking info
00231             topics_[get_topic_name(iter->second)].endpoints_.erase(iter->first);
00232             remove_from_bit(iter->second);
00233             OPENDDS_STRING topic_name = get_topic_name(iter->second);
00234             discovered_subscriptions_.erase(iter);
00235             // break associations
00236             typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00237               topics_.find(topic_name);
00238             if (top_it != topics_.end()) {
00239               match_endpoints(to_ignore, top_it->second, true /*remove*/);
00240             }
00241             return;
00242           }
00243         }
00244         {
00245           const OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan)::iterator
00246             iter = topic_names_.find(to_ignore);
00247           if (iter != topic_names_.end()) {
00248             ignored_topics_.insert(iter->second);
00249             // Remove all publications and subscriptions on this topic
00250             typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00251               topics_.find(iter->second);
00252             if (top_it != topics_.end()) {
00253               TopicDetails& td = top_it->second;
00254               RepoIdSet::iterator ep;
00255               for (ep = td.endpoints_.begin(); ep!= td.endpoints_.end(); ++ep) {
00256                 match_endpoints(*ep, td, true /*remove*/);
00257                 if (shutting_down()) { return; }
00258               }
00259             }
00260           }
00261         }
00262       }
00263 
00264       bool ignoring(const DCPS::RepoId& guid) const {
00265         return ignored_guids_.count(guid);
00266       }
00267       bool ignoring(const char* topic_name) const {
00268         return ignored_topics_.count(topic_name);
00269       }
00270 
00271       DCPS::TopicStatus assert_topic(DCPS::RepoId_out topicId, const char* topicName,
00272                                      const char* dataTypeName, const DDS::TopicQos& qos,
00273                                      bool hasDcpsKey)
00274       {
00275         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
00276         typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator iter =
00277           topics_.find(topicName);
00278         if (iter != topics_.end()) { // types must match, RtpsDiscovery checked for us
00279           iter->second.qos_ = qos;
00280           iter->second.has_dcps_key_ = hasDcpsKey;
00281           topicId = iter->second.repo_id_;
00282           topic_names_[iter->second.repo_id_] = topicName;
00283           return DCPS::FOUND;
00284         }
00285 
00286         TopicDetails& td = topics_[topicName];
00287         td.data_type_ = dataTypeName;
00288         td.qos_ = qos;
00289         td.has_dcps_key_ = hasDcpsKey;
00290         td.repo_id_ = make_topic_guid();
00291         topicId = td.repo_id_;
00292         topic_names_[td.repo_id_] = topicName;
00293 
00294         return DCPS::CREATED;
00295       }
00296 
00297       DCPS::TopicStatus remove_topic(const RepoId& topicId, OPENDDS_STRING& name)
00298       {
00299         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
00300         name = topic_names_[topicId];
00301         typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00302           topics_.find(name);
00303         if (top_it != topics_.end()) {
00304           TopicDetails& td = top_it->second;
00305           if (td.endpoints_.empty()) {
00306             topics_.erase(name);
00307           }
00308         }
00309 
00310         topic_names_.erase(topicId);
00311         return DCPS::REMOVED;
00312       }
00313 
00314       virtual bool update_topic_qos(const DCPS::RepoId& topicId, const DDS::TopicQos& qos,
00315                                     OPENDDS_STRING& name) = 0;
00316 
00317       DCPS::RepoId add_publication(const DCPS::RepoId& topicId,
00318                                    DCPS::DataWriterCallbacks* publication,
00319                                    const DDS::DataWriterQos& qos,
00320                                    const DCPS::TransportLocatorSeq& transInfo,
00321                                    const DDS::PublisherQos& publisherQos)
00322       {
00323         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00324 
00325         RepoId rid = participant_id_;
00326         assign_publication_key(rid, topicId, qos);
00327         LocalPublication& pb = local_publications_[rid];
00328         pb.topic_id_ = topicId;
00329         pb.publication_ = publication;
00330         pb.qos_ = qos;
00331         pb.trans_info_ = transInfo;
00332         pb.publisher_qos_ = publisherQos;
00333 
00334         const std::string& topic_name = topic_names_[topicId];
00335 
00336 #if defined(OPENDDS_SECURITY)
00337         if (is_security_enabled()) {
00338           DDS::Security::SecurityException ex;
00339 
00340           DDS::Security::TopicSecurityAttributes topic_sec_attr;
00341           if (!get_access_control()->get_topic_sec_attributes(get_permissions_handle(), topic_name.data(), topic_sec_attr, ex)) {
00342             ACE_ERROR((LM_ERROR,
00343               ACE_TEXT("(%P|%t) ERROR: ")
00344               ACE_TEXT("DomainParticipant::add_publication, ")
00345               ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"),
00346                 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
00347             return RepoId();
00348           }
00349 
00350           if (topic_sec_attr.is_write_protected == true) {
00351             if (!get_access_control()->check_create_datawriter(get_permissions_handle(), get_domain_id(), topic_name.data(), qos,
00352                                                                publisherQos.partition, DDS::Security::DataTagQosPolicy(), ex)) {
00353               ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00354                 ACE_TEXT("EndpointManager::add_publication() - ")
00355                 ACE_TEXT("Permissions check failed for local datawriter on topic '%C'. Security Exception[%d.%d]: %C\n"), topic_name.data(),
00356                   ex.code, ex.minor_code, ex.message.in()));
00357               return RepoId();
00358             }
00359           }
00360 
00361           if (!get_access_control()->get_datawriter_sec_attributes(get_permissions_handle(), topic_name.data(),
00362               publisherQos.partition, DDS::Security::DataTagQosPolicy(), pb.security_attribs_, ex)) {
00363             ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00364                        ACE_TEXT("EndpointManager::add_publication() - ")
00365                        ACE_TEXT("Unable to get security attributes for local datawriter. Security Exception[%d.%d]: %C\n"),
00366                        ex.code, ex.minor_code, ex.message.in()));
00367             return RepoId();
00368           }
00369 
00370           if (pb.security_attribs_.is_submessage_protected || pb.security_attribs_.is_payload_protected) {
00371             DDS::Security::DatawriterCryptoHandle handle = get_crypto_key_factory()->register_local_datawriter(crypto_handle_, DDS::PropertySeq(), pb.security_attribs_, ex);
00372             if (handle == DDS::HANDLE_NIL) {
00373               ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00374                          ACE_TEXT("EndpointManager::add_publication() - ")
00375                          ACE_TEXT("Unable to get local datawriter crypto handle. Security Exception[%d.%d]: %C\n"),
00376                          ex.code, ex.minor_code, ex.message.in()));
00377             }
00378 
00379             local_writer_crypto_handles_[rid] = handle;
00380             local_writer_security_attribs_[rid] = pb.security_attribs_;
00381           }
00382         }
00383 #endif
00384 
00385         TopicDetails& td = topics_[topic_name];
00386         td.endpoints_.insert(rid);
00387 
00388         if (DDS::RETCODE_OK != add_publication_i(rid, pb)) {
00389           return RepoId();
00390         }
00391 
00392         if (DDS::RETCODE_OK != write_publication_data(rid, pb)) {
00393           return RepoId();
00394         }
00395 
00396         if (DCPS::DCPS_debug_level > 3) {
00397           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::add_publication - ")
00398                      ACE_TEXT("calling match_endpoints\n")));
00399         }
00400         match_endpoints(rid, td);
00401 
00402         return rid;
00403       }
00404 
00405       void remove_publication(const DCPS::RepoId& publicationId)
00406       {
00407         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00408         LocalPublicationIter iter = local_publications_.find(publicationId);
00409         if (iter != local_publications_.end()) {
00410           if (DDS::RETCODE_OK == remove_publication_i(publicationId))
00411             {
00412               OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
00413               local_publications_.erase(publicationId);
00414               typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00415                 topics_.find(topic_name);
00416               if (top_it != topics_.end()) {
00417                 match_endpoints(publicationId, top_it->second, true /*remove*/);
00418                 top_it->second.endpoints_.erase(publicationId);
00419               }
00420             } else {
00421             ACE_ERROR((LM_ERROR,
00422                        ACE_TEXT("(%P|%t) ERROR: EndpointManager::remove_publication - ")
00423                        ACE_TEXT("Failed to publish dispose msg\n")));
00424           }
00425         }
00426       }
00427 
00428       virtual bool update_publication_qos(const DCPS::RepoId& publicationId,
00429                                           const DDS::DataWriterQos& qos,
00430                                           const DDS::PublisherQos& publisherQos) = 0;
00431 
00432       DCPS::RepoId add_subscription(const DCPS::RepoId& topicId,
00433                                     DCPS::DataReaderCallbacks* subscription,
00434                                     const DDS::DataReaderQos& qos,
00435                                     const DCPS::TransportLocatorSeq& transInfo,
00436                                     const DDS::SubscriberQos& subscriberQos,
00437                                     const char* filterClassName,
00438                                     const char* filterExpr,
00439                                     const DDS::StringSeq& params)
00440       {
00441         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00442 
00443         RepoId rid = participant_id_;
00444         assign_subscription_key(rid, topicId, qos);
00445         LocalSubscription& sb = local_subscriptions_[rid];
00446         sb.topic_id_ = topicId;
00447         sb.subscription_ = subscription;
00448         sb.qos_ = qos;
00449         sb.trans_info_ = transInfo;
00450         sb.subscriber_qos_ = subscriberQos;
00451         sb.filterProperties.filterClassName = filterClassName;
00452         sb.filterProperties.filterExpression = filterExpr;
00453         sb.filterProperties.expressionParameters = params;
00454 
00455         const std::string& topic_name = topic_names_[topicId];
00456 
00457 #if defined(OPENDDS_SECURITY)
00458         if (is_security_enabled()) {
00459           DDS::Security::SecurityException ex;
00460 
00461           DDS::Security::TopicSecurityAttributes topic_sec_attr;
00462           if (!get_access_control()->get_topic_sec_attributes(get_permissions_handle(), topic_name.data(), topic_sec_attr, ex)) {
00463             ACE_ERROR((LM_ERROR,
00464               ACE_TEXT("(%P|%t) ERROR: ")
00465               ACE_TEXT("DomainParticipant::add_subscription, ")
00466               ACE_TEXT("Unable to get security attributes for topic '%C'. SecurityException[%d.%d]: %C\n"),
00467                 topic_name.data(), ex.code, ex.minor_code, ex.message.in()));
00468             return RepoId();
00469           }
00470 
00471           if (topic_sec_attr.is_read_protected == true) {
00472             if (!get_access_control()->check_create_datareader(get_permissions_handle(), get_domain_id(), topic_name.data(), qos,
00473                                                                subscriberQos.partition, DDS::Security::DataTagQosPolicy(), ex)) {
00474               ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
00475                 ACE_TEXT("EndpointManager::add_subscription() - ")
00476                 ACE_TEXT("Permissions check failed for local datareader on topic '%C'. Security Exception[%d.%d]: %C\n"), topic_name.data(),
00477                   ex.code, ex.minor_code, ex.message.in()));
00478               return RepoId();
00479             }
00480           }
00481 
00482           if (!get_access_control()->get_datareader_sec_attributes(get_permissions_handle(), topic_name.data(),
00483               subscriberQos.partition, DDS::Security::DataTagQosPolicy(), sb.security_attribs_, ex)) {
00484             ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00485                        ACE_TEXT("EndpointManager::add_subscription() - ")
00486                        ACE_TEXT("Unable to get security attributes for local datareader. Security Exception[%d.%d]: %C\n"),
00487                        ex.code, ex.minor_code, ex.message.in()));
00488             return RepoId();
00489           }
00490 
00491           if (sb.security_attribs_.is_submessage_protected || sb.security_attribs_.is_payload_protected) {
00492             DDS::Security::DatareaderCryptoHandle handle = get_crypto_key_factory()->register_local_datareader(crypto_handle_, DDS::PropertySeq(), sb.security_attribs_, ex);
00493             if (handle == DDS::HANDLE_NIL) {
00494               ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00495                          ACE_TEXT("EndpointManager::add_subscription() - ")
00496                          ACE_TEXT("Unable to get local datareader crypto handle. Security Exception[%d.%d]: %C\n"),
00497                          ex.code, ex.minor_code, ex.message.in()));
00498             }
00499 
00500             local_reader_crypto_handles_[rid] = handle;
00501             local_reader_security_attribs_[rid] = sb.security_attribs_;
00502           }
00503         }
00504 #endif
00505 
00506         TopicDetails& td = topics_[topic_name];
00507         td.endpoints_.insert(rid);
00508 
00509         if (DDS::RETCODE_OK != add_subscription_i(rid, sb)) {
00510           return RepoId();
00511         }
00512 
00513         if (DDS::RETCODE_OK != write_subscription_data(rid, sb)) {
00514           return RepoId();
00515         }
00516 
00517         if (DCPS::DCPS_debug_level > 3) {
00518           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::add_subscription - ")
00519                      ACE_TEXT("calling match_endpoints\n")));
00520         }
00521         match_endpoints(rid, td);
00522 
00523         return rid;
00524       }
00525 
00526       void remove_subscription(const DCPS::RepoId& subscriptionId)
00527       {
00528         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00529         LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
00530         if (iter != local_subscriptions_.end()) {
00531           if (DDS::RETCODE_OK == remove_subscription_i(subscriptionId)
00532               /*subscriptions_writer_.write_unregister_dispose(subscriptionId)*/) {
00533             OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
00534             local_subscriptions_.erase(subscriptionId);
00535             typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00536               topics_.find(topic_name);
00537             if (top_it != topics_.end()) {
00538               match_endpoints(subscriptionId, top_it->second, true /*remove*/);
00539               top_it->second.endpoints_.erase(subscriptionId);
00540             }
00541           } else {
00542             ACE_ERROR((LM_ERROR,
00543                        ACE_TEXT("(%P|%t) ERROR: EndpointManager::remove_subscription - ")
00544                        ACE_TEXT("Failed to publish dispose msg\n")));
00545           }
00546         }
00547       }
00548 
00549       virtual bool update_subscription_qos(const DCPS::RepoId& subscriptionId,
00550                                            const DDS::DataReaderQos& qos,
00551                                            const DDS::SubscriberQos& subscriberQos) = 0;
00552 
00553       virtual bool update_subscription_params(const DCPS::RepoId& subId,
00554                                               const DDS::StringSeq& params) = 0;
00555 
00556       virtual void association_complete(const DCPS::RepoId& localId,
00557                                         const DCPS::RepoId& remoteId) = 0;
00558 
00559       virtual bool disassociate(const DiscoveredParticipantData& pdata) = 0;
00560 
00561     protected:
00562       struct LocalEndpoint {
00563         LocalEndpoint() : topic_id_(DCPS::GUID_UNKNOWN), sequence_(DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {}
00564         DCPS::RepoId topic_id_;
00565         DCPS::TransportLocatorSeq trans_info_;
00566         RepoIdSet matched_endpoints_;
00567         DCPS::SequenceNumber sequence_;
00568         RepoIdSet remote_opendds_associations_;
00569       };
00570 
00571       struct LocalPublication : LocalEndpoint {
00572         DCPS::DataWriterCallbacks* publication_;
00573         DDS::DataWriterQos qos_;
00574         DDS::PublisherQos publisher_qos_;
00575 
00576 #if defined(OPENDDS_SECURITY)
00577         DDS::Security::EndpointSecurityAttributes security_attribs_;
00578 #endif
00579 
00580       };
00581 
00582       struct LocalSubscription : LocalEndpoint {
00583         DCPS::DataReaderCallbacks* subscription_;
00584         DDS::DataReaderQos qos_;
00585         DDS::SubscriberQos subscriber_qos_;
00586         OpenDDS::DCPS::ContentFilterProperty_t filterProperties;
00587 
00588 #if defined(OPENDDS_SECURITY)
00589         DDS::Security::EndpointSecurityAttributes security_attribs_;
00590 #endif
00591 
00592       };
00593 
00594       typedef OPENDDS_MAP_CMP(DDS::BuiltinTopicKey_t, DCPS::RepoId,
00595                               DCPS::BuiltinTopicKeyLess) BitKeyMap;
00596 
00597       typedef OPENDDS_MAP_CMP(DCPS::RepoId, LocalPublication,
00598                               DCPS::GUID_tKeyLessThan) LocalPublicationMap;
00599       typedef typename LocalPublicationMap::iterator LocalPublicationIter;
00600       typedef typename LocalPublicationMap::const_iterator LocalPublicationCIter;
00601 
00602       typedef OPENDDS_MAP_CMP(DCPS::RepoId, LocalSubscription,
00603                               DCPS::GUID_tKeyLessThan) LocalSubscriptionMap;
00604       typedef typename LocalSubscriptionMap::iterator LocalSubscriptionIter;
00605       typedef typename LocalSubscriptionMap::const_iterator LocalSubscriptionCIter;
00606 
00607       typedef typename OPENDDS_MAP_CMP(DCPS::RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan) TopicNameMap;
00608 
00609       static const char* get_topic_name(const DiscoveredPublication& pub) {
00610         return pub.writer_data_.ddsPublicationData.topic_name;
00611       }
00612       static const char* get_topic_name(const DiscoveredSubscription& sub) {
00613         return sub.reader_data_.ddsSubscriptionData.topic_name;
00614       }
00615       static DDS::BuiltinTopicKey_t get_key(const DiscoveredPublication& pub) {
00616         return pub.writer_data_.ddsPublicationData.key;
00617       }
00618       static DDS::BuiltinTopicKey_t get_key(const DiscoveredSubscription& sub) {
00619         return sub.reader_data_.ddsSubscriptionData.key;
00620       }
00621 
00622       virtual void remove_from_bit_i(const DiscoveredPublication& /*pub*/) { }
00623       virtual void remove_from_bit_i(const DiscoveredSubscription& /*sub*/) { }
00624 
00625       virtual void assign_publication_key(RepoId& rid,
00626                                           const RepoId& topicId,
00627                                           const DDS::DataWriterQos& /*qos*/) {
00628         rid.entityId.entityKind =
00629           has_dcps_key(topicId)
00630           ? DCPS::ENTITYKIND_USER_WRITER_WITH_KEY
00631           : DCPS::ENTITYKIND_USER_WRITER_NO_KEY;
00632         assign(rid.entityId.entityKey, publication_counter_++);
00633       }
00634       virtual void assign_subscription_key(RepoId& rid,
00635                                            const RepoId& topicId,
00636                                            const DDS::DataReaderQos& /*qos*/) {
00637         rid.entityId.entityKind =
00638           has_dcps_key(topicId)
00639           ? DCPS::ENTITYKIND_USER_READER_WITH_KEY
00640           : DCPS::ENTITYKIND_USER_READER_NO_KEY;
00641         assign(rid.entityId.entityKey, subscription_counter_++);
00642       }
00643       virtual void assign_topic_key(RepoId& guid) {
00644         assign(guid.entityId.entityKey, topic_counter_++);
00645 
00646         if (topic_counter_ == 0x1000000) {
00647           ACE_ERROR((LM_ERROR,
00648                      ACE_TEXT("(%P|%t) ERROR: EndpointManager::make_topic_guid: ")
00649                      ACE_TEXT("Exceeded Maximum number of topic entity keys!")
00650                      ACE_TEXT("Next key will be a duplicate!\n")));
00651           topic_counter_ = 0;
00652         }
00653       }
00654 
00655       virtual DDS::ReturnCode_t add_publication_i(const DCPS::RepoId& /*rid*/,
00656                                                   LocalPublication& /*pub*/) { return DDS::RETCODE_OK; }
00657       virtual DDS::ReturnCode_t write_publication_data(const DCPS::RepoId& /*rid*/,
00658                                                        LocalPublication& /*pub*/,
00659                                                        const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN) { ACE_UNUSED_ARG(reader); return DDS::RETCODE_OK; }
00660       virtual DDS::ReturnCode_t remove_publication_i(const RepoId& publicationId) = 0;
00661 
00662       virtual DDS::ReturnCode_t add_subscription_i(const DCPS::RepoId& /*rid*/,
00663                                                    LocalSubscription& /*pub*/) { return DDS::RETCODE_OK; };
00664       virtual DDS::ReturnCode_t write_subscription_data(const DCPS::RepoId& /*rid*/,
00665                                                         LocalSubscription& /*pub*/,
00666                                                         const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN) { ACE_UNUSED_ARG(reader); return DDS::RETCODE_OK; }
00667       virtual DDS::ReturnCode_t remove_subscription_i(const RepoId& subscriptionId) = 0;
00668 
00669       void match_endpoints(DCPS::RepoId repoId, const TopicDetails& td,
00670                            bool remove = false)
00671       {
00672         const bool reader = repoId.entityId.entityKind & 4;
00673         // Copy the endpoint set - lock can be released in match()
00674         RepoIdSet endpoints_copy = td.endpoints_;
00675 
00676         for (RepoIdSet::const_iterator iter = endpoints_copy.begin();
00677              iter != endpoints_copy.end(); ++iter) {
00678           // check to make sure it's a Reader/Writer or Writer/Reader match
00679           if (bool(iter->entityId.entityKind & 4) != reader) {
00680             if (remove) {
00681               remove_assoc(*iter, repoId);
00682             } else {
00683               match(reader ? *iter : repoId, reader ? repoId : *iter);
00684             }
00685           }
00686         }
00687       }
00688 
00689       void
00690       remove_assoc(const RepoId& remove_from,
00691                    const RepoId& removing)
00692       {
00693         const bool reader = remove_from.entityId.entityKind & 4;
00694         if (reader) {
00695           const LocalSubscriptionIter lsi = local_subscriptions_.find(remove_from);
00696           if (lsi != local_subscriptions_.end()) {
00697             lsi->second.matched_endpoints_.erase(removing);
00698             DCPS::WriterIdSeq writer_seq(1);
00699             writer_seq.length(1);
00700             writer_seq[0] = removing;
00701             lsi->second.remote_opendds_associations_.erase(removing);
00702             lsi->second.subscription_->remove_associations(writer_seq,
00703                                                            false /*notify_lost*/);
00704             // Update writer
00705             write_subscription_data(remove_from, lsi->second);
00706           }
00707 
00708         } else {
00709           const LocalPublicationIter lpi = local_publications_.find(remove_from);
00710           if (lpi != local_publications_.end()) {
00711             lpi->second.matched_endpoints_.erase(removing);
00712             DCPS::ReaderIdSeq reader_seq(1);
00713             reader_seq.length(1);
00714             reader_seq[0] = removing;
00715             lpi->second.remote_opendds_associations_.erase(removing);
00716             lpi->second.publication_->remove_associations(reader_seq,
00717                                                           false /*notify_lost*/);
00718           }
00719         }
00720       }
00721 
00722 #if defined(OPENDDS_SECURITY)
00723       virtual DDS::Security::DatawriterCryptoHandle
00724       generate_remote_matched_writer_crypto_handle(const RepoId&, const DDS::Security::DatareaderCryptoHandle&)
00725       {
00726         return DDS::HANDLE_NIL;
00727       }
00728 
00729       virtual DDS::Security::DatareaderCryptoHandle
00730       generate_remote_matched_reader_crypto_handle(const RepoId&, const DDS::Security::DatawriterCryptoHandle&, bool)
00731       {
00732         return DDS::HANDLE_NIL;
00733       }
00734 
00735       virtual void
00736       create_and_send_datareader_crypto_tokens(const DDS::Security::DatareaderCryptoHandle&, const DCPS::RepoId&, const DDS::Security::DatawriterCryptoHandle&, const DCPS::RepoId&)
00737       {
00738         return;
00739       }
00740 
00741       virtual void
00742       create_and_send_datawriter_crypto_tokens(const DDS::Security::DatawriterCryptoHandle&, const DCPS::RepoId&, const DDS::Security::DatareaderCryptoHandle&, const DCPS::RepoId&)
00743       {
00744         return;
00745       }
00746 #endif
00747 
00748       virtual DDS::DomainId_t
00749       get_domain_id() const
00750       {
00751         return -1;
00752       }
00753 
00754       void
00755       match(const RepoId& writer, const RepoId& reader)
00756       {
00757         // 0. For discovered endpoints, we'll have the QoS info in the form of the
00758         // publication or subscription BIT data which doesn't use the same structures
00759         // for QoS.  In those cases we can copy the individual QoS policies to temp
00760         // QoS structs:
00761         DDS::DataWriterQos tempDwQos;
00762         DDS::PublisherQos tempPubQos;
00763         DDS::DataReaderQos tempDrQos;
00764         DDS::SubscriberQos tempSubQos;
00765         ContentFilterProperty_t tempCfp;
00766 
00767         // 1. collect details about the writer, which may be local or discovered
00768         const DDS::DataWriterQos* dwQos = 0;
00769         const DDS::PublisherQos* pubQos = 0;
00770         DCPS::TransportLocatorSeq* wTls = 0;
00771 
00772         const LocalPublicationIter lpi = local_publications_.find(writer);
00773         DiscoveredPublicationIter dpi;
00774         bool writer_local = false, already_matched = false;
00775         if (lpi != local_publications_.end()) {
00776           writer_local = true;
00777           dwQos = &lpi->second.qos_;
00778           pubQos = &lpi->second.publisher_qos_;
00779           wTls = &lpi->second.trans_info_;
00780           already_matched = lpi->second.matched_endpoints_.count(reader);
00781         } else if ((dpi = discovered_publications_.find(writer))
00782                    != discovered_publications_.end()) {
00783           wTls = &dpi->second.writer_data_.writerProxy.allLocators;
00784         } else {
00785           return; // Possible and ok, since lock is released
00786         }
00787 
00788         // 2. collect details about the reader, which may be local or discovered
00789         const DDS::DataReaderQos* drQos = 0;
00790         const DDS::SubscriberQos* subQos = 0;
00791         DCPS::TransportLocatorSeq* rTls = 0;
00792         const ContentFilterProperty_t* cfProp = 0;
00793 
00794         const LocalSubscriptionIter lsi = local_subscriptions_.find(reader);
00795         DiscoveredSubscriptionIter dsi;
00796         bool reader_local = false;
00797         if (lsi != local_subscriptions_.end()) {
00798           reader_local = true;
00799           drQos = &lsi->second.qos_;
00800           subQos = &lsi->second.subscriber_qos_;
00801           rTls = &lsi->second.trans_info_;
00802           if (lsi->second.filterProperties.filterExpression[0] != 0) {
00803             tempCfp.filterExpression = lsi->second.filterProperties.filterExpression;
00804             tempCfp.expressionParameters = lsi->second.filterProperties.expressionParameters;
00805           }
00806           cfProp = &tempCfp;
00807           if (!already_matched) {
00808             already_matched = lsi->second.matched_endpoints_.count(writer);
00809           }
00810         } else if ((dsi = discovered_subscriptions_.find(reader))
00811                    != discovered_subscriptions_.end()) {
00812           if (!writer_local) {
00813             // this is a discovered/discovered match, nothing for us to do
00814             return;
00815           }
00816           rTls = &dsi->second.reader_data_.readerProxy.allLocators;
00817 
00818           populate_transport_locator_sequence(rTls, dsi, reader);
00819 
00820           const DDS::SubscriptionBuiltinTopicData& bit =
00821             dsi->second.reader_data_.ddsSubscriptionData;
00822           tempDrQos.durability = bit.durability;
00823           tempDrQos.deadline = bit.deadline;
00824           tempDrQos.latency_budget = bit.latency_budget;
00825           tempDrQos.liveliness = bit.liveliness;
00826           tempDrQos.reliability = bit.reliability;
00827           tempDrQos.destination_order = bit.destination_order;
00828           tempDrQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
00829           tempDrQos.resource_limits =
00830             TheServiceParticipant->initial_ResourceLimitsQosPolicy();
00831           tempDrQos.user_data = bit.user_data;
00832           tempDrQos.ownership = bit.ownership;
00833           tempDrQos.time_based_filter = bit.time_based_filter;
00834           tempDrQos.reader_data_lifecycle =
00835             TheServiceParticipant->initial_ReaderDataLifecycleQosPolicy();
00836           drQos = &tempDrQos;
00837           tempSubQos.presentation = bit.presentation;
00838           tempSubQos.partition = bit.partition;
00839           tempSubQos.group_data = bit.group_data;
00840           tempSubQos.entity_factory =
00841             TheServiceParticipant->initial_EntityFactoryQosPolicy();
00842           subQos = &tempSubQos;
00843           cfProp = &dsi->second.reader_data_.contentFilterProperty;
00844         } else {
00845           return; // Possible and ok, since lock is released
00846         }
00847 
00848         // This is really part of step 1, but we're doing it here just in case we
00849         // are in the discovered/discovered match and we don't need the QoS data.
00850         if (!writer_local) {
00851           const DDS::PublicationBuiltinTopicData& bit =
00852             dpi->second.writer_data_.ddsPublicationData;
00853           tempDwQos.durability = bit.durability;
00854           tempDwQos.durability_service = bit.durability_service;
00855           tempDwQos.deadline = bit.deadline;
00856           tempDwQos.latency_budget = bit.latency_budget;
00857           tempDwQos.liveliness = bit.liveliness;
00858           tempDwQos.reliability = bit.reliability;
00859           tempDwQos.destination_order = bit.destination_order;
00860           tempDwQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
00861           tempDwQos.resource_limits =
00862             TheServiceParticipant->initial_ResourceLimitsQosPolicy();
00863           tempDwQos.transport_priority =
00864             TheServiceParticipant->initial_TransportPriorityQosPolicy();
00865           tempDwQos.lifespan = bit.lifespan;
00866           tempDwQos.user_data = bit.user_data;
00867           tempDwQos.ownership = bit.ownership;
00868           tempDwQos.ownership_strength = bit.ownership_strength;
00869           tempDwQos.writer_data_lifecycle =
00870             TheServiceParticipant->initial_WriterDataLifecycleQosPolicy();
00871           dwQos = &tempDwQos;
00872           tempPubQos.presentation = bit.presentation;
00873           tempPubQos.partition = bit.partition;
00874           tempPubQos.group_data = bit.group_data;
00875           tempPubQos.entity_factory =
00876             TheServiceParticipant->initial_EntityFactoryQosPolicy();
00877           pubQos = &tempPubQos;
00878 
00879           populate_transport_locator_sequence(wTls, dpi, writer);
00880         }
00881 
00882         // Need to release lock, below, for callbacks into DCPS which could
00883         // call into Spdp/Sedp.  Note that this doesn't unlock, it just constructs
00884         // an ACE object which will be used below for unlocking.
00885         ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00886 
00887         // 3. check transport and QoS compatibility
00888 
00889         // Copy entries from local publication and local subscription maps
00890         // prior to releasing lock
00891         DCPS::DataWriterCallbacks* dwr = 0;
00892         DCPS::DataReaderCallbacks* drr = 0;
00893         if (writer_local) {
00894           dwr = lpi->second.publication_;
00895         }
00896         if (reader_local) {
00897           drr = lsi->second.subscription_;
00898         }
00899 
00900         DCPS::IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00901         DCPS::IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00902 
00903         if (DCPS::compatibleQOS(&writerStatus, &readerStatus, *wTls, *rTls,
00904                                 dwQos, drQos, pubQos, subQos)) {
00905           if (!writer_local) {
00906             RepoId writer_participant = writer;
00907             writer_participant.entityId = ENTITYID_PARTICIPANT;
00908             if (defer_writer(writer, writer_participant)) {
00909               return;
00910             }
00911           }
00912           if (!reader_local) {
00913             RepoId reader_participant = reader;
00914             reader_participant.entityId = ENTITYID_PARTICIPANT;
00915             if (defer_reader(reader, reader_participant)) {
00916               return;
00917             }
00918           }
00919 
00920           bool call_writer = false, call_reader = false;
00921           if (writer_local) {
00922             call_writer = lpi->second.matched_endpoints_.insert(reader).second;
00923           }
00924           if (reader_local) {
00925             call_reader = lsi->second.matched_endpoints_.insert(writer).second;
00926           }
00927           if (!call_writer && !call_reader) {
00928             return; // nothing more to do
00929           }
00930 
00931 #if defined(OPENDDS_SECURITY)
00932           if (is_security_enabled()) {
00933             if (call_reader) {
00934               RepoId writer_participant = writer;
00935               writer_participant.entityId = ENTITYID_PARTICIPANT;
00936               DatareaderCryptoHandleMap::const_iterator iter = local_reader_crypto_handles_.find(reader);
00937               if (iter != local_reader_crypto_handles_.end()) { // It might not exist due to security attributes, and that's OK
00938                 DDS::Security::DatareaderCryptoHandle drch = iter->second;
00939                 remote_writer_crypto_handles_[writer] = generate_remote_matched_writer_crypto_handle(writer_participant, drch);
00940                 DatawriterCryptoTokenSeqMap::iterator t_iter = pending_remote_writer_crypto_tokens_.find(writer);
00941                 if (t_iter != pending_remote_writer_crypto_tokens_.end()) {
00942                   DDS::Security::SecurityException se;
00943                   if (get_crypto_key_exchange()->set_remote_datawriter_crypto_tokens(iter->second, remote_writer_crypto_handles_[writer], t_iter->second, se) == false) {
00944                     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00945                       ACE_TEXT("(%P|%t) ERROR: DiscoveryBase::match() - ")
00946                       ACE_TEXT("Unable to set pending remote datawriter crypto tokens with crypto key exchange plugin. Security Exception[%d.%d]: %C\n"),
00947                         se.code, se.minor_code, se.message.in()));
00948                   }
00949                   pending_remote_writer_crypto_tokens_.erase(t_iter);
00950                 }
00951                 EndpointSecurityAttributesMap::const_iterator s_iter = local_reader_security_attribs_.find(reader);
00952                 if (s_iter != local_reader_security_attribs_.end() && s_iter->second.is_submessage_protected) { // Yes, this is different for remote datawriters than readers (see 8.8.9.3 vs 8.8.9.2)
00953                   create_and_send_datareader_crypto_tokens(drch, reader, remote_writer_crypto_handles_[writer], writer);
00954                 }
00955               }
00956             }
00957             if (call_writer) {
00958               RepoId reader_participant = reader;
00959               reader_participant.entityId = ENTITYID_PARTICIPANT;
00960               DatawriterCryptoHandleMap::const_iterator iter = local_writer_crypto_handles_.find(writer);
00961               if (iter != local_writer_crypto_handles_.end()) { // It might not exist due to security attributes, and that's OK
00962                 DDS::Security::DatawriterCryptoHandle dwch = iter->second;
00963                 remote_reader_crypto_handles_[reader] = generate_remote_matched_reader_crypto_handle(reader_participant, dwch, (relay_only_readers_.count(reader) != 0));
00964                 DatareaderCryptoTokenSeqMap::iterator t_iter = pending_remote_reader_crypto_tokens_.find(reader);
00965                 if (t_iter != pending_remote_reader_crypto_tokens_.end()) {
00966                   DDS::Security::SecurityException se;
00967                   if (get_crypto_key_exchange()->set_remote_datareader_crypto_tokens(iter->second, remote_reader_crypto_handles_[reader], t_iter->second, se) == false) {
00968                     ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
00969                       ACE_TEXT("(%P|%t) ERROR: DiscoveryBase::match() - ")
00970                       ACE_TEXT("Unable to set pending remote datareader crypto tokens with crypto key exchange plugin. Security Exception[%d.%d]: %C\n"),
00971                         se.code, se.minor_code, se.message.in()));
00972                   }
00973                   pending_remote_reader_crypto_tokens_.erase(t_iter);
00974                 }
00975                 EndpointSecurityAttributesMap::const_iterator s_iter = local_writer_security_attribs_.find(writer);
00976                 if (s_iter != local_writer_security_attribs_.end() && (s_iter->second.is_submessage_protected || s_iter->second.is_payload_protected)) {
00977                   create_and_send_datawriter_crypto_tokens(dwch, writer, remote_reader_crypto_handles_[reader], reader);
00978                 }
00979               }
00980             }
00981           }
00982 #endif
00983 
00984           // Copy reader and writer association data prior to releasing lock
00985 #ifdef __SUNPRO_CC
00986           DCPS::ReaderAssociation ra;
00987           ra.readerTransInfo = *rTls;
00988           ra.readerId = reader;
00989           ra.subQos = *subQos;
00990           ra.readerQos = *drQos;
00991           ra.filterClassName = cfProp->filterClassName;
00992           ra.filterExpression = cfProp->filterExpression;
00993           ra.exprParams = cfProp->expressionParameters;
00994           DCPS::WriterAssociation wa;
00995           wa.writerTransInfo = *wTls;
00996           wa.writerId = writer;
00997           wa.pubQos = *pubQos;
00998           wa.writerQos = *dwQos;
00999 #else
01000           const DCPS::ReaderAssociation ra =
01001             {add_security_info(*rTls, writer, reader), reader, *subQos, *drQos,
01002 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
01003              cfProp->filterClassName, cfProp->filterExpression,
01004 #else
01005              "", "",
01006 #endif
01007              cfProp->expressionParameters};
01008 
01009           const DCPS::WriterAssociation wa =
01010             {add_security_info(*wTls, writer, reader), writer, *pubQos, *dwQos};
01011 #endif
01012 
01013           ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
01014           static const bool writer_active = true;
01015 
01016           if (call_writer) {
01017             if (DCPS::DCPS_debug_level > 3) {
01018               ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
01019                          ACE_TEXT("adding writer association\n")));
01020             }
01021             DcpsUpcalls thr(drr, reader, wa, !writer_active, dwr);
01022             if (call_reader) {
01023               thr.activate();
01024             }
01025             dwr->add_association(writer, ra, writer_active);
01026             if (call_reader) {
01027               thr.writer_done();
01028             }
01029 
01030           } else if (call_reader) {
01031             if (DCPS::DCPS_debug_level > 3) {
01032               ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
01033                          ACE_TEXT("adding reader association\n")));
01034             }
01035             drr->add_association(reader, wa, !writer_active);
01036           }
01037 
01038           // change this if 'writer_active' (above) changes
01039           if (call_writer && !call_reader && !is_opendds(reader)) {
01040             if (DCPS::DCPS_debug_level > 3) {
01041               ACE_DEBUG((LM_DEBUG,
01042                          ACE_TEXT("(%P|%t) EndpointManager::match - ")
01043                          ACE_TEXT("calling writer association_complete\n")));
01044             }
01045             dwr->association_complete(reader);
01046           }
01047 
01048         } else if (already_matched) { // break an existing associtaion
01049           if (writer_local) {
01050             lpi->second.matched_endpoints_.erase(reader);
01051             lpi->second.remote_opendds_associations_.erase(reader);
01052           }
01053           if (reader_local) {
01054             lsi->second.matched_endpoints_.erase(writer);
01055             lsi->second.remote_opendds_associations_.erase(writer);
01056           }
01057           ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
01058           if (writer_local) {
01059             DCPS::ReaderIdSeq reader_seq(1);
01060             reader_seq.length(1);
01061             reader_seq[0] = reader;
01062             dwr->remove_associations(reader_seq, false /*notify_lost*/);
01063           }
01064           if (reader_local) {
01065             DCPS::WriterIdSeq writer_seq(1);
01066             writer_seq.length(1);
01067             writer_seq[0] = writer;
01068             drr->remove_associations(writer_seq, false /*notify_lost*/);
01069           }
01070 
01071         } else { // something was incompatible
01072           ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
01073           if (writer_local && writerStatus.count_since_last_send) {
01074             if (DCPS::DCPS_debug_level > 3) {
01075               ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
01076                          ACE_TEXT("writer incompatible\n")));
01077             }
01078             dwr->update_incompatible_qos(writerStatus);
01079           }
01080           if (reader_local && readerStatus.count_since_last_send) {
01081             if (DCPS::DCPS_debug_level > 3) {
01082               ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
01083                          ACE_TEXT("reader incompatible\n")));
01084             }
01085             drr->update_incompatible_qos(readerStatus);
01086           }
01087         }
01088       }
01089 
01090       virtual bool is_opendds(const GUID_t& endpoint) const
01091       {
01092         return !std::memcmp(endpoint.guidPrefix, DCPS::VENDORID_OCI,
01093                             sizeof(DCPS::VENDORID_OCI));
01094       }
01095 
01096       virtual bool shutting_down() const = 0;
01097 
01098       virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
01099                                                        DiscoveredSubscriptionIter& iter,
01100                                                        const RepoId& reader) = 0;
01101 
01102       virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
01103                                                        DiscoveredPublicationIter& iter,
01104                                                        const RepoId& reader) = 0;
01105 
01106       virtual DCPS::TransportLocatorSeq
01107       add_security_info(const DCPS::TransportLocatorSeq& locators,
01108                         const RepoId& /*writer*/, const RepoId& /*reader*/)
01109       { return locators; }
01110 
01111       virtual bool defer_writer(const RepoId& writer,
01112                                 const RepoId& writer_participant) = 0;
01113 
01114       virtual bool defer_reader(const RepoId& writer,
01115                                 const RepoId& writer_participant) = 0;
01116 
01117       void remove_from_bit(const DiscoveredPublication& pub)
01118       {
01119         pub_key_to_id_.erase(get_key(pub));
01120         remove_from_bit_i(pub);
01121       }
01122 
01123       void remove_from_bit(const DiscoveredSubscription& sub)
01124       {
01125         sub_key_to_id_.erase(get_key(sub));
01126         remove_from_bit_i(sub);
01127       }
01128 
01129       RepoId make_topic_guid()
01130       {
01131         RepoId guid;
01132         guid = participant_id_;
01133         guid.entityId.entityKind = DCPS::ENTITYKIND_OPENDDS_TOPIC;
01134         assign_topic_key(guid);
01135         return guid;
01136       }
01137 
01138       bool has_dcps_key(const DCPS::RepoId& topicId) const
01139       {
01140         typedef OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan) TNMap;
01141         TNMap::const_iterator tn = topic_names_.find(topicId);
01142         if (tn == topic_names_.end()) return false;
01143 
01144         typedef OPENDDS_MAP(OPENDDS_STRING, TopicDetails) TDMap;
01145         typename TDMap::const_iterator td = topics_.find(tn->second);
01146         if (td == topics_.end()) return false;
01147 
01148         return td->second.has_dcps_key_;
01149       }
01150 
01151       void
01152       increment_key(DDS::BuiltinTopicKey_t& key)
01153       {
01154         for (int idx = 0; idx < 3; ++idx) {
01155           CORBA::ULong ukey = static_cast<CORBA::ULong>(key.value[idx]);
01156           if (ukey == 0xFFFFFFFF) {
01157             key.value[idx] = 0;
01158           } else {
01159             ++ukey;
01160             key.value[idx] = ukey;
01161             return;
01162           }
01163         }
01164         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) EndpointManager::increment_key - ")
01165                    ACE_TEXT("ran out of builtin topic keys\n")));
01166       }
01167 
01168 #if defined(OPENDDS_SECURITY)
01169       inline bool is_security_enabled()
01170       {
01171         return (permissions_handle_ != DDS::HANDLE_NIL) && (access_control_ != 0);
01172       }
01173 
01174       inline void set_permissions_handle(DDS::Security::PermissionsHandle h)
01175       {
01176         permissions_handle_ = h;
01177       }
01178 
01179       inline DDS::Security::PermissionsHandle get_permissions_handle() const
01180       {
01181         return permissions_handle_;
01182       }
01183 
01184       inline void set_access_control(DDS::Security::AccessControl_var acl)
01185       {
01186         access_control_ = acl;
01187       }
01188 
01189       inline DDS::Security::AccessControl_var get_access_control() const
01190       {
01191         return access_control_;
01192       }
01193 
01194       inline void set_crypto_key_factory(DDS::Security::CryptoKeyFactory_var ckf)
01195       {
01196         crypto_key_factory_ = ckf;
01197       }
01198 
01199       inline DDS::Security::CryptoKeyFactory_var get_crypto_key_factory() const
01200       {
01201         return crypto_key_factory_;
01202       }
01203 
01204       inline void set_crypto_key_exchange(DDS::Security::CryptoKeyExchange_var ckf)
01205       {
01206         crypto_key_exchange_ = ckf;
01207       }
01208 
01209       inline DDS::Security::CryptoKeyExchange_var get_crypto_key_exchange() const
01210       {
01211         return crypto_key_exchange_;
01212       }
01213 #endif
01214 
01215       ACE_Thread_Mutex& lock_;
01216       DCPS::RepoId participant_id_;
01217       BitKeyMap pub_key_to_id_, sub_key_to_id_;
01218       RepoIdSet ignored_guids_;
01219       unsigned int publication_counter_, subscription_counter_, topic_counter_;
01220       LocalPublicationMap local_publications_;
01221       LocalSubscriptionMap local_subscriptions_;
01222       DiscoveredPublicationMap discovered_publications_;
01223       DiscoveredSubscriptionMap discovered_subscriptions_;
01224       OPENDDS_MAP(OPENDDS_STRING, TopicDetails) topics_;
01225       TopicNameMap topic_names_;
01226       OPENDDS_SET(OPENDDS_STRING) ignored_topics_;
01227       OPENDDS_SET_CMP(DCPS::RepoId, DCPS::GUID_tKeyLessThan) relay_only_readers_;
01228       DDS::BuiltinTopicKey_t pub_bit_key_, sub_bit_key_;
01229 
01230 #if defined(OPENDDS_SECURITY)
01231       DDS::Security::AccessControl_var access_control_;
01232       DDS::Security::CryptoKeyFactory_var crypto_key_factory_;
01233       DDS::Security::CryptoKeyExchange_var crypto_key_exchange_;
01234 
01235       DDS::Security::PermissionsHandle permissions_handle_;
01236       DDS::Security::ParticipantCryptoHandle crypto_handle_;
01237 
01238       DatareaderCryptoHandleMap local_reader_crypto_handles_;
01239       DatawriterCryptoHandleMap local_writer_crypto_handles_;
01240 
01241       EndpointSecurityAttributesMap local_reader_security_attribs_;
01242       EndpointSecurityAttributesMap local_writer_security_attribs_;
01243 
01244       DatareaderCryptoHandleMap remote_reader_crypto_handles_;
01245       DatawriterCryptoHandleMap remote_writer_crypto_handles_;
01246 
01247       DatareaderCryptoTokenSeqMap pending_remote_reader_crypto_tokens_;
01248       DatawriterCryptoTokenSeqMap pending_remote_writer_crypto_tokens_;
01249 #endif
01250 
01251     };
01252 
01253     template <typename EndpointManagerType>
01254     class LocalParticipant : public DCPS::RcObject {
01255     public:
01256       typedef typename EndpointManagerType::DiscoveredParticipantData DiscoveredParticipantData;
01257       typedef typename EndpointManagerType::TopicDetails TopicDetails;
01258 
01259       LocalParticipant (const DDS::DomainParticipantQos& qos)
01260         : qos_(qos)
01261       { }
01262 
01263       virtual ~LocalParticipant() { }
01264 
01265       DCPS::RepoId bit_key_to_repo_id(const char* bit_topic_name,
01266                                       const DDS::BuiltinTopicKey_t& key)
01267       {
01268         if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_PARTICIPANT_TOPIC)) {
01269           RepoId guid;
01270           std::memcpy(guid.guidPrefix, key.value, sizeof(DDS::BuiltinTopicKeyValue));
01271           guid.entityId = ENTITYID_PARTICIPANT;
01272           return guid;
01273 
01274         } else {
01275           return endpoint_manager().bit_key_to_repo_id(bit_topic_name, key);
01276         }
01277       }
01278 
01279       void ignore_domain_participant(const RepoId& ignoreId)
01280       {
01281         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01282         endpoint_manager().ignore(ignoreId);
01283 
01284         const DiscoveredParticipantIter iter = participants_.find(ignoreId);
01285         if (iter != participants_.end()) {
01286           remove_discovered_participant(iter);
01287         }
01288       }
01289 
01290       virtual bool
01291       announce_domain_participant_qos()
01292       {
01293         return true;
01294       }
01295 
01296       bool
01297       update_domain_participant_qos(const DDS::DomainParticipantQos& qos)
01298       {
01299         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01300         qos_ = qos;
01301         return announce_domain_participant_qos();
01302       }
01303 
01304       DCPS::TopicStatus
01305       assert_topic(DCPS::RepoId_out topicId, const char* topicName,
01306                    const char* dataTypeName, const DDS::TopicQos& qos,
01307                    bool hasDcpsKey)
01308       {
01309         if (std::strlen(topicName) > 256 || std::strlen(dataTypeName) > 256) {
01310           if (DCPS::DCPS_debug_level) {
01311             ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR LocalParticipant::assert_topic() - ")
01312                        ACE_TEXT("topic or type name length limit (256) exceeded\n")));
01313           }
01314           return DCPS::PRECONDITION_NOT_MET;
01315         }
01316 
01317         return endpoint_manager().assert_topic(topicId, topicName, dataTypeName, qos, hasDcpsKey);
01318       }
01319 
01320       DCPS::TopicStatus
01321       remove_topic(const RepoId& topicId, OPENDDS_STRING& name)
01322       {
01323         return endpoint_manager().remove_topic(topicId, name);
01324       }
01325 
01326       void
01327       ignore_topic(const RepoId& ignoreId)
01328       {
01329         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01330         endpoint_manager().ignore(ignoreId);
01331       }
01332 
01333       bool
01334       update_topic_qos(const RepoId& topicId, const DDS::TopicQos& qos,
01335                        OPENDDS_STRING& name)
01336       {
01337         return endpoint_manager().update_topic_qos(topicId, qos, name);
01338       }
01339 
01340       RepoId
01341       add_publication(const RepoId& topicId,
01342                       DCPS::DataWriterCallbacks* publication,
01343                       const DDS::DataWriterQos& qos,
01344                       const DCPS::TransportLocatorSeq& transInfo,
01345                       const DDS::PublisherQos& publisherQos)
01346       {
01347         return endpoint_manager().add_publication(topicId, publication, qos,
01348                                                   transInfo, publisherQos);
01349       }
01350 
01351       void
01352       remove_publication(const RepoId& publicationId)
01353       {
01354         endpoint_manager().remove_publication(publicationId);
01355       }
01356 
01357       void
01358       ignore_publication(const RepoId& ignoreId)
01359       {
01360         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01361         return endpoint_manager().ignore(ignoreId);
01362       }
01363 
01364       bool
01365       update_publication_qos(const RepoId& publicationId,
01366                              const DDS::DataWriterQos& qos,
01367                              const DDS::PublisherQos& publisherQos)
01368       {
01369         return endpoint_manager().update_publication_qos(publicationId, qos, publisherQos);
01370       }
01371 
01372       RepoId
01373       add_subscription(const RepoId& topicId,
01374                        DCPS::DataReaderCallbacks* subscription,
01375                        const DDS::DataReaderQos& qos,
01376                        const DCPS::TransportLocatorSeq& transInfo,
01377                        const DDS::SubscriberQos& subscriberQos,
01378                        const char* filterClassName,
01379                        const char* filterExpr,
01380                        const DDS::StringSeq& params)
01381       {
01382         return endpoint_manager().add_subscription(topicId, subscription, qos, transInfo,
01383                                                    subscriberQos, filterClassName, filterExpr, params);
01384       }
01385 
01386       void
01387       remove_subscription(const RepoId& subscriptionId)
01388       {
01389         endpoint_manager().remove_subscription(subscriptionId);
01390       }
01391 
01392       void
01393       ignore_subscription(const RepoId& ignoreId)
01394       {
01395         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01396         return endpoint_manager().ignore(ignoreId);
01397       }
01398 
01399       bool
01400       update_subscription_qos(const RepoId& subscriptionId,
01401                               const DDS::DataReaderQos& qos,
01402                               const DDS::SubscriberQos& subscriberQos)
01403       {
01404         return endpoint_manager().update_subscription_qos(subscriptionId, qos, subscriberQos);
01405       }
01406 
01407       bool
01408       update_subscription_params(const RepoId& subId,
01409                                  const DDS::StringSeq& params)
01410       {
01411         return endpoint_manager().update_subscription_params(subId, params);
01412       }
01413 
01414       void
01415       association_complete(const RepoId& localId, const RepoId& remoteId)
01416       {
01417         endpoint_manager().association_complete(localId, remoteId);
01418       }
01419 
01420       DDS::Subscriber_var bit_subscriber() const { return bit_subscriber_; }
01421 
01422     protected:
01423 
01424       struct DiscoveredParticipant {
01425 
01426         DiscoveredParticipant() :
01427 #if defined(OPENDDS_SECURITY)
01428           bit_ih_(0),
01429           has_last_stateless_msg_(false),
01430           last_stateless_msg_time_(0, 0),
01431           auth_started_time_(0, 0),
01432           auth_state_(AS_UNKNOWN)
01433 #else
01434           bit_ih_(0)
01435 #endif
01436         {
01437 
01438         }
01439 
01440         DiscoveredParticipant(const DiscoveredParticipantData& p, const ACE_Time_Value& t) :
01441 #if defined(OPENDDS_SECURITY)
01442           pdata_(p),
01443           last_seen_(t),
01444           bit_ih_(DDS::HANDLE_NIL),
01445           has_last_stateless_msg_(false),
01446           last_stateless_msg_time_(0, 0),
01447           auth_started_time_(0, 0),
01448           auth_state_(AS_UNKNOWN)
01449 #else
01450         pdata_(p),
01451         last_seen_(t),
01452         bit_ih_(DDS::HANDLE_NIL)
01453 #endif
01454         {
01455 
01456         }
01457 
01458         DiscoveredParticipantData pdata_;
01459         ACE_Time_Value last_seen_;
01460         DDS::InstanceHandle_t bit_ih_;
01461 
01462 #if defined(OPENDDS_SECURITY)
01463         bool has_last_stateless_msg_;
01464         ACE_Time_Value last_stateless_msg_time_;
01465         DDS::Security::ParticipantStatelessMessage last_stateless_msg_;
01466 
01467         ACE_Time_Value auth_started_time_;
01468         AuthState auth_state_;
01469 
01470         DDS::Security::IdentityToken identity_token_;
01471         DDS::Security::PermissionsToken permissions_token_;
01472         DDS::Security::PropertyQosPolicy property_qos_;
01473         DDS::Security::ParticipantSecurityInfo security_info_;
01474         DDS::Security::IdentityStatusToken identity_status_token_;
01475         DDS::Security::IdentityHandle identity_handle_;
01476         DDS::Security::HandshakeHandle handshake_handle_;
01477         DDS::Security::AuthRequestMessageToken local_auth_request_token_;
01478         DDS::Security::AuthRequestMessageToken remote_auth_request_token_;
01479         DDS::Security::AuthenticatedPeerCredentialToken authenticated_peer_credential_token_;
01480         DDS::Security::SharedSecretHandle_var shared_secret_handle_;
01481         DDS::Security::PermissionsHandle permissions_handle_;
01482         DDS::Security::ParticipantCryptoHandle crypto_handle_;
01483         DDS::Security::ParticipantCryptoTokenSeq crypto_tokens_;
01484 #endif
01485 
01486       };
01487 
            // Map of DiscoveredParticipant records keyed by participant GUID
            // (ordered with GUID_tKeyLessThan), with iterator shorthands.
01488       typedef OPENDDS_MAP_CMP(DCPS::RepoId, DiscoveredParticipant,
01489                               DCPS::GUID_tKeyLessThan) DiscoveredParticipantMap;
01490       typedef typename DiscoveredParticipantMap::iterator DiscoveredParticipantIter;
01491       typedef typename DiscoveredParticipantMap::const_iterator
01492         DiscoveredParticipantConstIter;
01493 
01494 #if defined(OPENDDS_SECURITY)
            // Auth request tokens keyed by GUID; this is the type of
            // pending_remote_auth_tokens_.
01495       typedef OPENDDS_MAP_CMP(DCPS::RepoId, DDS::Security::AuthRequestMessageToken, DCPS::GUID_tKeyLessThan) PendingRemoteAuthTokenMap;
01496 #endif
01497 
            // Supplied by the concrete participant class: the endpoint manager
            // that all topic / publication / subscription operations of this
            // class are forwarded to.
01498       virtual EndpointManagerType& endpoint_manager() = 0;
01499 
01500       void remove_discovered_participant(DiscoveredParticipantIter iter)
01501       {
01502         bool removed = endpoint_manager().disassociate(iter->second.pdata_);
01503         if (removed) {
01504 #ifndef DDS_HAS_MINIMUM_BIT
01505           ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
01506           // bit may be null if the DomainParticipant is shutting down
01507           if (bit && iter->second.bit_ih_ != DDS::HANDLE_NIL) {
01508             bit->set_instance_state(iter->second.bit_ih_,
01509                                     DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
01510           }
01511 #endif /* DDS_HAS_MINIMUM_BIT */
01512           if (DCPS::DCPS_debug_level > 3) {
01513             DCPS::GuidConverter conv(iter->first);
01514             ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) LocalParticipant::remove_discovered_participant")
01515                        ACE_TEXT(" - erasing %C\n"), OPENDDS_STRING(conv).c_str()));
01516           }
01517           participants_.erase(iter);
01518         }
01519       }
01520 
01521 #ifndef DDS_HAS_MINIMUM_BIT
01522       ParticipantBuiltinTopicDataDataReaderImpl* part_bit()
01523       {
01524         if (!bit_subscriber_.in())
01525           return 0;
01526 
01527         DDS::DataReader_var d =
01528           bit_subscriber_->lookup_datareader(DCPS::BUILT_IN_PARTICIPANT_TOPIC);
01529         return dynamic_cast<ParticipantBuiltinTopicDataDataReaderImpl*>(d.in());
01530       }
01531 #endif /* DDS_HAS_MINIMUM_BIT */
01532 
            // Mutex taken by update_domain_participant_qos() and the ignore_*()
            // operations of this class.
01533       mutable ACE_Thread_Mutex lock_;
            // Built-in topic subscriber; returned by bit_subscriber() and used by
            // part_bit() to find the participant built-in topic reader.
01534       DDS::Subscriber_var bit_subscriber_;
            // Participant QoS; overwritten by update_domain_participant_qos().
01535       DDS::DomainParticipantQos qos_;
            // Discovered participants keyed by GUID; entries are erased by
            // remove_discovered_participant().
01536       DiscoveredParticipantMap participants_;
01537 
01538 #if defined(OPENDDS_SECURITY)
            // Auth request tokens keyed by GUID. NOTE(review): not used in the
            // visible part of this class - presumably filled and consumed by the
            // derived participant; confirm.
01539       PendingRemoteAuthTokenMap pending_remote_auth_tokens_;
01540 #endif
01541 
01542     };
01543 
01544     template<typename Participant>
01545     class PeerDiscovery : public Discovery {
01546     public:
01547       typedef typename Participant::TopicDetails TopicDetails;
01548 
01549       explicit PeerDiscovery(const RepoKey& key) : Discovery(key) { }
01550 
01551       ~PeerDiscovery() {
01552         reactor_runner_.end();
01553       }
01554 
01555       virtual DDS::Subscriber_ptr init_bit(DomainParticipantImpl* participant) {
01556         using namespace DCPS;
01557         if (create_bit_topics(participant) != DDS::RETCODE_OK) {
01558           return 0;
01559         }
01560 
01561         DDS::Subscriber_var bit_subscriber =
01562           participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
01563                                          DDS::SubscriberListener::_nil(),
01564                                          DEFAULT_STATUS_MASK);
01565         SubscriberImpl* sub = dynamic_cast<SubscriberImpl*>(bit_subscriber.in());
01566 
01567         DDS::DataReaderQos dr_qos;
01568         sub->get_default_datareader_qos(dr_qos);
01569         dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
01570 
01571 #ifndef DDS_HAS_MINIMUM_BIT
01572         DDS::TopicDescription_var bit_part_topic =
01573           participant->lookup_topicdescription(BUILT_IN_PARTICIPANT_TOPIC);
01574         create_bit_dr(bit_part_topic, BUILT_IN_PARTICIPANT_TOPIC_TYPE,
01575                       sub, dr_qos);
01576 
01577         DDS::TopicDescription_var bit_topic_topic =
01578           participant->lookup_topicdescription(BUILT_IN_TOPIC_TOPIC);
01579         create_bit_dr(bit_topic_topic, BUILT_IN_TOPIC_TOPIC_TYPE,
01580                       sub, dr_qos);
01581 
01582         DDS::TopicDescription_var bit_pub_topic =
01583           participant->lookup_topicdescription(BUILT_IN_PUBLICATION_TOPIC);
01584         create_bit_dr(bit_pub_topic, BUILT_IN_PUBLICATION_TOPIC_TYPE,
01585                       sub, dr_qos);
01586 
01587         DDS::TopicDescription_var bit_sub_topic =
01588           participant->lookup_topicdescription(BUILT_IN_SUBSCRIPTION_TOPIC);
01589         create_bit_dr(bit_sub_topic, BUILT_IN_SUBSCRIPTION_TOPIC_TYPE,
01590                       sub, dr_qos);
01591 
01592         const DDS::ReturnCode_t ret = bit_subscriber->enable();
01593         if (ret != DDS::RETCODE_OK) {
01594           if (DCPS_debug_level) {
01595             ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) PeerDiscovery::init_bit")
01596                        ACE_TEXT(" - Error %d enabling subscriber\n"), ret));
01597           }
01598           return 0;
01599         }
01600 #endif /* DDS_HAS_MINIMUM_BIT */
01601 
01602         get_part(participant->get_domain_id(), participant->get_id())->init_bit(bit_subscriber);
01603 
01604         return bit_subscriber._retn();
01605       }
01606 
01607       virtual void fini_bit(DCPS::DomainParticipantImpl* participant)
01608       {
01609         get_part(participant->get_domain_id(), participant->get_id())->fini_bit();
01610       }
01611 
01612       virtual OpenDDS::DCPS::RepoId bit_key_to_repo_id(DCPS::DomainParticipantImpl* participant,
01613                                                        const char* bit_topic_name,
01614                                                        const DDS::BuiltinTopicKey_t& key) const
01615       {
01616         return get_part(participant->get_domain_id(), participant->get_id())
01617           ->bit_key_to_repo_id(bit_topic_name, key);
01618       }
01619 
01620       virtual bool attach_participant(DDS::DomainId_t /*domainId*/,
01621                                       const OpenDDS::DCPS::RepoId& /*participantId*/)
01622       {
01623         return false; // This is just for DCPSInfoRepo?
01624       }
01625 
01626       virtual bool remove_domain_participant(DDS::DomainId_t domain_id,
01627                                              const OpenDDS::DCPS::RepoId& participantId)
01628       {
01629         // Use reference counting to ensure participant
01630         // does not get deleted until lock as been released.
01631         ParticipantHandle participant;
01632         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01633         typename DomainParticipantMap::iterator domain = participants_.find(domain_id);
01634         if (domain == participants_.end()) {
01635           return false;
01636         }
01637         typename ParticipantMap::iterator part = domain->second.find(participantId);
01638         if (part == domain->second.end()) {
01639           return false;
01640         }
01641         participant = part->second;
01642         domain->second.erase(part);
01643         if (domain->second.empty()) {
01644           participants_.erase(domain);
01645         }
01646 
01647         return true;
01648       }
01649 
01650       virtual bool ignore_domain_participant(DDS::DomainId_t domain,
01651                                              const OpenDDS::DCPS::RepoId& myParticipantId,
01652                                              const OpenDDS::DCPS::RepoId& ignoreId)
01653       {
01654         get_part(domain, myParticipantId)->ignore_domain_participant(ignoreId);
01655         return true;
01656       }
01657 
01658       virtual bool update_domain_participant_qos(DDS::DomainId_t domain,
01659                                                  const OpenDDS::DCPS::RepoId& participant,
01660                                                  const DDS::DomainParticipantQos& qos)
01661       {
01662         return get_part(domain, participant)->update_domain_participant_qos(qos);
01663       }
01664 
01665       virtual DCPS::TopicStatus assert_topic(OpenDDS::DCPS::RepoId_out topicId,
01666                                              DDS::DomainId_t domainId,
01667                                              const OpenDDS::DCPS::RepoId& participantId,
01668                                              const char* topicName,
01669                                              const char* dataTypeName,
01670                                              const DDS::TopicQos& qos,
01671                                              bool hasDcpsKey)
01672       {
01673         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
01674         typename OPENDDS_MAP(DDS::DomainId_t,
01675                              OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
01676           topics_.find(domainId);
01677         if (topic_it != topics_.end()) {
01678           const typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator it =
01679             topic_it->second.find(topicName);
01680           if (it != topic_it->second.end()
01681               && it->second.data_type_ != dataTypeName) {
01682             topicId = GUID_UNKNOWN;
01683             return DCPS::CONFLICTING_TYPENAME;
01684           }
01685         }
01686 
01687         // Verified its safe to hold lock during call to assert_topic
01688         const DCPS::TopicStatus stat =
01689           participants_[domainId][participantId]->assert_topic(topicId, topicName,
01690                                                                dataTypeName, qos,
01691                                                                hasDcpsKey);
01692         if (stat == DCPS::CREATED || stat == DCPS::FOUND) { // qos change (FOUND)
01693           TopicDetails& td = topics_[domainId][topicName];
01694           td.data_type_ = dataTypeName;
01695           td.qos_ = qos;
01696           td.repo_id_ = topicId;
01697           ++topic_use_[domainId][topicName];
01698         }
01699         return stat;
01700       }
01701 
01702       virtual DCPS::TopicStatus find_topic(DDS::DomainId_t domainId, const char* topicName,
01703                                            CORBA::String_out dataTypeName, DDS::TopicQos_out qos,
01704                                            OpenDDS::DCPS::RepoId_out topicId)
01705       {
01706         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
01707         typename OPENDDS_MAP(DDS::DomainId_t,
01708                              OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
01709           topics_.find(domainId);
01710         if (topic_it == topics_.end()) {
01711           return DCPS::NOT_FOUND;
01712         }
01713         typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator iter =
01714           topic_it->second.find(topicName);
01715         if (iter == topic_it->second.end()) {
01716           return DCPS::NOT_FOUND;
01717         }
01718         TopicDetails& td = iter->second;
01719         dataTypeName = td.data_type_.c_str();
01720         qos = new DDS::TopicQos(td.qos_);
01721         topicId = td.repo_id_;
01722         ++topic_use_[domainId][topicName];
01723         return DCPS::FOUND;
01724       }
01725 
01726       virtual DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId,
01727                                              const OpenDDS::DCPS::RepoId& participantId,
01728                                              const OpenDDS::DCPS::RepoId& topicId)
01729       {
01730         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
01731         typename OPENDDS_MAP(DDS::DomainId_t,
01732                              OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
01733           topics_.find(domainId);
01734         if (topic_it == topics_.end()) {
01735           return DCPS::NOT_FOUND;
01736         }
01737 
01738         OPENDDS_STRING name;
01739         // Safe to hold lock while calling remove topic
01740         const DCPS::TopicStatus stat =
01741           participants_[domainId][participantId]->remove_topic(topicId, name);
01742 
01743         if (stat == DCPS::REMOVED) {
01744           if (0 == --topic_use_[domainId][name]) {
01745             topic_use_[domainId].erase(name);
01746             if (topic_it->second.empty()) {
01747               topic_use_.erase(domainId);
01748             }
01749             topic_it->second.erase(name);
01750             if (topic_it->second.empty()) {
01751               topics_.erase(topic_it);
01752             }
01753           }
01754         }
01755         return stat;
01756       }
01757 
01758       virtual bool ignore_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId& myParticipantId,
01759                                 const OpenDDS::DCPS::RepoId& ignoreId)
01760       {
01761         get_part(domainId, myParticipantId)->ignore_topic(ignoreId);
01762         return true;
01763       }
01764 
01765       virtual bool update_topic_qos(const OpenDDS::DCPS::RepoId& topicId, DDS::DomainId_t domainId,
01766                                     const OpenDDS::DCPS::RepoId& participantId, const DDS::TopicQos& qos)
01767       {
01768         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01769         OPENDDS_STRING name;
01770         // Safe to hold lock while calling update_topic_qos
01771         if (participants_[domainId][participantId]->update_topic_qos(topicId,
01772                                                                      qos, name)) {
01773           topics_[domainId][name].qos_ = qos;
01774           return true;
01775         }
01776         return false;
01777       }
01778 
01779       virtual OpenDDS::DCPS::RepoId add_publication(DDS::DomainId_t domainId,
01780                                                     const OpenDDS::DCPS::RepoId& participantId,
01781                                                     const OpenDDS::DCPS::RepoId& topicId,
01782                                                     DCPS::DataWriterCallbacks* publication,
01783                                                     const DDS::DataWriterQos& qos,
01784                                                     const DCPS::TransportLocatorSeq& transInfo,
01785                                                     const DDS::PublisherQos& publisherQos)
01786       {
01787         return get_part(domainId, participantId)->add_publication(topicId, publication, qos, transInfo, publisherQos);
01788       }
01789 
01790       virtual bool remove_publication(DDS::DomainId_t domainId,
01791                                       const OpenDDS::DCPS::RepoId& participantId,
01792                                       const OpenDDS::DCPS::RepoId& publicationId)
01793       {
01794         get_part(domainId, participantId)->remove_publication(publicationId);
01795         return true;
01796       }
01797 
01798       virtual bool ignore_publication(DDS::DomainId_t domainId,
01799                                       const OpenDDS::DCPS::RepoId& participantId,
01800                                       const OpenDDS::DCPS::RepoId& ignoreId)
01801       {
01802         get_part(domainId, participantId)->ignore_publication(ignoreId);
01803         return true;
01804       }
01805 
01806       virtual bool update_publication_qos(DDS::DomainId_t domainId,
01807                                           const OpenDDS::DCPS::RepoId& partId,
01808                                           const OpenDDS::DCPS::RepoId& dwId,
01809                                           const DDS::DataWriterQos& qos,
01810                                           const DDS::PublisherQos& publisherQos)
01811       {
01812         return get_part(domainId, partId)->update_publication_qos(dwId, qos,
01813                                                                   publisherQos);
01814       }
01815 
01816       virtual OpenDDS::DCPS::RepoId add_subscription(DDS::DomainId_t domainId,
01817                                                      const OpenDDS::DCPS::RepoId& participantId,
01818                                                      const OpenDDS::DCPS::RepoId& topicId,
01819                                                      DCPS::DataReaderCallbacks* subscription,
01820                                                      const DDS::DataReaderQos& qos,
01821                                                      const DCPS::TransportLocatorSeq& transInfo,
01822                                                      const DDS::SubscriberQos& subscriberQos,
01823                                                      const char* filterClassName,
01824                                                      const char* filterExpr,
01825                                                      const DDS::StringSeq& params)
01826       {
01827         return get_part(domainId, participantId)->add_subscription(topicId, subscription, qos, transInfo, subscriberQos, filterClassName, filterExpr, params);
01828       }
01829 
01830       virtual bool remove_subscription(DDS::DomainId_t domainId,
01831                                        const OpenDDS::DCPS::RepoId& participantId,
01832                                        const OpenDDS::DCPS::RepoId& subscriptionId)
01833       {
01834         get_part(domainId, participantId)->remove_subscription(subscriptionId);
01835         return true;
01836       }
01837 
01838       virtual bool ignore_subscription(DDS::DomainId_t domainId,
01839                                        const OpenDDS::DCPS::RepoId& participantId,
01840                                        const OpenDDS::DCPS::RepoId& ignoreId)
01841       {
01842         get_part(domainId, participantId)->ignore_subscription(ignoreId);
01843         return true;
01844       }
01845 
01846       virtual bool update_subscription_qos(DDS::DomainId_t domainId,
01847                                            const OpenDDS::DCPS::RepoId& partId,
01848                                            const OpenDDS::DCPS::RepoId& drId,
01849                                            const DDS::DataReaderQos& qos,
01850                                            const DDS::SubscriberQos& subQos)
01851       {
01852         return get_part(domainId, partId)->update_subscription_qos(drId, qos, subQos);
01853       }
01854 
01855       virtual bool update_subscription_params(DDS::DomainId_t domainId,
01856                                               const OpenDDS::DCPS::RepoId& partId,
01857                                               const OpenDDS::DCPS::RepoId& subId,
01858                                               const DDS::StringSeq& params)
01859       {
01860         return get_part(domainId, partId)->update_subscription_params(subId, params);
01861       }
01862 
01863       virtual void association_complete(DDS::DomainId_t domainId,
01864                                         const OpenDDS::DCPS::RepoId& participantId,
01865                                         const OpenDDS::DCPS::RepoId& localId,
01866                                         const OpenDDS::DCPS::RepoId& remoteId)
01867       {
01868         get_part(domainId, participantId)->association_complete(localId, remoteId);
01869       }
01870 
01871       ACE_Reactor*
01872       reactor()
01873       {
01874         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, reactor_runner_.mtx_, 0);
01875         if (!reactor_runner_.reactor_) {
01876           reactor_runner_.reactor_.reset(new ACE_Reactor(new ACE_Select_Reactor, true));
01877           reactor_runner_.activate();
01878         }
01879         return reactor_runner_.reactor_.get();
01880       }
01881 
01882     protected:
01883 
01884       typedef DCPS::RcHandle<Participant> ParticipantHandle;
01885       typedef OPENDDS_MAP_CMP(DCPS::RepoId, ParticipantHandle, DCPS::GUID_tKeyLessThan) ParticipantMap;
01886       typedef OPENDDS_MAP(DDS::DomainId_t, ParticipantMap) DomainParticipantMap;
01887 
01888       ParticipantHandle
01889         get_part(const DDS::DomainId_t domain_id,
01890                  const OpenDDS::DCPS::RepoId& part_id) const
01891       {
01892         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ParticipantHandle());
01893         typename DomainParticipantMap::const_iterator domain = participants_.find(domain_id);
01894         if (domain == participants_.end()) {
01895           return ParticipantHandle();
01896         }
01897         typename ParticipantMap::const_iterator part = domain->second.find(part_id);
01898         if (part == domain->second.end()) {
01899           return ParticipantHandle();
01900         }
01901         return part->second;
01902       }
01903 
01904       void create_bit_dr(DDS::TopicDescription_ptr topic, const char* type,
01905                          DCPS::SubscriberImpl* sub,
01906                          const DDS::DataReaderQos& qos)
01907       {
01908         using namespace DCPS;
01909         TopicDescriptionImpl* bit_topic_i =
01910           dynamic_cast<TopicDescriptionImpl*>(topic);
01911 
01912         DDS::DomainParticipant_var participant = sub->get_participant();
01913         DomainParticipantImpl* participant_i =
01914           dynamic_cast<DomainParticipantImpl*>(participant.in());
01915 
01916         TypeSupport_var type_support =
01917           Registered_Data_Types->lookup(participant, type);
01918 
01919         DDS::DataReader_var dr = type_support->create_datareader();
01920         OpenDDS::DCPS::DataReaderImpl* dri = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dr.in());
01921 
01922         dri->init(bit_topic_i, qos, 0 /*listener*/, 0 /*mask*/, participant_i, sub);
01923         dri->disable_transport();
01924         dri->enable();
01925       }
01926 
01927       mutable ACE_Thread_Mutex lock_;
01928 
01929       // Before participants_ so destroyed after.
01930       struct ReactorRunner : ACE_Task_Base {
01931       ReactorRunner()  {}
01932 
01933         int svc()
01934         {
01935           reactor_->owner(ACE_Thread_Manager::instance()->thr_self());
01936           reactor_->run_reactor_event_loop();
01937           return 0;
01938         }
01939 
01940         void end()
01941         {
01942           ACE_GUARD(ACE_Thread_Mutex, g, mtx_);
01943           if (reactor_) {
01944             reactor_->end_reactor_event_loop();
01945             wait();
01946           }
01947         }
01948 
01949         unique_ptr<ACE_Reactor> reactor_;
01950         ACE_Thread_Mutex mtx_;
01951       } reactor_runner_;
01952 
01953       DomainParticipantMap participants_;
01954       OPENDDS_MAP(DDS::DomainId_t, OPENDDS_MAP(OPENDDS_STRING, TopicDetails) ) topics_;
01955       OPENDDS_MAP(DDS::DomainId_t, OPENDDS_MAP(OPENDDS_STRING, unsigned int) ) topic_use_;
01956     };
01957 
01958   } // namespace DCPS
01959 } // namespace OpenDDS
01960 
01961 OPENDDS_END_VERSIONED_NAMESPACE_DECL
01962 
01963 #endif /* OPENDDS_DDS_DCPS_DISCOVERYBASE_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1