StaticDiscovery.h

Go to the documentation of this file.
00001 /*
00002  * Distributed under the OpenDDS License.
00003  * See: http://www.opendds.org/license.html
00004  */
00005 
00006 #ifndef OPENDDS_STATICDISCOVERY_STATICDISCOVERY_H
00007 #define OPENDDS_STATICDISCOVERY_STATICDISCOVERY_H
00008 
00009 #include "dds/DCPS/DiscoveryBase.h"
00010 #include "dds/DCPS/Service_Participant.h"
00011 #include "dcps_export.h"
00012 
00013 #ifdef DDS_HAS_MINIMUM_BIT
00014 #include "dds/DCPS/DataReaderImpl_T.h"
00015 #include "dds/DCPS/DataWriterImpl_T.h"
00016 #endif /* DDS_HAS_MINIMUM_BIT */
00017 
00018 #include "ace/Configuration.h"
00019 
00020 #include "dds/DCPS/PoolAllocator.h"
00021 
00022 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00023 #pragma once
00024 #endif /* ACE_LACKS_PRAGMA_ONCE */
00025 
00026 namespace OpenDDS {
00027 namespace DCPS {
00028 
00029 class StaticDiscovery; // Forward declaration so the handle typedef below can name the class before it is defined.
00030 
00031 typedef RcHandle<StaticDiscovery> StaticDiscovery_rch; // RcHandle alias for StaticDiscovery; this is the type StaticDiscovery::instance() returns.
00032 
00033 class OpenDDS_Dcps_Export EndpointRegistry { // Data holder for topics, QoS values, and reader/writer endpoint records; every member is public.
00034 public:
00035   struct Topic { // One topic record: the topic's name and the name of its data type.
00036     OPENDDS_STRING name;
00037     OPENDDS_STRING type_name;
00038   };
00039   typedef OPENDDS_MAP(OPENDDS_STRING, Topic) TopicMapType;
00040   TopicMapType topic_map; // Topic records looked up by a string key.
00041 // The next four maps hold DataReader, DataWriter, Subscriber and Publisher QoS values, each looked up by a string key.
00042   typedef OPENDDS_MAP(OPENDDS_STRING, DDS::DataReaderQos) DataReaderQosMapType;
00043   DataReaderQosMapType datareaderqos_map;
00044 
00045   typedef OPENDDS_MAP(OPENDDS_STRING, DDS::DataWriterQos) DataWriterQosMapType;
00046   DataWriterQosMapType datawriterqos_map;
00047 
00048   typedef OPENDDS_MAP(OPENDDS_STRING, DDS::SubscriberQos) SubscriberQosMapType;
00049   SubscriberQosMapType subscriberqos_map;
00050 
00051   typedef OPENDDS_MAP(OPENDDS_STRING, DDS::PublisherQos) PublisherQosMapType;
00052   PublisherQosMapType publisherqos_map;
00053 // NOTE(review): the meaning of the string keys is not visible in this header; presumably configuration section names - confirm in the .cpp.
00054   typedef OPENDDS_SET_CMP(RepoId, GUID_tKeyLessThan) RepoIdSetType; // Set of RepoIds ordered with GUID_tKeyLessThan.
00055   struct Reader { // Record describing one data reader.
00056     OPENDDS_STRING topic_name;
00057     DDS::DataReaderQos qos;
00058     DDS::SubscriberQos subscriber_qos;
00059     OPENDDS_STRING trans_cfg;
00060     TransportLocatorSeq trans_info;
00061     RepoIdSetType best_effort_writers; // Not touched by the constructor, so it starts default-constructed.
00062     RepoIdSetType reliable_writers; // Also default-constructed. NOTE(review): both sets are presumably filled by match() - confirm in the .cpp.
00063     Reader(const OPENDDS_STRING& tn, // Copies the five arguments into topic_name, qos, subscriber_qos, trans_cfg and trans_info.
00064            const DDS::DataReaderQos& q,
00065            const DDS::SubscriberQos& sq,
00066            const OPENDDS_STRING& transport_cfg,
00067            const TransportLocatorSeq& ti)
00068       : topic_name(tn)
00069       , qos(q)
00070       , subscriber_qos(sq)
00071       , trans_cfg(transport_cfg)
00072       , trans_info(ti)
00073     {}
00074   };
00075   typedef OPENDDS_MAP_CMP(RepoId, Reader, GUID_tKeyLessThan) ReaderMapType;
00076   ReaderMapType reader_map; // Reader records keyed by RepoId.
00077 
00078   struct Writer { // Record describing one data writer; mirrors Reader with writer/publisher types.
00079     OPENDDS_STRING topic_name;
00080     DDS::DataWriterQos qos;
00081     DDS::PublisherQos publisher_qos;
00082     OPENDDS_STRING trans_cfg;
00083     TransportLocatorSeq trans_info;
00084     RepoIdSetType best_effort_readers; // Not touched by the constructor, so it starts default-constructed.
00085     RepoIdSetType reliable_readers; // Also default-constructed. NOTE(review): both sets are presumably filled by match() - confirm in the .cpp.
00086     Writer(const OPENDDS_STRING& tn, // Copies the five arguments into topic_name, qos, publisher_qos, trans_cfg and trans_info.
00087            const DDS::DataWriterQos& q,
00088            const DDS::PublisherQos& pq,
00089            const OPENDDS_STRING& transport_cfg,
00090            const TransportLocatorSeq& ti)
00091       : topic_name(tn)
00092       , qos(q)
00093       , publisher_qos(pq)
00094       , trans_cfg(transport_cfg)
00095       , trans_info(ti)
00096     {}
00097   };
00098   typedef OPENDDS_MAP_CMP(RepoId, Writer, GUID_tKeyLessThan) WriterMapType;
00099   WriterMapType writer_map; // Writer records keyed by RepoId.
00100 
00101   struct StaticDiscGuidDomainEqual { // Binary predicate over two GUID prefixes.
00102 // Returns true when the sizeof(DDS::DomainId_t) bytes starting at index 2 are identical in both prefixes.
00103     bool
00104     operator() (const GuidPrefix_t& lhs, const GuidPrefix_t& rhs) const
00105     {
00106       return std::memcmp(&lhs[2], &rhs[2], sizeof(DDS::DomainId_t)) == 0; // NOTE(review): presumably the bytes where build_id() stores the domain id - confirm in the .cpp.
00107     }
00108   };
00109   struct StaticDiscGuidPartEqual { // Binary predicate over two GUID prefixes.
00110 // Returns true when the 6 bytes starting at index 6 are identical in both prefixes.
00111     bool
00112     operator() (const GuidPrefix_t& lhs, const GuidPrefix_t& rhs) const
00113     {
00114       return std::memcmp(&lhs[6], &rhs[6], 6) == 0; // NOTE(review): the length 6 matches build_id()'s 6-byte participant_id; presumably the participant part - confirm.
00115     }
00116   };
00117 // Declared here, defined outside this header. NOTE(review): presumably pairs readers with writers and fills the best_effort_*/reliable_* sets - confirm.
00118   void match();
00119 // Builds an EntityId_t from a 3-byte entity key and an entity kind byte.
00120   static EntityId_t build_id(const unsigned char* entity_key /* length of 3 */,
00121                              const unsigned char entity_kind);
00122 // Builds a RepoId from a domain id, a 6-byte participant id and an entity id.
00123   static RepoId build_id(DDS::DomainId_t domain,
00124                          const unsigned char* participant_id /* length of 6 */,
00125                          const EntityId_t& entity_id);
00126 };
00127 
00128 struct StaticDiscoveredParticipantData {}; // Empty type; in this header it is used as the template argument of EndpointManager<> and as the parameter type of disassociate().
00129 class StaticParticipant; // Forward declaration: StaticEndpointManager takes and stores a StaticParticipant& before the class is defined.
00130 
00131 class StaticEndpointManager // Endpoint manager for static discovery; keeps references to an EndpointRegistry and a StaticParticipant.
00132   : public EndpointManager<StaticDiscoveredParticipantData>
00133   , public DiscoveryListener {
00134 public:
00135   StaticEndpointManager(const RepoId& participant_id, // Defined outside this header. NOTE(review): registry and participant are presumably bound to registry_ / participant_ - confirm.
00136                         ACE_Thread_Mutex& lock,
00137                         const EndpointRegistry& registry,
00138                         StaticParticipant& participant);
00139 
00140   void init_bit(); // Invoked by StaticParticipant::init_bit() after it stores the built-in-topic subscriber.
00141 // All member functions from here through writer_does_not_exist() are declared virtual and defined outside this header.
00142   virtual void assign_publication_key(RepoId& rid, // NOTE(review): presumably overrides of EndpointManager<> / DiscoveryListener - confirm against DiscoveryBase.h.
00143                                       const RepoId& topicId,
00144                                       const DDS::DataWriterQos& qos);
00145   virtual void assign_subscription_key(RepoId& rid,
00146                                        const RepoId& topicId,
00147                                        const DDS::DataReaderQos& qos);
00148 
00149   virtual bool update_topic_qos(const RepoId& /*topicId*/,
00150                                 const DDS::TopicQos& /*qos*/,
00151                                 OPENDDS_STRING& /*name*/);
00152 
00153   virtual bool update_publication_qos(const RepoId& /*publicationId*/,
00154                                       const DDS::DataWriterQos& /*qos*/,
00155                                       const DDS::PublisherQos& /*publisherQos*/);
00156 
00157   virtual bool update_subscription_qos(const RepoId& /*subscriptionId*/,
00158                                        const DDS::DataReaderQos& /*qos*/,
00159                                        const DDS::SubscriberQos& /*subscriberQos*/);
00160 
00161   virtual bool update_subscription_params(const RepoId& /*subId*/,
00162                                           const DDS::StringSeq& /*params*/);
00163 
00164   virtual void association_complete(const RepoId& /*localId*/,
00165                                     const RepoId& /*remoteId*/);
00166 
00167   virtual bool disassociate(const StaticDiscoveredParticipantData& /*pdata*/);
00168 
00169   virtual DDS::ReturnCode_t add_publication_i(const RepoId& /*rid*/,
00170                                               LocalPublication& /*pub*/);
00171 
00172   virtual DDS::ReturnCode_t remove_publication_i(const RepoId& /*publicationId*/);
00173 
00174   virtual DDS::ReturnCode_t add_subscription_i(const RepoId& /*rid*/,
00175                                                LocalSubscription& /*sub*/);
00176 
00177   virtual DDS::ReturnCode_t remove_subscription_i(const RepoId& /*subscriptionId*/);
00178 
00179   virtual bool shutting_down() const;
00180 // Two overloads, distinguished by the iterator type: one for discovered subscriptions, one for discovered publications.
00181   virtual void populate_transport_locator_sequence(TransportLocatorSeq*& /*tls*/,
00182                                                    DiscoveredSubscriptionIter& /*iter*/,
00183                                                    const RepoId& /*reader*/);
00184 
00185   virtual void populate_transport_locator_sequence(TransportLocatorSeq*& /*tls*/,
00186                                                    DiscoveredPublicationIter& /*iter*/,
00187                                                    const RepoId& /*writer*/);
00188 
00189   virtual bool defer_writer(const RepoId& /*writer*/,
00190                             const RepoId& /*writer_participant*/);
00191 
00192   virtual bool defer_reader(const RepoId& /*reader*/,
00193                             const RepoId& /*reader_participant*/);
00194 // Existence notifications; each takes the id of the endpoint concerned first and the id of its counterpart second.
00195   virtual void reader_exists(const RepoId& readerid, const RepoId& writerid);
00196   virtual void reader_does_not_exist(const RepoId& readerid, const RepoId& writerid);
00197   virtual void writer_exists(const RepoId& writerid, const RepoId& readerid);
00198   virtual void writer_does_not_exist(const RepoId& writerid, const RepoId& readerid);
00199 #ifndef DDS_HAS_MINIMUM_BIT
00200   DDS::PublicationBuiltinTopicDataDataReaderImpl* pub_bit(); // Accessors for the publication / subscription built-in-topic reader implementations;
00201   DDS::SubscriptionBuiltinTopicDataDataReaderImpl* sub_bit(); // only declared when DDS_HAS_MINIMUM_BIT is not defined.
00202 #endif /* DDS_HAS_MINIMUM_BIT */
00203 
00204 
00205 private:
00206   const EndpointRegistry& registry_; // Reference members: the referenced objects must outlive this manager.
00207   StaticParticipant& participant_;
00208 };
00209 
00210 class StaticParticipant : public LocalParticipant<StaticEndpointManager> { // Participant type for static discovery; embeds its StaticEndpointManager by value.
00211 public:
00212   StaticParticipant(RepoId& guid, // guid is only forwarded to endpoint_manager_, whose constructor takes it as const RepoId&.
00213                     const DDS::DomainParticipantQos& qos,
00214                     const EndpointRegistry& registry)
00215     : LocalParticipant<StaticEndpointManager>(qos)
00216     , endpoint_manager_(guid, lock_, registry, *this) // lock_ is not declared in this class (presumably inherited from LocalParticipant - confirm). NOTE(review): *this is passed while still under construction.
00217   {}
00218 // Stores the built-in-topic subscriber, then runs the endpoint manager's init_bit().
00219   void init_bit(const DDS::Subscriber_var& bit_subscriber)
00220   {
00221     bit_subscriber_ = bit_subscriber; // bit_subscriber_ is not declared in this class; presumably a LocalParticipant member - confirm.
00222     endpoint_manager_.init_bit();
00223   }
00224 // Drops the stored built-in-topic subscriber by assigning 0 to it.
00225   void fini_bit()
00226   {
00227     bit_subscriber_ = 0;
00228   }
00229 
00230 private:
00231   virtual StaticEndpointManager& endpoint_manager() { return endpoint_manager_; } // Returns the embedded manager. NOTE(review): presumably implements a hook declared by LocalParticipant - confirm.
00232 
00233   StaticEndpointManager endpoint_manager_;
00234 };
00235 
00236 class OpenDDS_Dcps_Export StaticDiscovery // Exported discovery class built on PeerDiscovery<StaticParticipant>; owns an EndpointRegistry as a public member.
00237   : public PeerDiscovery<StaticParticipant> {
00238 public:
00239   explicit StaticDiscovery(const RepoKey& key);
00240 // Takes an ACE_Configuration_Heap and returns an int status; the meaning of the return value is not visible in this header.
00241   int load_configuration(ACE_Configuration_Heap& config);
00242 
00243   virtual AddDomainStatus add_domain_participant(DDS::DomainId_t domain,
00244                                                  const DDS::DomainParticipantQos& qos);
00245 
00246   EndpointRegistry registry; // Public on purpose or by history: callers can read and modify the registry directly.
00247 
00248   static StaticDiscovery_rch instance() { return instance_; } // Returns a copy of the static handle; where instance_ is assigned is not visible in this header.
00249 
00250 private:
00251   int parse_topics(ACE_Configuration_Heap& cf); // The parse_* helpers each take the configuration heap and return an int status.
00252   int parse_datawriterqos(ACE_Configuration_Heap& cf); // NOTE(review): presumably called from load_configuration() to fill `registry` - confirm in the .cpp.
00253   int parse_datareaderqos(ACE_Configuration_Heap& cf);
00254   int parse_publisherqos(ACE_Configuration_Heap& cf);
00255   int parse_subscriberqos(ACE_Configuration_Heap& cf);
00256   int parse_endpoints(ACE_Configuration_Heap& cf);
00257 
00258   void pre_writer(DataWriterImpl* writer); // Private hooks taking raw implementation pointers; defined outside this header.
00259   void pre_reader(DataReaderImpl* reader);
00260 
00261   static StaticDiscovery_rch instance_; // Static handle returned by instance(); its definition lives outside this header.
00262 };
00263 
00264 }
00265 }
00266 
00267 #endif /* OPENDDS_STATICDISCOVERY_STATICDISCOVERY_H */

Generated on Fri Feb 12 20:05:27 2016 for OpenDDS by  doxygen 1.4.7