00001
00002
00003
00004
00005
00006 #ifndef OPENDDS_STATICDISCOVERY_STATICDISCOVERY_H
00007 #define OPENDDS_STATICDISCOVERY_STATICDISCOVERY_H
00008
00009 #include "dcps_export.h"
00010
00011 #include "dds/DCPS/WaitSet.h"
00012 #include "dds/DCPS/DiscoveryBase.h"
00013
00014 #ifdef DDS_HAS_MINIMUM_BIT
00015 #include "dds/DCPS/DataReaderImpl_T.h"
00016 #include "dds/DCPS/DataWriterImpl_T.h"
00017 #endif
00018
00019 #include "ace/Configuration.h"
00020
00021 #include "dds/DCPS/PoolAllocator.h"
00022
00023 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00024 #pragma once
00025 #endif
00026
00027 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00028
00029 namespace OpenDDS {
00030 namespace DCPS {
00031
00032 class StaticDiscovery;
00033
00034 typedef RcHandle<StaticDiscovery> StaticDiscovery_rch;
00035
00036 class OpenDDS_Dcps_Export EndpointRegistry {
00037 public:
00038 struct Topic {
00039 OPENDDS_STRING name;
00040 OPENDDS_STRING type_name;
00041 };
00042 typedef OPENDDS_MAP(OPENDDS_STRING, Topic) TopicMapType;
00043 TopicMapType topic_map;
00044
00045 typedef OPENDDS_MAP(OPENDDS_STRING, DDS::DataReaderQos) DataReaderQosMapType;
00046 DataReaderQosMapType datareaderqos_map;
00047
00048 typedef OPENDDS_MAP(OPENDDS_STRING, DDS::DataWriterQos) DataWriterQosMapType;
00049 DataWriterQosMapType datawriterqos_map;
00050
00051 typedef OPENDDS_MAP(OPENDDS_STRING, DDS::SubscriberQos) SubscriberQosMapType;
00052 SubscriberQosMapType subscriberqos_map;
00053
00054 typedef OPENDDS_MAP(OPENDDS_STRING, DDS::PublisherQos) PublisherQosMapType;
00055 PublisherQosMapType publisherqos_map;
00056
00057 typedef OPENDDS_SET_CMP(RepoId, GUID_tKeyLessThan) RepoIdSetType;
00058 struct Reader {
00059 OPENDDS_STRING topic_name;
00060 DDS::DataReaderQos qos;
00061 DDS::SubscriberQos subscriber_qos;
00062 OPENDDS_STRING trans_cfg;
00063 TransportLocatorSeq trans_info;
00064 RepoIdSetType best_effort_writers;
00065 RepoIdSetType reliable_writers;
00066 Reader(const OPENDDS_STRING& tn,
00067 const DDS::DataReaderQos& q,
00068 const DDS::SubscriberQos& sq,
00069 const OPENDDS_STRING& transport_cfg,
00070 const TransportLocatorSeq& ti)
00071 : topic_name(tn)
00072 , qos(q)
00073 , subscriber_qos(sq)
00074 , trans_cfg(transport_cfg)
00075 , trans_info(ti)
00076 {}
00077 };
00078 typedef OPENDDS_MAP_CMP(RepoId, Reader, GUID_tKeyLessThan) ReaderMapType;
00079 ReaderMapType reader_map;
00080
00081 struct Writer {
00082 OPENDDS_STRING topic_name;
00083 DDS::DataWriterQos qos;
00084 DDS::PublisherQos publisher_qos;
00085 OPENDDS_STRING trans_cfg;
00086 TransportLocatorSeq trans_info;
00087 RepoIdSetType best_effort_readers;
00088 RepoIdSetType reliable_readers;
00089 Writer(const OPENDDS_STRING& tn,
00090 const DDS::DataWriterQos& q,
00091 const DDS::PublisherQos& pq,
00092 const OPENDDS_STRING& transport_cfg,
00093 const TransportLocatorSeq& ti)
00094 : topic_name(tn)
00095 , qos(q)
00096 , publisher_qos(pq)
00097 , trans_cfg(transport_cfg)
00098 , trans_info(ti)
00099 {}
00100 };
00101 typedef OPENDDS_MAP_CMP(RepoId, Writer, GUID_tKeyLessThan) WriterMapType;
00102 WriterMapType writer_map;
00103
00104 struct StaticDiscGuidDomainEqual {
00105
00106 bool
00107 operator() (const GuidPrefix_t& lhs, const GuidPrefix_t& rhs) const
00108 {
00109 return std::memcmp(&lhs[2], &rhs[2], sizeof(DDS::DomainId_t)) == 0;
00110 }
00111 };
00112 struct StaticDiscGuidPartEqual {
00113
00114 bool
00115 operator() (const GuidPrefix_t& lhs, const GuidPrefix_t& rhs) const
00116 {
00117 return std::memcmp(&lhs[6], &rhs[6], 6) == 0;
00118 }
00119 };
00120
00121 void match();
00122
00123 static EntityId_t build_id(const unsigned char* entity_key ,
00124 const unsigned char entity_kind);
00125
00126 static RepoId build_id(DDS::DomainId_t domain,
00127 const unsigned char* participant_id ,
00128 const EntityId_t& entity_id);
00129 };
00130
/// Empty type used as the template argument of EndpointManager for static
/// discovery; it carries no data.
struct StaticDiscoveredParticipantData {};

/// Forward declaration: StaticEndpointManager keeps a reference to its
/// owning StaticParticipant, which is defined after it.
class StaticParticipant;
00133
00134 class StaticEndpointManager
00135 : public EndpointManager<StaticDiscoveredParticipantData>
00136 , public DiscoveryListener {
00137 public:
00138 StaticEndpointManager(const RepoId& participant_id,
00139 ACE_Thread_Mutex& lock,
00140 const EndpointRegistry& registry,
00141 StaticParticipant& participant);
00142
00143 void init_bit();
00144
00145 virtual void assign_publication_key(RepoId& rid,
00146 const RepoId& topicId,
00147 const DDS::DataWriterQos& qos);
00148 virtual void assign_subscription_key(RepoId& rid,
00149 const RepoId& topicId,
00150 const DDS::DataReaderQos& qos);
00151
00152 virtual bool update_topic_qos(const RepoId& ,
00153 const DDS::TopicQos& ,
00154 OPENDDS_STRING& );
00155
00156 virtual bool update_publication_qos(const RepoId& ,
00157 const DDS::DataWriterQos& ,
00158 const DDS::PublisherQos& );
00159
00160 virtual bool update_subscription_qos(const RepoId& ,
00161 const DDS::DataReaderQos& ,
00162 const DDS::SubscriberQos& );
00163
00164 virtual bool update_subscription_params(const RepoId& ,
00165 const DDS::StringSeq& );
00166
00167 virtual void association_complete(const RepoId& ,
00168 const RepoId& );
00169
00170 virtual bool disassociate(const StaticDiscoveredParticipantData& );
00171
00172 virtual DDS::ReturnCode_t add_publication_i(const RepoId& ,
00173 LocalPublication& );
00174
00175 virtual DDS::ReturnCode_t remove_publication_i(const RepoId& );
00176
00177 virtual DDS::ReturnCode_t add_subscription_i(const RepoId& ,
00178 LocalSubscription& );
00179
00180 virtual DDS::ReturnCode_t remove_subscription_i(const RepoId& );
00181
00182 virtual bool shutting_down() const;
00183
00184 virtual void populate_transport_locator_sequence(TransportLocatorSeq*& ,
00185 DiscoveredSubscriptionIter& ,
00186 const RepoId& );
00187
00188 virtual void populate_transport_locator_sequence(TransportLocatorSeq*& ,
00189 DiscoveredPublicationIter& ,
00190 const RepoId& );
00191
00192 virtual bool defer_writer(const RepoId& ,
00193 const RepoId& );
00194
00195 virtual bool defer_reader(const RepoId& ,
00196 const RepoId& );
00197
00198 virtual void reader_exists(const RepoId& readerid, const RepoId& writerid);
00199 virtual void reader_does_not_exist(const RepoId& readerid, const RepoId& writerid);
00200 virtual void writer_exists(const RepoId& writerid, const RepoId& readerid);
00201 virtual void writer_does_not_exist(const RepoId& writerid, const RepoId& readerid);
00202 #ifndef DDS_HAS_MINIMUM_BIT
00203 OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl* pub_bit();
00204 OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl* sub_bit();
00205 #endif
00206
00207
00208 private:
00209 const EndpointRegistry& registry_;
00210 StaticParticipant& participant_;
00211 };
00212
00213 class StaticParticipant : public LocalParticipant<StaticEndpointManager> {
00214 public:
00215 StaticParticipant(RepoId& guid,
00216 const DDS::DomainParticipantQos& qos,
00217 const EndpointRegistry& registry)
00218 : LocalParticipant<StaticEndpointManager>(qos)
00219 , endpoint_manager_(guid, lock_, registry, *this)
00220 {}
00221
00222 void init_bit(const DDS::Subscriber_var& bit_subscriber)
00223 {
00224 bit_subscriber_ = bit_subscriber;
00225 endpoint_manager_.init_bit();
00226 }
00227
00228 void fini_bit()
00229 {
00230 bit_subscriber_ = 0;
00231 }
00232
00233 private:
00234 virtual StaticEndpointManager& endpoint_manager() { return endpoint_manager_; }
00235
00236 StaticEndpointManager endpoint_manager_;
00237 };
00238
00239 class OpenDDS_Dcps_Export StaticDiscovery
00240 : public PeerDiscovery<StaticParticipant> {
00241 public:
00242 explicit StaticDiscovery(const RepoKey& key);
00243
00244 int load_configuration(ACE_Configuration_Heap& config);
00245
00246 virtual OpenDDS::DCPS::RepoId generate_participant_guid();
00247
00248 virtual AddDomainStatus add_domain_participant(DDS::DomainId_t domain,
00249 const DDS::DomainParticipantQos& qos);
00250
00251 #if defined(OPENDDS_SECURITY)
00252 virtual OpenDDS::DCPS::AddDomainStatus add_domain_participant_secure(
00253 DDS::DomainId_t domain,
00254 const DDS::DomainParticipantQos& qos,
00255 const OpenDDS::DCPS::RepoId& guid,
00256 DDS::Security::IdentityHandle id,
00257 DDS::Security::PermissionsHandle perm,
00258 DDS::Security::ParticipantCryptoHandle part_crypto);
00259 #endif
00260
00261 EndpointRegistry registry;
00262
00263 static StaticDiscovery_rch instance() { return instance_; }
00264
00265 private:
00266 int parse_topics(ACE_Configuration_Heap& cf);
00267 int parse_datawriterqos(ACE_Configuration_Heap& cf);
00268 int parse_datareaderqos(ACE_Configuration_Heap& cf);
00269 int parse_publisherqos(ACE_Configuration_Heap& cf);
00270 int parse_subscriberqos(ACE_Configuration_Heap& cf);
00271 int parse_endpoints(ACE_Configuration_Heap& cf);
00272
00273 void pre_writer(DataWriterImpl* writer);
00274 void pre_reader(DataReaderImpl* reader);
00275
00276 static StaticDiscovery_rch instance_;
00277 };
00278
00279 }
00280 }
00281
00282 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00283
00284 #endif