00001
00002
00003
00004
00005
00006 #ifndef OPENDDS_STATICDISCOVERY_STATICDISCOVERY_H
00007 #define OPENDDS_STATICDISCOVERY_STATICDISCOVERY_H
00008
00009 #include "dds/DCPS/DiscoveryBase.h"
00010 #include "dds/DCPS/Service_Participant.h"
00011 #include "dcps_export.h"
00012
00013 #ifdef DDS_HAS_MINIMUM_BIT
00014 #include "dds/DCPS/DataReaderImpl_T.h"
00015 #include "dds/DCPS/DataWriterImpl_T.h"
00016 #endif
00017
00018 #include "ace/Configuration.h"
00019
00020 #include "dds/DCPS/PoolAllocator.h"
00021
00022 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00023 #pragma once
00024 #endif
00025
00026 namespace OpenDDS {
00027 namespace DCPS {
00028
00029 class StaticDiscovery;
00030
00031 typedef RcHandle<StaticDiscovery> StaticDiscovery_rch;
00032
00033 class OpenDDS_Dcps_Export EndpointRegistry {
00034 public:
00035 struct Topic {
00036 OPENDDS_STRING name;
00037 OPENDDS_STRING type_name;
00038 };
00039 typedef OPENDDS_MAP(OPENDDS_STRING, Topic) TopicMapType;
00040 TopicMapType topic_map;
00041
00042 typedef OPENDDS_MAP(OPENDDS_STRING, DDS::DataReaderQos) DataReaderQosMapType;
00043 DataReaderQosMapType datareaderqos_map;
00044
00045 typedef OPENDDS_MAP(OPENDDS_STRING, DDS::DataWriterQos) DataWriterQosMapType;
00046 DataWriterQosMapType datawriterqos_map;
00047
00048 typedef OPENDDS_MAP(OPENDDS_STRING, DDS::SubscriberQos) SubscriberQosMapType;
00049 SubscriberQosMapType subscriberqos_map;
00050
00051 typedef OPENDDS_MAP(OPENDDS_STRING, DDS::PublisherQos) PublisherQosMapType;
00052 PublisherQosMapType publisherqos_map;
00053
00054 typedef OPENDDS_SET_CMP(RepoId, GUID_tKeyLessThan) RepoIdSetType;
00055 struct Reader {
00056 OPENDDS_STRING topic_name;
00057 DDS::DataReaderQos qos;
00058 DDS::SubscriberQos subscriber_qos;
00059 OPENDDS_STRING trans_cfg;
00060 TransportLocatorSeq trans_info;
00061 RepoIdSetType best_effort_writers;
00062 RepoIdSetType reliable_writers;
00063 Reader(const OPENDDS_STRING& tn,
00064 const DDS::DataReaderQos& q,
00065 const DDS::SubscriberQos& sq,
00066 const OPENDDS_STRING& transport_cfg,
00067 const TransportLocatorSeq& ti)
00068 : topic_name(tn)
00069 , qos(q)
00070 , subscriber_qos(sq)
00071 , trans_cfg(transport_cfg)
00072 , trans_info(ti)
00073 {}
00074 };
00075 typedef OPENDDS_MAP_CMP(RepoId, Reader, GUID_tKeyLessThan) ReaderMapType;
00076 ReaderMapType reader_map;
00077
00078 struct Writer {
00079 OPENDDS_STRING topic_name;
00080 DDS::DataWriterQos qos;
00081 DDS::PublisherQos publisher_qos;
00082 OPENDDS_STRING trans_cfg;
00083 TransportLocatorSeq trans_info;
00084 RepoIdSetType best_effort_readers;
00085 RepoIdSetType reliable_readers;
00086 Writer(const OPENDDS_STRING& tn,
00087 const DDS::DataWriterQos& q,
00088 const DDS::PublisherQos& pq,
00089 const OPENDDS_STRING& transport_cfg,
00090 const TransportLocatorSeq& ti)
00091 : topic_name(tn)
00092 , qos(q)
00093 , publisher_qos(pq)
00094 , trans_cfg(transport_cfg)
00095 , trans_info(ti)
00096 {}
00097 };
00098 typedef OPENDDS_MAP_CMP(RepoId, Writer, GUID_tKeyLessThan) WriterMapType;
00099 WriterMapType writer_map;
00100
00101 struct StaticDiscGuidDomainEqual {
00102
00103 bool
00104 operator() (const GuidPrefix_t& lhs, const GuidPrefix_t& rhs) const
00105 {
00106 return std::memcmp(&lhs[2], &rhs[2], sizeof(DDS::DomainId_t)) == 0;
00107 }
00108 };
00109 struct StaticDiscGuidPartEqual {
00110
00111 bool
00112 operator() (const GuidPrefix_t& lhs, const GuidPrefix_t& rhs) const
00113 {
00114 return std::memcmp(&lhs[6], &rhs[6], 6) == 0;
00115 }
00116 };
00117
00118 void match();
00119
00120 static EntityId_t build_id(const unsigned char* entity_key ,
00121 const unsigned char entity_kind);
00122
00123 static RepoId build_id(DDS::DomainId_t domain,
00124 const unsigned char* participant_id ,
00125 const EntityId_t& entity_id);
00126 };
00127
00128 struct StaticDiscoveredParticipantData {};
00129 class StaticParticipant;
00130
00131 class StaticEndpointManager
00132 : public EndpointManager<StaticDiscoveredParticipantData>
00133 , public DiscoveryListener {
00134 public:
00135 StaticEndpointManager(const RepoId& participant_id,
00136 ACE_Thread_Mutex& lock,
00137 const EndpointRegistry& registry,
00138 StaticParticipant& participant);
00139
00140 void init_bit();
00141
00142 virtual void assign_publication_key(RepoId& rid,
00143 const RepoId& topicId,
00144 const DDS::DataWriterQos& qos);
00145 virtual void assign_subscription_key(RepoId& rid,
00146 const RepoId& topicId,
00147 const DDS::DataReaderQos& qos);
00148
00149 virtual bool update_topic_qos(const RepoId& ,
00150 const DDS::TopicQos& ,
00151 OPENDDS_STRING& );
00152
00153 virtual bool update_publication_qos(const RepoId& ,
00154 const DDS::DataWriterQos& ,
00155 const DDS::PublisherQos& );
00156
00157 virtual bool update_subscription_qos(const RepoId& ,
00158 const DDS::DataReaderQos& ,
00159 const DDS::SubscriberQos& );
00160
00161 virtual bool update_subscription_params(const RepoId& ,
00162 const DDS::StringSeq& );
00163
00164 virtual void association_complete(const RepoId& ,
00165 const RepoId& );
00166
00167 virtual bool disassociate(const StaticDiscoveredParticipantData& );
00168
00169 virtual DDS::ReturnCode_t add_publication_i(const RepoId& ,
00170 LocalPublication& );
00171
00172 virtual DDS::ReturnCode_t remove_publication_i(const RepoId& );
00173
00174 virtual DDS::ReturnCode_t add_subscription_i(const RepoId& ,
00175 LocalSubscription& );
00176
00177 virtual DDS::ReturnCode_t remove_subscription_i(const RepoId& );
00178
00179 virtual bool shutting_down() const;
00180
00181 virtual void populate_transport_locator_sequence(TransportLocatorSeq*& ,
00182 DiscoveredSubscriptionIter& ,
00183 const RepoId& );
00184
00185 virtual void populate_transport_locator_sequence(TransportLocatorSeq*& ,
00186 DiscoveredPublicationIter& ,
00187 const RepoId& );
00188
00189 virtual bool defer_writer(const RepoId& ,
00190 const RepoId& );
00191
00192 virtual bool defer_reader(const RepoId& ,
00193 const RepoId& );
00194
00195 virtual void reader_exists(const RepoId& readerid, const RepoId& writerid);
00196 virtual void reader_does_not_exist(const RepoId& readerid, const RepoId& writerid);
00197 virtual void writer_exists(const RepoId& writerid, const RepoId& readerid);
00198 virtual void writer_does_not_exist(const RepoId& writerid, const RepoId& readerid);
00199 #ifndef DDS_HAS_MINIMUM_BIT
00200 DDS::PublicationBuiltinTopicDataDataReaderImpl* pub_bit();
00201 DDS::SubscriptionBuiltinTopicDataDataReaderImpl* sub_bit();
00202 #endif
00203
00204
00205 private:
00206 const EndpointRegistry& registry_;
00207 StaticParticipant& participant_;
00208 };
00209
00210 class StaticParticipant : public LocalParticipant<StaticEndpointManager> {
00211 public:
00212 StaticParticipant(RepoId& guid,
00213 const DDS::DomainParticipantQos& qos,
00214 const EndpointRegistry& registry)
00215 : LocalParticipant<StaticEndpointManager>(qos)
00216 , endpoint_manager_(guid, lock_, registry, *this)
00217 {}
00218
00219 void init_bit(const DDS::Subscriber_var& bit_subscriber)
00220 {
00221 bit_subscriber_ = bit_subscriber;
00222 endpoint_manager_.init_bit();
00223 }
00224
00225 void fini_bit()
00226 {
00227 bit_subscriber_ = 0;
00228 }
00229
00230 private:
00231 virtual StaticEndpointManager& endpoint_manager() { return endpoint_manager_; }
00232
00233 StaticEndpointManager endpoint_manager_;
00234 };
00235
00236 class OpenDDS_Dcps_Export StaticDiscovery
00237 : public PeerDiscovery<StaticParticipant> {
00238 public:
00239 explicit StaticDiscovery(const RepoKey& key);
00240
00241 int load_configuration(ACE_Configuration_Heap& config);
00242
00243 virtual AddDomainStatus add_domain_participant(DDS::DomainId_t domain,
00244 const DDS::DomainParticipantQos& qos);
00245
00246 EndpointRegistry registry;
00247
00248 static StaticDiscovery_rch instance() { return instance_; }
00249
00250 private:
00251 int parse_topics(ACE_Configuration_Heap& cf);
00252 int parse_datawriterqos(ACE_Configuration_Heap& cf);
00253 int parse_datareaderqos(ACE_Configuration_Heap& cf);
00254 int parse_publisherqos(ACE_Configuration_Heap& cf);
00255 int parse_subscriberqos(ACE_Configuration_Heap& cf);
00256 int parse_endpoints(ACE_Configuration_Heap& cf);
00257
00258 void pre_writer(DataWriterImpl* writer);
00259 void pre_reader(DataReaderImpl* reader);
00260
00261 static StaticDiscovery_rch instance_;
00262 };
00263
00264 }
00265 }
00266
00267 #endif