DiscoveryBase.h

Go to the documentation of this file.
00001 /*
00002  * Distributed under the OpenDDS License.
00003  * See: http://www.opendds.org/license.html
00004  */
00005 
00006 #ifndef OPENDDS_DDS_DCPS_DISCOVERYBASE_H
00007 #define OPENDDS_DDS_DCPS_DISCOVERYBASE_H
00008 
00009 #include "dds/DCPS/Discovery.h"
00010 #include "dds/DCPS/GuidUtils.h"
00011 #include "dds/DCPS/DCPS_Utils.h"
00012 #include "dds/DCPS/DomainParticipantImpl.h"
00013 #include "dds/DCPS/Marked_Default_Qos.h"
00014 #include "dds/DCPS/SubscriberImpl.h"
00015 #include "dds/DCPS/BuiltInTopicUtils.h"
00016 #include "dds/DCPS/Registered_Data_Types.h"
00017 #include "dds/DdsDcpsCoreTypeSupportImpl.h"
00018 #include "ace/Select_Reactor.h"
00019 
00020 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00021 #pragma once
00022 #endif /* ACE_LACKS_PRAGMA_ONCE */
00023 
00024 namespace OpenDDS {
00025   namespace DCPS {
00026 
00027     inline void assign(DCPS::EntityKey_t& lhs, unsigned int rhs)
00028     {
00029       lhs[0] = static_cast<CORBA::Octet>(rhs);
00030       lhs[1] = static_cast<CORBA::Octet>(rhs >> 8);
00031       lhs[2] = static_cast<CORBA::Octet>(rhs >> 16);
00032     }
00033 
00034     struct DcpsUpcalls : ACE_Task_Base {
00035       DcpsUpcalls(DCPS::DataReaderCallbacks* drr,
00036                   const RepoId& reader,
00037                   const DCPS::WriterAssociation& wa,
00038                   bool active,
00039                   DCPS::DataWriterCallbacks* dwr)
00040         : drr_(drr), reader_(reader), wa_(wa), active_(active), dwr_(dwr)
00041         , reader_done_(false), writer_done_(false), cnd_(mtx_)
00042       {}
00043 
00044       int svc()
00045       {
00046         drr_->add_association(reader_, wa_, active_);
00047         {
00048           ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mtx_, -1);
00049           reader_done_ = true;
00050           cnd_.signal();
00051           while (!writer_done_) {
00052             cnd_.wait();
00053           }
00054         }
00055         dwr_->association_complete(reader_);
00056         return 0;
00057       }
00058 
00059       void writer_done()
00060       {
00061         {
00062           ACE_GUARD(ACE_Thread_Mutex, g, mtx_);
00063           writer_done_ = true;
00064           cnd_.signal();
00065         }
00066         wait();
00067       }
00068 
00069       DCPS::DataReaderCallbacks* const drr_;
00070       const RepoId& reader_;
00071       const DCPS::WriterAssociation& wa_;
00072       bool active_;
00073       DCPS::DataWriterCallbacks* const dwr_;
00074       bool reader_done_, writer_done_;
00075       ACE_Thread_Mutex mtx_;
00076       ACE_Condition_Thread_Mutex cnd_;
00077     };
00078 
00079     template <typename DiscoveredParticipantData_>
00080     class EndpointManager {
00081     protected:
00082       struct DiscoveredSubscription {
00083         DiscoveredSubscription() : bit_ih_(DDS::HANDLE_NIL) {}
00084         explicit DiscoveredSubscription(const OpenDDS::DCPS::DiscoveredReaderData& r)
00085           : reader_data_(r), bit_ih_(DDS::HANDLE_NIL) {}
00086         OpenDDS::DCPS::DiscoveredReaderData reader_data_;
00087         DDS::InstanceHandle_t bit_ih_;
00088       };
00089       typedef OPENDDS_MAP_CMP(DCPS::RepoId, DiscoveredSubscription,
00090                               DCPS::GUID_tKeyLessThan) DiscoveredSubscriptionMap;
00091       typedef typename DiscoveredSubscriptionMap::iterator DiscoveredSubscriptionIter;
00092 
00093       struct DiscoveredPublication {
00094         DiscoveredPublication() : bit_ih_(DDS::HANDLE_NIL) {}
00095         explicit DiscoveredPublication(const OpenDDS::DCPS::DiscoveredWriterData& w)
00096           : writer_data_(w), bit_ih_(DDS::HANDLE_NIL) {}
00097         OpenDDS::DCPS::DiscoveredWriterData writer_data_;
00098         DDS::InstanceHandle_t bit_ih_;
00099       };
00100 
00101       typedef OPENDDS_MAP_CMP(DCPS::RepoId, DiscoveredPublication,
00102                               DCPS::GUID_tKeyLessThan) DiscoveredPublicationMap;
00103       typedef typename DiscoveredPublicationMap::iterator DiscoveredPublicationIter;
00104 
00105     public:
00106       typedef DiscoveredParticipantData_ DiscoveredParticipantData;
00107 
00108       struct TopicDetails { // Per-topic bookkeeping, stored in topics_ keyed by topic name.
00109         OPENDDS_STRING data_type_; // Set from dataTypeName when assert_topic() creates the entry.
00110         DDS::TopicQos qos_; // Overwritten by every assert_topic() call for this topic.
00111         DCPS::RepoId repo_id_; // Assigned from make_topic_guid() when the entry is created.
00112         bool has_dcps_key_; // Copied from the hasDcpsKey argument of assert_topic().
00113         RepoIdSet endpoints_; // Ids inserted by add_publication()/add_subscription(); walked by match_endpoints().
00114       };
00115 
00116       EndpointManager(const RepoId& participant_id, ACE_Thread_Mutex& lock)
00117         : lock_(lock)
00118         , participant_id_(participant_id)
00119         , publication_counter_(0)
00120         , subscription_counter_(0)
00121         , topic_counter_(0)
00122       { }
00123 
00124       virtual ~EndpointManager() { }
00125 
00126       RepoId bit_key_to_repo_id(const char* bit_topic_name,
00127                                 const DDS::BuiltinTopicKey_t& key)
00128       {
00129         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00130         if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_PUBLICATION_TOPIC)) {
00131           return pub_key_to_id_[key];
00132         }
00133         if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_SUBSCRIPTION_TOPIC)) {
00134           return sub_key_to_id_[key];
00135         }
00136         return RepoId();
00137       }
00138 
00139       void ignore(const DCPS::RepoId& to_ignore)
00140       {
00141         // Locked prior to call from Spdp.
00142         ignored_guids_.insert(to_ignore);
00143         {
00144           const DiscoveredPublicationIter iter =
00145             discovered_publications_.find(to_ignore);
00146           if (iter != discovered_publications_.end()) {
00147             // clean up tracking info
00148             topics_[get_topic_name(iter->second)].endpoints_.erase(iter->first);
00149             remove_from_bit(iter->second);
00150             OPENDDS_STRING topic_name = get_topic_name(iter->second);
00151             discovered_publications_.erase(iter);
00152             // break associations
00153             typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00154               topics_.find(topic_name);
00155             if (top_it != topics_.end()) {
00156               match_endpoints(to_ignore, top_it->second, true /*remove*/);
00157             }
00158             return;
00159           }
00160         }
00161         {
00162           const DiscoveredSubscriptionIter iter =
00163             discovered_subscriptions_.find(to_ignore);
00164           if (iter != discovered_subscriptions_.end()) {
00165             // clean up tracking info
00166             topics_[get_topic_name(iter->second)].endpoints_.erase(iter->first);
00167             remove_from_bit(iter->second);
00168             OPENDDS_STRING topic_name = get_topic_name(iter->second);
00169             discovered_subscriptions_.erase(iter);
00170             // break associations
00171             typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00172               topics_.find(topic_name);
00173             if (top_it != topics_.end()) {
00174               match_endpoints(to_ignore, top_it->second, true /*remove*/);
00175             }
00176             return;
00177           }
00178         }
00179         {
00180           const OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan)::iterator
00181             iter = topic_names_.find(to_ignore);
00182           if (iter != topic_names_.end()) {
00183             ignored_topics_.insert(iter->second);
00184             // Remove all publications and subscriptions on this topic
00185             typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00186               topics_.find(iter->second);
00187             if (top_it != topics_.end()) {
00188               TopicDetails& td = top_it->second;
00189               RepoIdSet::iterator ep;
00190               for (ep = td.endpoints_.begin(); ep!= td.endpoints_.end(); ++ep) {
00191                 match_endpoints(*ep, td, true /*remove*/);
00192                 if (shutting_down()) { return; }
00193               }
00194             }
00195           }
00196         }
00197       }
00198 
00199       bool ignoring(const DCPS::RepoId& guid) const {
00200         return ignored_guids_.count(guid);
00201       }
00202       bool ignoring(const char* topic_name) const {
00203         return ignored_topics_.count(topic_name);
00204       }
00205 
00206       DCPS::TopicStatus assert_topic(DCPS::RepoId_out topicId, const char* topicName,
00207                                      const char* dataTypeName, const DDS::TopicQos& qos,
00208                                      bool hasDcpsKey)
00209       {
00210         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
00211         typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator iter =
00212           topics_.find(topicName);
00213         if (iter != topics_.end()) { // types must match, RtpsDiscovery checked for us
00214           iter->second.qos_ = qos;
00215           iter->second.has_dcps_key_ = hasDcpsKey;
00216           topicId = iter->second.repo_id_;
00217           topic_names_[iter->second.repo_id_] = topicName;
00218           return DCPS::FOUND;
00219         }
00220 
00221         TopicDetails& td = topics_[topicName];
00222         td.data_type_ = dataTypeName;
00223         td.qos_ = qos;
00224         td.has_dcps_key_ = hasDcpsKey;
00225         td.repo_id_ = make_topic_guid();
00226         topicId = td.repo_id_;
00227         topic_names_[td.repo_id_] = topicName;
00228 
00229         return DCPS::CREATED;
00230       }
00231 
00232       DCPS::TopicStatus remove_topic(const RepoId& topicId, OPENDDS_STRING& name)
00233       {
00234         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
00235         name = topic_names_[topicId];
00236         typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00237           topics_.find(name);
00238         if (top_it != topics_.end()) {
00239           TopicDetails& td = top_it->second;
00240           if (td.endpoints_.empty()) {
00241             topics_.erase(name);
00242           }
00243         }
00244 
00245         topic_names_.erase(topicId);
00246         return DCPS::REMOVED;
00247       }
00248 
00249       virtual bool update_topic_qos(const DCPS::RepoId& topicId, const DDS::TopicQos& qos,
00250                                     OPENDDS_STRING& name) = 0;
00251 
00252       DCPS::RepoId add_publication(const DCPS::RepoId& topicId,
00253                                    DCPS::DataWriterCallbacks* publication,
00254                                    const DDS::DataWriterQos& qos,
00255                                    const DCPS::TransportLocatorSeq& transInfo,
00256                                    const DDS::PublisherQos& publisherQos)
00257       {
00258         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00259         RepoId rid = participant_id_;
00260         assign_publication_key(rid, topicId, qos);
00261         LocalPublication& pb = local_publications_[rid];
00262         pb.topic_id_ = topicId;
00263         pb.publication_ = publication;
00264         pb.qos_ = qos;
00265         pb.trans_info_ = transInfo;
00266         pb.publisher_qos_ = publisherQos;
00267         TopicDetails& td = topics_[topic_names_[topicId]];
00268         td.endpoints_.insert(rid);
00269 
00270         if (DDS::RETCODE_OK != add_publication_i(rid, pb)) {
00271           return RepoId();
00272         }
00273 
00274         if (DDS::RETCODE_OK != write_publication_data(rid, pb)) {
00275           return RepoId();
00276         }
00277 
00278         if (DCPS::DCPS_debug_level > 3) {
00279           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::add_publication - ")
00280                      ACE_TEXT("calling match_endpoints\n")));
00281         }
00282         match_endpoints(rid, td);
00283 
00284         return rid;
00285       }
00286 
00287       void remove_publication(const DCPS::RepoId& publicationId)
00288       {
00289         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00290         LocalPublicationIter iter = local_publications_.find(publicationId);
00291         if (iter != local_publications_.end()) {
00292           if (DDS::RETCODE_OK == remove_publication_i(publicationId))
00293             {
00294               OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
00295               local_publications_.erase(publicationId);
00296               typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00297                 topics_.find(topic_name);
00298               if (top_it != topics_.end()) {
00299                 match_endpoints(publicationId, top_it->second, true /*remove*/);
00300                 top_it->second.endpoints_.erase(publicationId);
00301               }
00302             } else {
00303             ACE_DEBUG((LM_ERROR,
00304                        ACE_TEXT("(%P|%t) ERROR: EndpointManager::remove_publication - ")
00305                        ACE_TEXT("Failed to publish dispose msg\n")));
00306           }
00307         }
00308       }
00309 
00310       virtual bool update_publication_qos(const DCPS::RepoId& publicationId,
00311                                           const DDS::DataWriterQos& qos,
00312                                           const DDS::PublisherQos& publisherQos) = 0;
00313 
00314       DCPS::RepoId add_subscription(const DCPS::RepoId& topicId,
00315                                     DCPS::DataReaderCallbacks* subscription,
00316                                     const DDS::DataReaderQos& qos,
00317                                     const DCPS::TransportLocatorSeq& transInfo,
00318                                     const DDS::SubscriberQos& subscriberQos,
00319                                     const char* filterClassName,
00320                                     const char* filterExpr,
00321                                     const DDS::StringSeq& params)
00322       {
00323         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, RepoId());
00324         RepoId rid = participant_id_;
00325         assign_subscription_key(rid, topicId, qos);
00326         LocalSubscription& sb = local_subscriptions_[rid];
00327         sb.topic_id_ = topicId;
00328         sb.subscription_ = subscription;
00329         sb.qos_ = qos;
00330         sb.trans_info_ = transInfo;
00331         sb.subscriber_qos_ = subscriberQos;
00332         sb.filterProperties.filterClassName = filterClassName;
00333         sb.filterProperties.filterExpression = filterExpr;
00334         sb.filterProperties.expressionParameters = params;
00335 
00336         TopicDetails& td = topics_[topic_names_[topicId]];
00337         td.endpoints_.insert(rid);
00338 
00339         if (DDS::RETCODE_OK != add_subscription_i(rid, sb)) {
00340           return RepoId();
00341         }
00342 
00343         if (DDS::RETCODE_OK != write_subscription_data(rid, sb)) {
00344           return RepoId();
00345         }
00346 
00347         if (DCPS::DCPS_debug_level > 3) {
00348           ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::add_subscription - ")
00349                      ACE_TEXT("calling match_endpoints\n")));
00350         }
00351         match_endpoints(rid, td);
00352 
00353         return rid;
00354       }
00355 
00356       void remove_subscription(const DCPS::RepoId& subscriptionId)
00357       {
00358         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00359         LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
00360         if (iter != local_subscriptions_.end()) {
00361           if (DDS::RETCODE_OK == remove_subscription_i(subscriptionId)
00362               /*subscriptions_writer_.write_unregister_dispose(subscriptionId)*/) {
00363             OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
00364             local_subscriptions_.erase(subscriptionId);
00365             typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator top_it =
00366               topics_.find(topic_name);
00367             if (top_it != topics_.end()) {
00368               match_endpoints(subscriptionId, top_it->second, true /*remove*/);
00369               top_it->second.endpoints_.erase(subscriptionId);
00370             }
00371           } else {
00372             ACE_DEBUG((LM_ERROR,
00373                        ACE_TEXT("(%P|%t) ERROR: EndpointManager::remove_subscription - ")
00374                        ACE_TEXT("Failed to publish dispose msg\n")));
00375           }
00376         }
00377       }
00378 
00379       virtual bool update_subscription_qos(const DCPS::RepoId& subscriptionId,
00380                                            const DDS::DataReaderQos& qos,
00381                                            const DDS::SubscriberQos& subscriberQos) = 0;
00382 
00383       virtual bool update_subscription_params(const DCPS::RepoId& subId,
00384                                               const DDS::StringSeq& params) = 0;
00385 
00386       virtual void association_complete(const DCPS::RepoId& localId,
00387                                         const DCPS::RepoId& remoteId) = 0;
00388 
00389       virtual bool disassociate(const DiscoveredParticipantData& pdata) = 0;
00390 
00391     protected:
00392       struct LocalEndpoint {
00393         LocalEndpoint() : topic_id_(DCPS::GUID_UNKNOWN), sequence_(DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {}
00394         DCPS::RepoId topic_id_;
00395         DCPS::TransportLocatorSeq trans_info_;
00396         RepoIdSet matched_endpoints_;
00397         DCPS::SequenceNumber sequence_;
00398         RepoIdSet remote_opendds_associations_;
00399       };
00400 
00401       struct LocalPublication : LocalEndpoint {
00402         DCPS::DataWriterCallbacks* publication_;
00403         DDS::DataWriterQos qos_;
00404         DDS::PublisherQos publisher_qos_;
00405       };
00406 
00407       struct LocalSubscription : LocalEndpoint {
00408         DCPS::DataReaderCallbacks* subscription_;
00409         DDS::DataReaderQos qos_;
00410         DDS::SubscriberQos subscriber_qos_;
00411         OpenDDS::DCPS::ContentFilterProperty_t filterProperties;
00412       };
00413 
00414       typedef OPENDDS_MAP_CMP(DDS::BuiltinTopicKey_t, DCPS::RepoId,
00415                               DCPS::BuiltinTopicKeyLess) BitKeyMap;
00416 
00417       typedef OPENDDS_MAP_CMP(DCPS::RepoId, LocalPublication,
00418                               DCPS::GUID_tKeyLessThan) LocalPublicationMap;
00419       typedef typename LocalPublicationMap::iterator LocalPublicationIter;
00420       typedef typename LocalPublicationMap::const_iterator LocalPublicationCIter;
00421 
00422       typedef OPENDDS_MAP_CMP(DCPS::RepoId, LocalSubscription,
00423                               DCPS::GUID_tKeyLessThan) LocalSubscriptionMap;
00424       typedef typename LocalSubscriptionMap::iterator LocalSubscriptionIter;
00425       typedef typename LocalSubscriptionMap::const_iterator LocalSubscriptionCIter;
00426 
00427       typedef typename OPENDDS_MAP_CMP(DCPS::RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan) TopicNameMap;
00428 
00429       static const char* get_topic_name(const DiscoveredPublication& pub) {
00430         return pub.writer_data_.ddsPublicationData.topic_name;
00431       }
00432       static const char* get_topic_name(const DiscoveredSubscription& sub) {
00433         return sub.reader_data_.ddsSubscriptionData.topic_name;
00434       }
00435       static DDS::BuiltinTopicKey_t get_key(const DiscoveredPublication& pub) {
00436         return pub.writer_data_.ddsPublicationData.key;
00437       }
00438       static DDS::BuiltinTopicKey_t get_key(const DiscoveredSubscription& sub) {
00439         return sub.reader_data_.ddsSubscriptionData.key;
00440       }
00441 
00442       virtual void remove_from_bit_i(const DiscoveredPublication& /*pub*/) { }
00443       virtual void remove_from_bit_i(const DiscoveredSubscription& /*sub*/) { }
00444 
00445       virtual void assign_publication_key(RepoId& rid,
00446                                           const RepoId& topicId,
00447                                           const DDS::DataWriterQos& /*qos*/) {
00448         rid.entityId.entityKind =
00449           has_dcps_key(topicId)
00450           ? DCPS::ENTITYKIND_USER_WRITER_WITH_KEY
00451           : DCPS::ENTITYKIND_USER_WRITER_NO_KEY;
00452         assign(rid.entityId.entityKey, publication_counter_++);
00453       }
00454       virtual void assign_subscription_key(RepoId& rid,
00455                                            const RepoId& topicId,
00456                                            const DDS::DataReaderQos& /*qos*/) {
00457         rid.entityId.entityKind =
00458           has_dcps_key(topicId)
00459           ? DCPS::ENTITYKIND_USER_READER_WITH_KEY
00460           : DCPS::ENTITYKIND_USER_READER_NO_KEY;
00461         assign(rid.entityId.entityKey, subscription_counter_++);
00462       }
00463       virtual void assign_topic_key(RepoId& guid) {
00464         assign(guid.entityId.entityKey, topic_counter_++);
00465 
00466         if (topic_counter_ == 0x1000000) {
00467           ACE_DEBUG((LM_ERROR,
00468                      ACE_TEXT("(%P|%t) ERROR: EndpointManager::make_topic_guid: ")
00469                      ACE_TEXT("Exceeded Maximum number of topic entity keys!")
00470                      ACE_TEXT("Next key will be a duplicate!\n")));
00471           topic_counter_ = 0;
00472         }
00473       }
00474 
00475       virtual DDS::ReturnCode_t add_publication_i(const DCPS::RepoId& /*rid*/,
00476                                                   LocalPublication& /*pub*/) { return DDS::RETCODE_OK; }
00477       virtual DDS::ReturnCode_t write_publication_data(const DCPS::RepoId& /*rid*/,
00478                                                        LocalPublication& /*pub*/,
00479                                                        const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN) { ACE_UNUSED_ARG(reader); return DDS::RETCODE_OK; }
00480       virtual DDS::ReturnCode_t remove_publication_i(const RepoId& publicationId) = 0;
00481 
00482       virtual DDS::ReturnCode_t add_subscription_i(const DCPS::RepoId& /*rid*/,
00483                                                    LocalSubscription& /*pub*/) { return DDS::RETCODE_OK; };
00484       virtual DDS::ReturnCode_t write_subscription_data(const DCPS::RepoId& /*rid*/,
00485                                                         LocalSubscription& /*pub*/,
00486                                                         const DCPS::RepoId& reader = DCPS::GUID_UNKNOWN) { ACE_UNUSED_ARG(reader); return DDS::RETCODE_OK; }
00487       virtual DDS::ReturnCode_t remove_subscription_i(const RepoId& subscriptionId) = 0;
00488 
00489       void match_endpoints(DCPS::RepoId repoId, const TopicDetails& td,
00490                            bool remove = false)
00491       {
00492         const bool reader = repoId.entityId.entityKind & 4;
00493         // Copy the endpoint set - lock can be released in match()
00494         RepoIdSet endpoints_copy = td.endpoints_;
00495 
00496         for (RepoIdSet::const_iterator iter = endpoints_copy.begin();
00497              iter != endpoints_copy.end(); ++iter) {
00498           // check to make sure it's a Reader/Writer or Writer/Reader match
00499           if (bool(iter->entityId.entityKind & 4) != reader) {
00500             if (remove) {
00501               remove_assoc(*iter, repoId);
00502             } else {
00503               match(reader ? *iter : repoId, reader ? repoId : *iter);
00504             }
00505           }
00506         }
00507       }
00508 
00509       void
00510       remove_assoc(const RepoId& remove_from,
00511                    const RepoId& removing)
00512       {
00513         const bool reader = remove_from.entityId.entityKind & 4;
00514         if (reader) {
00515           const LocalSubscriptionIter lsi = local_subscriptions_.find(remove_from);
00516           if (lsi != local_subscriptions_.end()) {
00517             lsi->second.matched_endpoints_.erase(removing);
00518             DCPS::WriterIdSeq writer_seq(1);
00519             writer_seq.length(1);
00520             writer_seq[0] = removing;
00521             lsi->second.remote_opendds_associations_.erase(removing);
00522             lsi->second.subscription_->remove_associations(writer_seq,
00523                                                            false /*notify_lost*/);
00524             // Update writer
00525             write_subscription_data(remove_from, lsi->second);
00526           }
00527 
00528         } else {
00529           const LocalPublicationIter lpi = local_publications_.find(remove_from);
00530           if (lpi != local_publications_.end()) {
00531             lpi->second.matched_endpoints_.erase(removing);
00532             DCPS::ReaderIdSeq reader_seq(1);
00533             reader_seq.length(1);
00534             reader_seq[0] = removing;
00535             lpi->second.remote_opendds_associations_.erase(removing);
00536             lpi->second.publication_->remove_associations(reader_seq,
00537                                                           false /*notify_lost*/);
00538           }
00539         }
00540       }
00541 
00542       void
00543       match(const RepoId& writer, const RepoId& reader)
00544       {
00545         // 0. For discovered endpoints, we'll have the QoS info in the form of the
00546         // publication or subscription BIT data which doesn't use the same structures
00547         // for QoS.  In those cases we can copy the individual QoS policies to temp
00548         // QoS structs:
00549         DDS::DataWriterQos tempDwQos;
00550         DDS::PublisherQos tempPubQos;
00551         DDS::DataReaderQos tempDrQos;
00552         DDS::SubscriberQos tempSubQos;
00553         ContentFilterProperty_t tempCfp;
00554 
00555         // 1. collect details about the writer, which may be local or discovered
00556         const DDS::DataWriterQos* dwQos = 0;
00557         const DDS::PublisherQos* pubQos = 0;
00558         DCPS::TransportLocatorSeq* wTls = 0;
00559 
00560         const LocalPublicationIter lpi = local_publications_.find(writer);
00561         DiscoveredPublicationIter dpi;
00562         bool writer_local = false, already_matched = false;
00563         if (lpi != local_publications_.end()) {
00564           writer_local = true;
00565           dwQos = &lpi->second.qos_;
00566           pubQos = &lpi->second.publisher_qos_;
00567           wTls = &lpi->second.trans_info_;
00568           already_matched = lpi->second.matched_endpoints_.count(reader);
00569         } else if ((dpi = discovered_publications_.find(writer))
00570                    != discovered_publications_.end()) {
00571           wTls = &dpi->second.writer_data_.writerProxy.allLocators;
00572         } else {
00573           return; // Possible and ok, since lock is released
00574         }
00575 
00576         // 2. collect details about the reader, which may be local or discovered
00577         const DDS::DataReaderQos* drQos = 0;
00578         const DDS::SubscriberQos* subQos = 0;
00579         DCPS::TransportLocatorSeq* rTls = 0;
00580         const ContentFilterProperty_t* cfProp = 0;
00581 
00582         const LocalSubscriptionIter lsi = local_subscriptions_.find(reader);
00583         DiscoveredSubscriptionIter dsi;
00584         bool reader_local = false;
00585         if (lsi != local_subscriptions_.end()) {
00586           reader_local = true;
00587           drQos = &lsi->second.qos_;
00588           subQos = &lsi->second.subscriber_qos_;
00589           rTls = &lsi->second.trans_info_;
00590           if (lsi->second.filterProperties.filterExpression[0] != 0) {
00591             tempCfp.filterExpression = lsi->second.filterProperties.filterExpression;
00592             tempCfp.expressionParameters = lsi->second.filterProperties.expressionParameters;
00593           }
00594           cfProp = &tempCfp;
00595           if (!already_matched) {
00596             already_matched = lsi->second.matched_endpoints_.count(writer);
00597           }
00598         } else if ((dsi = discovered_subscriptions_.find(reader))
00599                    != discovered_subscriptions_.end()) {
00600           if (!writer_local) {
00601             // this is a discovered/discovered match, nothing for us to do
00602             return;
00603           }
00604           rTls = &dsi->second.reader_data_.readerProxy.allLocators;
00605 
00606           populate_transport_locator_sequence(rTls, dsi, reader);
00607 
00608           const DDS::SubscriptionBuiltinTopicData& bit =
00609             dsi->second.reader_data_.ddsSubscriptionData;
00610           tempDrQos.durability = bit.durability;
00611           tempDrQos.deadline = bit.deadline;
00612           tempDrQos.latency_budget = bit.latency_budget;
00613           tempDrQos.liveliness = bit.liveliness;
00614           tempDrQos.reliability = bit.reliability;
00615           tempDrQos.destination_order = bit.destination_order;
00616           tempDrQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
00617           tempDrQos.resource_limits =
00618             TheServiceParticipant->initial_ResourceLimitsQosPolicy();
00619           tempDrQos.user_data = bit.user_data;
00620           tempDrQos.ownership = bit.ownership;
00621           tempDrQos.time_based_filter = bit.time_based_filter;
00622           tempDrQos.reader_data_lifecycle =
00623             TheServiceParticipant->initial_ReaderDataLifecycleQosPolicy();
00624           drQos = &tempDrQos;
00625           tempSubQos.presentation = bit.presentation;
00626           tempSubQos.partition = bit.partition;
00627           tempSubQos.group_data = bit.group_data;
00628           tempSubQos.entity_factory =
00629             TheServiceParticipant->initial_EntityFactoryQosPolicy();
00630           subQos = &tempSubQos;
00631           cfProp = &dsi->second.reader_data_.contentFilterProperty;
00632         } else {
00633           return; // Possible and ok, since lock is released
00634         }
00635 
00636         // This is really part of step 1, but we're doing it here just in case we
00637         // are in the discovered/discovered match and we don't need the QoS data.
00638         if (!writer_local) {
00639           const DDS::PublicationBuiltinTopicData& bit =
00640             dpi->second.writer_data_.ddsPublicationData;
00641           tempDwQos.durability = bit.durability;
00642           tempDwQos.durability_service = bit.durability_service;
00643           tempDwQos.deadline = bit.deadline;
00644           tempDwQos.latency_budget = bit.latency_budget;
00645           tempDwQos.liveliness = bit.liveliness;
00646           tempDwQos.reliability = bit.reliability;
00647           tempDwQos.destination_order = bit.destination_order;
00648           tempDwQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
00649           tempDwQos.resource_limits =
00650             TheServiceParticipant->initial_ResourceLimitsQosPolicy();
00651           tempDwQos.transport_priority =
00652             TheServiceParticipant->initial_TransportPriorityQosPolicy();
00653           tempDwQos.lifespan = bit.lifespan;
00654           tempDwQos.user_data = bit.user_data;
00655           tempDwQos.ownership = bit.ownership;
00656           tempDwQos.ownership_strength = bit.ownership_strength;
00657           tempDwQos.writer_data_lifecycle =
00658             TheServiceParticipant->initial_WriterDataLifecycleQosPolicy();
00659           dwQos = &tempDwQos;
00660           tempPubQos.presentation = bit.presentation;
00661           tempPubQos.partition = bit.partition;
00662           tempPubQos.group_data = bit.group_data;
00663           tempPubQos.entity_factory =
00664             TheServiceParticipant->initial_EntityFactoryQosPolicy();
00665           pubQos = &tempPubQos;
00666 
00667           populate_transport_locator_sequence(wTls, dpi, writer);
00668         }
00669 
00670         // Need to release lock, below, for callbacks into DCPS which could
00671         // call into Spdp/Sedp.  Note that this doesn't unlock, it just constructs
00672         // an ACE object which will be used below for unlocking.
00673         ACE_Reverse_Lock<ACE_Thread_Mutex> rev_lock(lock_);
00674 
00675         // 3. check transport and QoS compatibility
00676 
00677         // Copy entries from local publication and local subscription maps
00678         // prior to releasing lock
00679         DCPS::DataWriterCallbacks* dwr = 0;
00680         DCPS::DataReaderCallbacks* drr = 0;
00681         if (writer_local) {
00682           dwr = lpi->second.publication_;
00683         }
00684         if (reader_local) {
00685           drr = lsi->second.subscription_;
00686         }
00687 
00688         DCPS::IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00689         DCPS::IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
00690 
00691         if (DCPS::compatibleQOS(&writerStatus, &readerStatus, *wTls, *rTls,
00692                                 dwQos, drQos, pubQos, subQos)) {
00693           if (!writer_local) {
00694             RepoId writer_participant = writer;
00695             writer_participant.entityId = ENTITYID_PARTICIPANT;
00696             if (defer_writer(writer, writer_participant)) {
00697               return;
00698             }
00699           }
00700           if (!reader_local) {
00701             RepoId reader_participant = reader;
00702             reader_participant.entityId = ENTITYID_PARTICIPANT;
00703             if (defer_reader(reader, reader_participant)) {
00704               return;
00705             }
00706           }
00707 
00708           bool call_writer = false, call_reader = false;
00709           if (writer_local) {
00710             call_writer = lpi->second.matched_endpoints_.insert(reader).second;
00711           }
00712           if (reader_local) {
00713             call_reader = lsi->second.matched_endpoints_.insert(writer).second;
00714           }
00715           if (!call_writer && !call_reader) {
00716             return; // nothing more to do
00717           }
00718           // Copy reader and writer association data prior to releasing lock
00719 #ifdef __SUNPRO_CC
00720           DCPS::ReaderAssociation ra;
00721           ra.readerTransInfo = *rTls;
00722           ra.readerId = reader;
00723           ra.subQos = *subQos;
00724           ra.readerQos = *drQos;
00725           ra.filterClassName = cfProp->filterClassName;
00726           ra.filterExpression = cfProp->filterExpression;
00727           ra.exprParams = cfProp->expressionParameters;
00728           DCPS::WriterAssociation wa;
00729           wa.writerTransInfo = *wTls;
00730           wa.writerId = writer;
00731           wa.pubQos = *pubQos;
00732           wa.writerQos = *dwQos;
00733 #else
00734           const DCPS::ReaderAssociation ra =
00735             {*rTls, reader, *subQos, *drQos,
00736 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
00737              cfProp->filterClassName, cfProp->filterExpression,
00738 #else
00739              "", "",
00740 #endif
00741              cfProp->expressionParameters};
00742 
00743           const DCPS::WriterAssociation wa = {*wTls, writer, *pubQos, *dwQos};
00744 #endif
00745 
00746           ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00747           static const bool writer_active = true;
00748 
00749           if (call_writer) {
00750             if (DCPS::DCPS_debug_level > 3) {
00751               ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
00752                          ACE_TEXT("adding writer association\n")));
00753             }
00754             DcpsUpcalls thr(drr, reader, wa, !writer_active, dwr);
00755             if (call_reader) {
00756               thr.activate();
00757             }
00758             dwr->add_association(writer, ra, writer_active);
00759             if (call_reader) {
00760               thr.writer_done();
00761             }
00762 
00763           } else if (call_reader) {
00764             if (DCPS::DCPS_debug_level > 3) {
00765               ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
00766                          ACE_TEXT("adding reader association\n")));
00767             }
00768             drr->add_association(reader, wa, !writer_active);
00769           }
00770 
00771           // change this if 'writer_active' (above) changes
00772           if (call_writer && !call_reader && !is_opendds(reader)) {
00773             if (DCPS::DCPS_debug_level > 3) {
00774               ACE_DEBUG((LM_DEBUG,
00775                          ACE_TEXT("(%P|%t) EndpointManager::match - ")
00776                          ACE_TEXT("calling writer association_complete\n")));
00777             }
00778             dwr->association_complete(reader);
00779           }
00780 
00781         } else if (already_matched) { // break an existing associtaion
00782           if (writer_local) {
00783             lpi->second.matched_endpoints_.erase(reader);
00784             lpi->second.remote_opendds_associations_.erase(reader);
00785           }
00786           if (reader_local) {
00787             lsi->second.matched_endpoints_.erase(writer);
00788             lsi->second.remote_opendds_associations_.erase(writer);
00789           }
00790           ACE_GUARD(ACE_Reverse_Lock<ACE_Thread_Mutex>, rg, rev_lock);
00791           if (writer_local) {
00792             DCPS::ReaderIdSeq reader_seq(1);
00793             reader_seq.length(1);
00794             reader_seq[0] = reader;
00795             dwr->remove_associations(reader_seq, false /*notify_lost*/);
00796           }
00797           if (reader_local) {
00798             DCPS::WriterIdSeq writer_seq(1);
00799             writer_seq.length(1);
00800             writer_seq[0] = writer;
00801             drr->remove_associations(writer_seq, false /*notify_lost*/);
00802           }
00803 
00804         } else { // something was incompatible
00805           ACE_GUARD(ACE_Reverse_Lock< ACE_Thread_Mutex>, rg, rev_lock);
00806           if (writer_local && writerStatus.count_since_last_send) {
00807             if (DCPS::DCPS_debug_level > 3) {
00808               ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
00809                          ACE_TEXT("writer incompatible\n")));
00810             }
00811             dwr->update_incompatible_qos(writerStatus);
00812           }
00813           if (reader_local && readerStatus.count_since_last_send) {
00814             if (DCPS::DCPS_debug_level > 3) {
00815               ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) EndpointManager::match - ")
00816                          ACE_TEXT("reader incompatible\n")));
00817             }
00818             drr->update_incompatible_qos(readerStatus);
00819           }
00820         }
00821       }
00822 
00823       static bool is_opendds(const GUID_t& endpoint)
00824       {
00825         return !std::memcmp(endpoint.guidPrefix, DCPS::VENDORID_OCI,
00826                             sizeof(DCPS::VENDORID_OCI));
00827       }
00828 
00829       virtual bool shutting_down() const = 0;
00830 
00831       virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
00832                                                        DiscoveredSubscriptionIter& iter,
00833                                                        const RepoId& reader) = 0;
00834 
00835       virtual void populate_transport_locator_sequence(DCPS::TransportLocatorSeq*& tls,
00836                                                        DiscoveredPublicationIter& iter,
00837                                                        const RepoId& reader) = 0;
00838 
00839       virtual bool defer_writer(const RepoId& writer,
00840                                 const RepoId& writer_participant) = 0;
00841 
00842       virtual bool defer_reader(const RepoId& writer,
00843                                 const RepoId& writer_participant) = 0;
00844 
00845       void remove_from_bit(const DiscoveredPublication& pub)
00846       {
00847         pub_key_to_id_.erase(get_key(pub));
00848         remove_from_bit_i(pub);
00849       }
00850 
00851       void remove_from_bit(const DiscoveredSubscription& sub)
00852       {
00853         sub_key_to_id_.erase(get_key(sub));
00854         remove_from_bit_i(sub);
00855       }
00856 
00857       RepoId make_topic_guid()
00858       {
00859         RepoId guid;
00860         guid = participant_id_;
00861         guid.entityId.entityKind = DCPS::ENTITYKIND_OPENDDS_TOPIC;
00862         assign_topic_key(guid);
00863         return guid;
00864       }
00865 
00866       bool has_dcps_key(const DCPS::RepoId& topicId) const
00867       {
00868         typedef OPENDDS_MAP_CMP(RepoId, OPENDDS_STRING, DCPS::GUID_tKeyLessThan) TNMap;
00869         TNMap::const_iterator tn = topic_names_.find(topicId);
00870         if (tn == topic_names_.end()) return false;
00871 
00872         typedef OPENDDS_MAP(OPENDDS_STRING, TopicDetails) TDMap;
00873         typename TDMap::const_iterator td = topics_.find(tn->second);
00874         if (td == topics_.end()) return false;
00875 
00876         return td->second.has_dcps_key_;
00877       }
00878 
00879       void
00880       increment_key(DDS::BuiltinTopicKey_t& key)
00881       {
00882         for (int idx = 0; idx < 3; ++idx) {
00883           CORBA::ULong ukey = static_cast<CORBA::ULong>(key.value[idx]);
00884           if (ukey == 0xFFFFFFFF) {
00885             key.value[idx] = 0;
00886           } else {
00887             ++ukey;
00888             key.value[idx] = ukey;
00889             return;
00890           }
00891         }
00892         ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) EndpointManager::increment_key - ")
00893                    ACE_TEXT("ran out of builtin topic keys\n")));
00894       }
00895 
00896       ACE_Thread_Mutex& lock_;
00897       DCPS::RepoId participant_id_;
00898       BitKeyMap pub_key_to_id_, sub_key_to_id_;
00899       RepoIdSet ignored_guids_;
00900       unsigned int publication_counter_, subscription_counter_, topic_counter_;
00901       LocalPublicationMap local_publications_;
00902       LocalSubscriptionMap local_subscriptions_;
00903       DiscoveredPublicationMap discovered_publications_;
00904       DiscoveredSubscriptionMap discovered_subscriptions_;
00905       OPENDDS_MAP(OPENDDS_STRING, TopicDetails) topics_;
00906       TopicNameMap topic_names_;
00907       OPENDDS_SET(OPENDDS_STRING) ignored_topics_;
00908       DDS::BuiltinTopicKey_t pub_bit_key_, sub_bit_key_;
00909     };
00910 
00911     template <typename EndpointManagerType>
00912     class LocalParticipant : public DCPS::RcObject<ACE_SYNCH_MUTEX> {
00913     public:
00914       typedef typename EndpointManagerType::DiscoveredParticipantData DiscoveredParticipantData;
00915       typedef typename EndpointManagerType::TopicDetails TopicDetails;
00916 
00917       LocalParticipant (const DDS::DomainParticipantQos& qos)
00918         : qos_(qos)
00919       { }
00920 
00921       virtual ~LocalParticipant() { }
00922 
00923       DCPS::RepoId bit_key_to_repo_id(const char* bit_topic_name,
00924                                       const DDS::BuiltinTopicKey_t& key)
00925       {
00926         if (0 == std::strcmp(bit_topic_name, DCPS::BUILT_IN_PARTICIPANT_TOPIC)) {
00927           RepoId guid;
00928           std::memcpy(guid.guidPrefix, key.value, sizeof(DDS::BuiltinTopicKeyValue));
00929           guid.entityId = ENTITYID_PARTICIPANT;
00930           return guid;
00931 
00932         } else {
00933           return endpoint_manager().bit_key_to_repo_id(bit_topic_name, key);
00934         }
00935       }
00936 
00937       void ignore_domain_participant(const RepoId& ignoreId)
00938       {
00939         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00940         endpoint_manager().ignore(ignoreId);
00941 
00942         const DiscoveredParticipantIter iter = participants_.find(ignoreId);
00943         if (iter != participants_.end()) {
00944           remove_discovered_participant(iter);
00945         }
00946       }
00947 
00948       bool
00949       update_domain_participant_qos(const DDS::DomainParticipantQos& qos)
00950       {
00951         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
00952         qos_ = qos;
00953         return true;
00954       }
00955 
00956       DCPS::TopicStatus
00957       assert_topic(DCPS::RepoId_out topicId, const char* topicName,
00958                    const char* dataTypeName, const DDS::TopicQos& qos,
00959                    bool hasDcpsKey)
00960       {
00961         if (std::strlen(topicName) > 256 || std::strlen(dataTypeName) > 256) {
00962           if (DCPS::DCPS_debug_level) {
00963             ACE_DEBUG((LM_ERROR, ACE_TEXT("(%P|%t) ERROR LocalParticipant::assert_topic() - ")
00964                        ACE_TEXT("topic or type name length limit (256) exceeded\n")));
00965           }
00966           return DCPS::PRECONDITION_NOT_MET;
00967         }
00968 
00969         return endpoint_manager().assert_topic(topicId, topicName, dataTypeName, qos, hasDcpsKey);
00970       }
00971 
00972       DCPS::TopicStatus
00973       remove_topic(const RepoId& topicId, OPENDDS_STRING& name)
00974       {
00975         return endpoint_manager().remove_topic(topicId, name);
00976       }
00977 
00978       void
00979       ignore_topic(const RepoId& ignoreId)
00980       {
00981         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
00982         endpoint_manager().ignore(ignoreId);
00983       }
00984 
00985       bool
00986       update_topic_qos(const RepoId& topicId, const DDS::TopicQos& qos,
00987                        OPENDDS_STRING& name)
00988       {
00989         return endpoint_manager().update_topic_qos(topicId, qos, name);
00990       }
00991 
00992       RepoId
00993       add_publication(const RepoId& topicId,
00994                       DCPS::DataWriterCallbacks* publication,
00995                       const DDS::DataWriterQos& qos,
00996                       const DCPS::TransportLocatorSeq& transInfo,
00997                       const DDS::PublisherQos& publisherQos)
00998       {
00999         return endpoint_manager().add_publication(topicId, publication, qos,
01000                                                   transInfo, publisherQos);
01001       }
01002 
01003       void
01004       remove_publication(const RepoId& publicationId)
01005       {
01006         endpoint_manager().remove_publication(publicationId);
01007       }
01008 
01009       void
01010       ignore_publication(const RepoId& ignoreId)
01011       {
01012         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01013         return endpoint_manager().ignore(ignoreId);
01014       }
01015 
01016       bool
01017       update_publication_qos(const RepoId& publicationId,
01018                              const DDS::DataWriterQos& qos,
01019                              const DDS::PublisherQos& publisherQos)
01020       {
01021         return endpoint_manager().update_publication_qos(publicationId, qos, publisherQos);
01022       }
01023 
01024       RepoId
01025       add_subscription(const RepoId& topicId,
01026                        DCPS::DataReaderCallbacks* subscription,
01027                        const DDS::DataReaderQos& qos,
01028                        const DCPS::TransportLocatorSeq& transInfo,
01029                        const DDS::SubscriberQos& subscriberQos,
01030                        const char* filterClassName,
01031                        const char* filterExpr,
01032                        const DDS::StringSeq& params)
01033       {
01034         return endpoint_manager().add_subscription(topicId, subscription, qos, transInfo,
01035                                                    subscriberQos, filterClassName, filterExpr, params);
01036       }
01037 
01038       void
01039       remove_subscription(const RepoId& subscriptionId)
01040       {
01041         endpoint_manager().remove_subscription(subscriptionId);
01042       }
01043 
01044       void
01045       ignore_subscription(const RepoId& ignoreId)
01046       {
01047         ACE_GUARD(ACE_Thread_Mutex, g, lock_);
01048         return endpoint_manager().ignore(ignoreId);
01049       }
01050 
01051       bool
01052       update_subscription_qos(const RepoId& subscriptionId,
01053                               const DDS::DataReaderQos& qos,
01054                               const DDS::SubscriberQos& subscriberQos)
01055       {
01056         return endpoint_manager().update_subscription_qos(subscriptionId, qos, subscriberQos);
01057       }
01058 
01059       bool
01060       update_subscription_params(const RepoId& subId,
01061                                  const DDS::StringSeq& params)
01062       {
01063         return endpoint_manager().update_subscription_params(subId, params);
01064       }
01065 
01066       void
01067       association_complete(const RepoId& localId, const RepoId& remoteId)
01068       {
01069         endpoint_manager().association_complete(localId, remoteId);
01070       }
01071 
01072       DDS::Subscriber_var bit_subscriber() const { return bit_subscriber_; }
01073 
01074     protected:
01075 
01076       struct DiscoveredParticipant {
01077         DiscoveredParticipant() : bit_ih_(0) {}
01078         DiscoveredParticipant(const DiscoveredParticipantData& p,
01079                               const ACE_Time_Value& t)
01080           : pdata_(p), last_seen_(t), bit_ih_(DDS::HANDLE_NIL) {}
01081 
01082         DiscoveredParticipantData pdata_;
01083         ACE_Time_Value last_seen_;
01084         DDS::InstanceHandle_t bit_ih_;
01085       };
01086       typedef OPENDDS_MAP_CMP(DCPS::RepoId, DiscoveredParticipant,
01087                               DCPS::GUID_tKeyLessThan) DiscoveredParticipantMap;
01088       typedef typename DiscoveredParticipantMap::iterator DiscoveredParticipantIter;
01089 
01090       virtual EndpointManagerType& endpoint_manager() = 0;
01091 
01092       void remove_discovered_participant(DiscoveredParticipantIter iter)
01093       {
01094         bool removed = endpoint_manager().disassociate(iter->second.pdata_);
01095         if (removed) {
01096 #ifndef DDS_HAS_MINIMUM_BIT
01097           DDS::ParticipantBuiltinTopicDataDataReaderImpl* bit = part_bit();
01098           // bit may be null if the DomainParticipant is shutting down
01099           if (bit && iter->second.bit_ih_ != DDS::HANDLE_NIL) {
01100             bit->set_instance_state(iter->second.bit_ih_,
01101                                     DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
01102           }
01103 #endif /* DDS_HAS_MINIMUM_BIT */
01104           if (DCPS::DCPS_debug_level > 3) {
01105             DCPS::GuidConverter conv(iter->first);
01106             ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) LocalParticipant::remove_discovered_participant")
01107                        ACE_TEXT(" - erasing %C\n"), OPENDDS_STRING(conv).c_str()));
01108           }
01109           participants_.erase(iter);
01110         }
01111       }
01112 
01113 #ifndef DDS_HAS_MINIMUM_BIT
01114       DDS::ParticipantBuiltinTopicDataDataReaderImpl* part_bit()
01115       {
01116         if (!bit_subscriber_.in())
01117           return 0;
01118 
01119         DDS::DataReader_var d =
01120           bit_subscriber_->lookup_datareader(DCPS::BUILT_IN_PARTICIPANT_TOPIC);
01121         return dynamic_cast<DDS::ParticipantBuiltinTopicDataDataReaderImpl*>(d.in());
01122       }
01123 #endif /* DDS_HAS_MINIMUM_BIT */
01124 
01125       ACE_Thread_Mutex lock_;
01126       DDS::Subscriber_var bit_subscriber_;
01127       DDS::DomainParticipantQos qos_;
01128       DiscoveredParticipantMap participants_;
01129     };
01130 
01131     template<typename Participant>
01132     class PeerDiscovery : public Discovery {
01133     public:
01134       typedef typename Participant::TopicDetails TopicDetails;
01135 
01136       explicit PeerDiscovery(const RepoKey& key) : Discovery(key) { }
01137 
01138       ~PeerDiscovery() {
01139         reactor_runner_.end();
01140       }
01141 
01142       virtual DDS::Subscriber_ptr init_bit(DomainParticipantImpl* participant) {
01143         using namespace DCPS;
01144         if (create_bit_topics(participant) != DDS::RETCODE_OK) {
01145           return 0;
01146         }
01147 
01148         DDS::Subscriber_var bit_subscriber =
01149           participant->create_subscriber(SUBSCRIBER_QOS_DEFAULT,
01150                                          DDS::SubscriberListener::_nil(),
01151                                          DEFAULT_STATUS_MASK);
01152         SubscriberImpl* sub = dynamic_cast<SubscriberImpl*>(bit_subscriber.in());
01153 
01154         DDS::DataReaderQos dr_qos;
01155         sub->get_default_datareader_qos(dr_qos);
01156         dr_qos.durability.kind = DDS::TRANSIENT_LOCAL_DURABILITY_QOS;
01157 
01158 #ifndef DDS_HAS_MINIMUM_BIT
01159         DDS::TopicDescription_var bit_part_topic =
01160           participant->lookup_topicdescription(BUILT_IN_PARTICIPANT_TOPIC);
01161         create_bit_dr(bit_part_topic, BUILT_IN_PARTICIPANT_TOPIC_TYPE,
01162                       sub, dr_qos);
01163 
01164         DDS::TopicDescription_var bit_topic_topic =
01165           participant->lookup_topicdescription(BUILT_IN_TOPIC_TOPIC);
01166         create_bit_dr(bit_topic_topic, BUILT_IN_TOPIC_TOPIC_TYPE,
01167                       sub, dr_qos);
01168 
01169         DDS::TopicDescription_var bit_pub_topic =
01170           participant->lookup_topicdescription(BUILT_IN_PUBLICATION_TOPIC);
01171         create_bit_dr(bit_pub_topic, BUILT_IN_PUBLICATION_TOPIC_TYPE,
01172                       sub, dr_qos);
01173 
01174         DDS::TopicDescription_var bit_sub_topic =
01175           participant->lookup_topicdescription(BUILT_IN_SUBSCRIPTION_TOPIC);
01176         create_bit_dr(bit_sub_topic, BUILT_IN_SUBSCRIPTION_TOPIC_TYPE,
01177                       sub, dr_qos);
01178 #endif /* DDS_HAS_MINIMUM_BIT */
01179 
01180         get_part(participant->get_domain_id(), participant->get_id())->init_bit(bit_subscriber);
01181 
01182         return bit_subscriber._retn();
01183       }
01184 
01185       virtual void fini_bit(DCPS::DomainParticipantImpl* participant)
01186       {
01187         get_part(participant->get_domain_id(), participant->get_id())->fini_bit();
01188       }
01189 
01190       virtual OpenDDS::DCPS::RepoId bit_key_to_repo_id(DCPS::DomainParticipantImpl* participant,
01191                                                        const char* bit_topic_name,
01192                                                        const DDS::BuiltinTopicKey_t& key) const
01193       {
01194         return get_part(participant->get_domain_id(), participant->get_id())
01195           ->bit_key_to_repo_id(bit_topic_name, key);
01196       }
01197 
01198       virtual bool attach_participant(DDS::DomainId_t /*domainId*/,
01199                                       const OpenDDS::DCPS::RepoId& /*participantId*/)
01200       {
01201         return false; // This is just for DCPSInfoRepo?
01202       }
01203 
01204       virtual bool remove_domain_participant(DDS::DomainId_t domain_id,
01205                                              const OpenDDS::DCPS::RepoId& participantId)
01206       {
01207         // Use reference counting to ensure participant
01208         // does not get deleted until lock as been released.
01209         ParticipantHandle participant;
01210         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01211         typename DomainParticipantMap::iterator domain = participants_.find(domain_id);
01212         if (domain == participants_.end()) {
01213           return false;
01214         }
01215         typename ParticipantMap::iterator part = domain->second.find(participantId);
01216         if (part == domain->second.end()) {
01217           return false;
01218         }
01219         participant = part->second;
01220         domain->second.erase(part);
01221         if (domain->second.empty()) {
01222           participants_.erase(domain);
01223         }
01224 
01225         return true;
01226       }
01227 
01228       virtual bool ignore_domain_participant(DDS::DomainId_t domain,
01229                                              const OpenDDS::DCPS::RepoId& myParticipantId,
01230                                              const OpenDDS::DCPS::RepoId& ignoreId)
01231       {
01232         get_part(domain, myParticipantId)->ignore_domain_participant(ignoreId);
01233         return true;
01234       }
01235 
01236       virtual bool update_domain_participant_qos(DDS::DomainId_t domain,
01237                                                  const OpenDDS::DCPS::RepoId& participant,
01238                                                  const DDS::DomainParticipantQos& qos)
01239       {
01240         return get_part(domain, participant)->update_domain_participant_qos(qos);
01241       }
01242 
01243       virtual DCPS::TopicStatus assert_topic(OpenDDS::DCPS::RepoId_out topicId,
01244                                              DDS::DomainId_t domainId,
01245                                              const OpenDDS::DCPS::RepoId& participantId,
01246                                              const char* topicName,
01247                                              const char* dataTypeName,
01248                                              const DDS::TopicQos& qos,
01249                                              bool hasDcpsKey)
01250       {
01251         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
01252         typename OPENDDS_MAP(DDS::DomainId_t,
01253                              OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
01254           topics_.find(domainId);
01255         if (topic_it != topics_.end()) {
01256           const typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator it =
01257             topic_it->second.find(topicName);
01258           if (it != topic_it->second.end()
01259               && it->second.data_type_ != dataTypeName) {
01260             topicId = GUID_UNKNOWN;
01261             return DCPS::CONFLICTING_TYPENAME;
01262           }
01263         }
01264 
01265         // Verified its safe to hold lock during call to assert_topic
01266         const DCPS::TopicStatus stat =
01267           participants_[domainId][participantId]->assert_topic(topicId, topicName,
01268                                                                dataTypeName, qos,
01269                                                                hasDcpsKey);
01270         if (stat == DCPS::CREATED || stat == DCPS::FOUND) { // qos change (FOUND)
01271           TopicDetails& td = topics_[domainId][topicName];
01272           td.data_type_ = dataTypeName;
01273           td.qos_ = qos;
01274           td.repo_id_ = topicId;
01275           ++topic_use_[domainId][topicName];
01276         }
01277         return stat;
01278       }
01279 
01280       virtual DCPS::TopicStatus find_topic(DDS::DomainId_t domainId, const char* topicName,
01281                                            CORBA::String_out dataTypeName, DDS::TopicQos_out qos,
01282                                            OpenDDS::DCPS::RepoId_out topicId)
01283       {
01284         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
01285         typename OPENDDS_MAP(DDS::DomainId_t,
01286                              OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
01287           topics_.find(domainId);
01288         if (topic_it == topics_.end()) {
01289           return DCPS::NOT_FOUND;
01290         }
01291         typename OPENDDS_MAP(OPENDDS_STRING, TopicDetails)::iterator iter =
01292           topic_it->second.find(topicName);
01293         if (iter == topic_it->second.end()) {
01294           return DCPS::NOT_FOUND;
01295         }
01296         TopicDetails& td = iter->second;
01297         dataTypeName = td.data_type_.c_str();
01298         qos = new DDS::TopicQos(td.qos_);
01299         topicId = td.repo_id_;
01300         ++topic_use_[domainId][topicName];
01301         return DCPS::FOUND;
01302       }
01303 
01304       virtual DCPS::TopicStatus remove_topic(DDS::DomainId_t domainId,
01305                                              const OpenDDS::DCPS::RepoId& participantId,
01306                                              const OpenDDS::DCPS::RepoId& topicId)
01307       {
01308         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, DCPS::INTERNAL_ERROR);
01309         typename OPENDDS_MAP(DDS::DomainId_t,
01310                              OPENDDS_MAP(OPENDDS_STRING, TopicDetails) )::iterator topic_it =
01311           topics_.find(domainId);
01312         if (topic_it == topics_.end()) {
01313           return DCPS::NOT_FOUND;
01314         }
01315 
01316         OPENDDS_STRING name;
01317         // Safe to hold lock while calling remove topic
01318         const DCPS::TopicStatus stat =
01319           participants_[domainId][participantId]->remove_topic(topicId, name);
01320 
01321         if (stat == DCPS::REMOVED) {
01322           if (0 == --topic_use_[domainId][name]) {
01323             topic_use_[domainId].erase(name);
01324             if (topic_it->second.empty()) {
01325               topic_use_.erase(domainId);
01326             }
01327             topic_it->second.erase(name);
01328             if (topic_it->second.empty()) {
01329               topics_.erase(topic_it);
01330             }
01331           }
01332         }
01333         return stat;
01334       }
01335 
01336       virtual bool ignore_topic(DDS::DomainId_t domainId, const OpenDDS::DCPS::RepoId& myParticipantId,
01337                                 const OpenDDS::DCPS::RepoId& ignoreId)
01338       {
01339         get_part(domainId, myParticipantId)->ignore_topic(ignoreId);
01340         return true;
01341       }
01342 
01343       virtual bool update_topic_qos(const OpenDDS::DCPS::RepoId& topicId, DDS::DomainId_t domainId,
01344                                     const OpenDDS::DCPS::RepoId& participantId, const DDS::TopicQos& qos)
01345       {
01346         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
01347         OPENDDS_STRING name;
01348         // Safe to hold lock while calling update_topic_qos
01349         if (participants_[domainId][participantId]->update_topic_qos(topicId,
01350                                                                      qos, name)) {
01351           topics_[domainId][name].qos_ = qos;
01352           return true;
01353         }
01354         return false;
01355       }
01356 
01357       virtual OpenDDS::DCPS::RepoId add_publication(DDS::DomainId_t domainId,
01358                                                     const OpenDDS::DCPS::RepoId& participantId,
01359                                                     const OpenDDS::DCPS::RepoId& topicId,
01360                                                     DCPS::DataWriterCallbacks* publication,
01361                                                     const DDS::DataWriterQos& qos,
01362                                                     const DCPS::TransportLocatorSeq& transInfo,
01363                                                     const DDS::PublisherQos& publisherQos)
01364       {
01365         return get_part(domainId, participantId)->add_publication(
01366                                                                   topicId, publication, qos, transInfo, publisherQos);
01367       }
01368 
01369       virtual bool remove_publication(DDS::DomainId_t domainId,
01370                                       const OpenDDS::DCPS::RepoId& participantId,
01371                                       const OpenDDS::DCPS::RepoId& publicationId)
01372       {
01373         get_part(domainId, participantId)->remove_publication(publicationId);
01374         return true;
01375       }
01376 
01377       virtual bool ignore_publication(DDS::DomainId_t domainId,
01378                                       const OpenDDS::DCPS::RepoId& participantId,
01379                                       const OpenDDS::DCPS::RepoId& ignoreId)
01380       {
01381         get_part(domainId, participantId)->ignore_publication(ignoreId);
01382         return true;
01383       }
01384 
01385       virtual bool update_publication_qos(DDS::DomainId_t domainId,
01386                                           const OpenDDS::DCPS::RepoId& partId,
01387                                           const OpenDDS::DCPS::RepoId& dwId,
01388                                           const DDS::DataWriterQos& qos,
01389                                           const DDS::PublisherQos& publisherQos)
01390       {
01391         return get_part(domainId, partId)->update_publication_qos(dwId, qos,
01392                                                                   publisherQos);
01393       }
01394 
01395       virtual OpenDDS::DCPS::RepoId add_subscription(DDS::DomainId_t domainId,
01396                                                      const OpenDDS::DCPS::RepoId& participantId,
01397                                                      const OpenDDS::DCPS::RepoId& topicId,
01398                                                      DCPS::DataReaderCallbacks* subscription,
01399                                                      const DDS::DataReaderQos& qos,
01400                                                      const DCPS::TransportLocatorSeq& transInfo,
01401                                                      const DDS::SubscriberQos& subscriberQos,
01402                                                      const char* filterClassName,
01403                                                      const char* filterExpr,
01404                                                      const DDS::StringSeq& params)
01405       {
01406         return get_part(domainId, participantId)->add_subscription(topicId, subscription, qos, transInfo, subscriberQos, filterClassName, filterExpr, params);
01407       }
01408 
01409       virtual bool remove_subscription(DDS::DomainId_t domainId,
01410                                        const OpenDDS::DCPS::RepoId& participantId,
01411                                        const OpenDDS::DCPS::RepoId& subscriptionId)
01412       {
01413         get_part(domainId, participantId)->remove_subscription(subscriptionId);
01414         return true;
01415       }
01416 
01417       virtual bool ignore_subscription(DDS::DomainId_t domainId,
01418                                        const OpenDDS::DCPS::RepoId& participantId,
01419                                        const OpenDDS::DCPS::RepoId& ignoreId)
01420       {
01421         get_part(domainId, participantId)->ignore_subscription(ignoreId);
01422         return true;
01423       }
01424 
01425       virtual bool update_subscription_qos(DDS::DomainId_t domainId,
01426                                            const OpenDDS::DCPS::RepoId& partId,
01427                                            const OpenDDS::DCPS::RepoId& drId,
01428                                            const DDS::DataReaderQos& qos,
01429                                            const DDS::SubscriberQos& subQos)
01430       {
01431         return get_part(domainId, partId)->update_subscription_qos(drId, qos, subQos);
01432       }
01433 
01434       virtual bool update_subscription_params(DDS::DomainId_t domainId,
01435                                               const OpenDDS::DCPS::RepoId& partId,
01436                                               const OpenDDS::DCPS::RepoId& subId,
01437                                               const DDS::StringSeq& params)
01438       {
01439         return get_part(domainId, partId)->update_subscription_params(subId, params);
01440       }
01441 
01442       virtual void association_complete(DDS::DomainId_t domainId,
01443                                         const OpenDDS::DCPS::RepoId& participantId,
01444                                         const OpenDDS::DCPS::RepoId& localId,
01445                                         const OpenDDS::DCPS::RepoId& remoteId)
01446       {
01447         get_part(domainId, participantId)->association_complete(localId, remoteId);
01448       }
01449 
01450       ACE_Reactor*
01451       reactor()
01452       {
01453         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, reactor_runner_.mtx_, 0);
01454         if (!reactor_runner_.reactor_) {
01455           reactor_runner_.reactor_ = new ACE_Reactor(new ACE_Select_Reactor, true);
01456           reactor_runner_.activate();
01457         }
01458         return reactor_runner_.reactor_;
01459       }
01460 
01461     protected:
01462 
01463       typedef DCPS::RcHandle<Participant> ParticipantHandle;
01464       typedef OPENDDS_MAP_CMP(DCPS::RepoId, ParticipantHandle, DCPS::GUID_tKeyLessThan) ParticipantMap;
01465       typedef OPENDDS_MAP(DDS::DomainId_t, ParticipantMap) DomainParticipantMap;
01466 
01467       ParticipantHandle
01468         get_part(const DDS::DomainId_t domain_id,
01469                  const OpenDDS::DCPS::RepoId& part_id) const
01470       {
01471         ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, ParticipantHandle());
01472         typename DomainParticipantMap::const_iterator domain = participants_.find(domain_id);
01473         if (domain == participants_.end()) {
01474           return ParticipantHandle();
01475         }
01476         typename ParticipantMap::const_iterator part = domain->second.find(part_id);
01477         if (part == domain->second.end()) {
01478           return ParticipantHandle();
01479         }
01480         return part->second;
01481       }
01482 
01483       void create_bit_dr(DDS::TopicDescription_ptr topic, const char* type,
01484                          DCPS::SubscriberImpl* sub,
01485                          const DDS::DataReaderQos& qos)
01486       {
01487         using namespace DCPS;
01488         TopicDescriptionImpl* bit_topic_i =
01489           dynamic_cast<TopicDescriptionImpl*>(topic);
01490 
01491         DDS::DomainParticipant_var participant = sub->get_participant();
01492         DomainParticipantImpl* participant_i =
01493           dynamic_cast<DomainParticipantImpl*>(participant.in());
01494 
01495         TypeSupport_var type_support =
01496           Registered_Data_Types->lookup(participant, type);
01497 
01498         DDS::DataReader_var dr = type_support->create_datareader();
01499         OpenDDS::DCPS::DataReaderImpl* dri = dynamic_cast<OpenDDS::DCPS::DataReaderImpl*>(dr.in());
01500 
01501         dri->init(bit_topic_i, qos, 0 /*listener*/, 0 /*mask*/,
01502                   participant_i, sub, dr);
01503         dri->disable_transport();
01504         dri->enable();
01505       }
01506 
01507       mutable ACE_Thread_Mutex lock_;
01508 
01509       // Before participants_ so destroyed after.
01510       struct ReactorRunner : ACE_Task_Base {
01511       ReactorRunner() : reactor_(0) {}
01512         ~ReactorRunner()
01513         {
01514           delete reactor_;
01515         }
01516 
01517         int svc()
01518         {
01519           reactor_->owner(ACE_Thread_Manager::instance()->thr_self());
01520           reactor_->run_reactor_event_loop();
01521           return 0;
01522         }
01523 
01524         void end()
01525         {
01526           ACE_GUARD(ACE_Thread_Mutex, g, mtx_);
01527           if (reactor_) {
01528             reactor_->end_reactor_event_loop();
01529             wait();
01530           }
01531         }
01532 
01533         ACE_Reactor* reactor_;
01534         ACE_Thread_Mutex mtx_;
01535       } reactor_runner_;
01536 
01537       DomainParticipantMap participants_;
01538       OPENDDS_MAP(DDS::DomainId_t, OPENDDS_MAP(OPENDDS_STRING, TopicDetails) ) topics_;
01539       OPENDDS_MAP(DDS::DomainId_t, OPENDDS_MAP(OPENDDS_STRING, unsigned int) ) topic_use_;
01540     };
01541 
01542   } // namespace DCPS
01543 } // namespace OpenDDS
01544 
01545 #endif /* OPENDDS_DDS_DCPS_DISCOVERYBASE_H */

Generated on Fri Feb 12 20:05:22 2016 for OpenDDS by  doxygen 1.4.7