Line data Source code
1 : /*
2 : * Distributed under the OpenDDS License.
3 : * See: http://www.opendds.org/license.html
4 : */
5 :
6 : #ifndef OPENDDS_DCPS_STATICDISCOVERY_H
7 : #define OPENDDS_DCPS_STATICDISCOVERY_H
8 :
9 : #include "WaitSet.h"
10 : #include "PoolAllocator.h"
11 : #include "TopicDetails.h"
12 : #include "SporadicTask.h"
13 : #include "GuidUtils.h"
14 : #include "Marked_Default_Qos.h"
15 : #include "DCPS_Utils.h"
16 : #include "BuiltInTopicDataReaderImpls.h"
17 : #include "dcps_export.h"
18 :
19 : #include <ace/Configuration.h>
20 :
21 : #ifndef ACE_LACKS_PRAGMA_ONCE
22 : # pragma once
23 : #endif
24 :
25 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
26 :
27 : namespace OpenDDS {
28 : namespace DCPS {
29 :
30 : class StaticDiscovery;
31 :
32 : typedef RcHandle<StaticDiscovery> StaticDiscovery_rch;
33 :
34 : class OpenDDS_Dcps_Export EndpointRegistry {
35 : public:
36 : struct Topic {
37 : OPENDDS_STRING name;
38 : OPENDDS_STRING type_name;
39 : };
40 : typedef OPENDDS_MAP(OPENDDS_STRING, Topic) TopicMapType;
41 : TopicMapType topic_map;
42 :
43 : typedef OPENDDS_MAP(OPENDDS_STRING, DDS::DataReaderQos) DataReaderQosMapType;
44 : DataReaderQosMapType datareaderqos_map;
45 :
46 : typedef OPENDDS_MAP(OPENDDS_STRING, DDS::DataWriterQos) DataWriterQosMapType;
47 : DataWriterQosMapType datawriterqos_map;
48 :
49 : typedef OPENDDS_MAP(OPENDDS_STRING, DDS::SubscriberQos) SubscriberQosMapType;
50 : SubscriberQosMapType subscriberqos_map;
51 :
52 : typedef OPENDDS_MAP(OPENDDS_STRING, DDS::PublisherQos) PublisherQosMapType;
53 : PublisherQosMapType publisherqos_map;
54 :
55 : typedef OPENDDS_SET_CMP(GUID_t, GUID_tKeyLessThan) RepoIdSetType;
56 : struct Reader {
57 : OPENDDS_STRING topic_name;
58 : DDS::DataReaderQos qos;
59 : DDS::SubscriberQos subscriber_qos;
60 : OPENDDS_STRING trans_cfg;
61 : TransportLocatorSeq trans_info;
62 : RepoIdSetType best_effort_writers;
63 : RepoIdSetType reliable_writers;
64 0 : Reader(const OPENDDS_STRING& tn,
65 : const DDS::DataReaderQos& q,
66 : const DDS::SubscriberQos& sq,
67 : const OPENDDS_STRING& transport_cfg,
68 : const TransportLocatorSeq& ti)
69 0 : : topic_name(tn)
70 0 : , qos(q)
71 0 : , subscriber_qos(sq)
72 0 : , trans_cfg(transport_cfg)
73 0 : , trans_info(ti)
74 0 : {}
75 : };
76 : typedef OPENDDS_MAP_CMP(GUID_t, Reader, GUID_tKeyLessThan) ReaderMapType;
77 : ReaderMapType reader_map;
78 :
79 : struct Writer {
80 : OPENDDS_STRING topic_name;
81 : DDS::DataWriterQos qos;
82 : DDS::PublisherQos publisher_qos;
83 : OPENDDS_STRING trans_cfg;
84 : TransportLocatorSeq trans_info;
85 : RepoIdSetType best_effort_readers;
86 : RepoIdSetType reliable_readers;
87 0 : Writer(const OPENDDS_STRING& tn,
88 : const DDS::DataWriterQos& q,
89 : const DDS::PublisherQos& pq,
90 : const OPENDDS_STRING& transport_cfg,
91 : const TransportLocatorSeq& ti)
92 0 : : topic_name(tn)
93 0 : , qos(q)
94 0 : , publisher_qos(pq)
95 0 : , trans_cfg(transport_cfg)
96 0 : , trans_info(ti)
97 0 : {}
98 : };
99 : typedef OPENDDS_MAP_CMP(GUID_t, Writer, GUID_tKeyLessThan) WriterMapType;
100 : WriterMapType writer_map;
101 :
102 : struct StaticDiscGuidDomainEqual {
103 :
104 : bool
105 0 : operator() (const GuidPrefix_t& lhs, const GuidPrefix_t& rhs) const
106 : {
107 0 : return std::memcmp(&lhs[2], &rhs[2], sizeof(DDS::DomainId_t)) == 0;
108 : }
109 : };
110 : struct StaticDiscGuidPartEqual {
111 :
112 : bool
113 0 : operator() (const GuidPrefix_t& lhs, const GuidPrefix_t& rhs) const
114 : {
115 0 : return std::memcmp(&lhs[6], &rhs[6], 6) == 0;
116 : }
117 : };
118 :
119 : void match();
120 :
121 : static EntityId_t build_id(const unsigned char* entity_key /* length of 3 */,
122 : const unsigned char entity_kind);
123 :
124 : static GUID_t build_id(DDS::DomainId_t domain,
125 : const unsigned char* participant_id /* length of 6 */,
126 : const EntityId_t& entity_id);
127 : };
128 :
129 : class StaticParticipant;
130 : class StaticEndpointManager
131 : : public virtual RcEventHandler
132 : , public DiscoveryListener
133 : {
134 : protected:
135 : struct DiscoveredSubscription : PoolAllocationBase {
136 : DiscoveredSubscription()
137 : : bit_ih_(DDS::HANDLE_NIL)
138 : , participant_discovered_at_(monotonic_time_zero())
139 : , transport_context_(0)
140 : {
141 : }
142 :
143 : explicit DiscoveredSubscription(const DiscoveredReaderData& r)
144 : : reader_data_(r)
145 : , bit_ih_(DDS::HANDLE_NIL)
146 : , participant_discovered_at_(monotonic_time_zero())
147 : , transport_context_(0)
148 : {
149 : }
150 :
151 : RepoIdSet matched_endpoints_;
152 : DiscoveredReaderData reader_data_;
153 : DDS::InstanceHandle_t bit_ih_;
154 : MonotonicTime_t participant_discovered_at_;
155 : ACE_CDR::ULong transport_context_;
156 : XTypes::TypeInformation type_info_;
157 :
158 0 : const char* get_topic_name() const
159 : {
160 0 : return reader_data_.ddsSubscriptionData.topic_name;
161 : }
162 :
163 0 : const char* get_type_name() const
164 : {
165 0 : return reader_data_.ddsSubscriptionData.type_name;
166 : }
167 : };
168 :
169 : typedef OPENDDS_MAP_CMP(GUID_t, DiscoveredSubscription,
170 : GUID_tKeyLessThan) DiscoveredSubscriptionMap;
171 :
172 : typedef DiscoveredSubscriptionMap::iterator DiscoveredSubscriptionIter;
173 :
174 : struct DiscoveredPublication : PoolAllocationBase {
175 : DiscoveredPublication()
176 : : bit_ih_(DDS::HANDLE_NIL)
177 : , participant_discovered_at_(monotonic_time_zero())
178 : , transport_context_(0)
179 : {
180 : }
181 :
182 : explicit DiscoveredPublication(const DiscoveredWriterData& w)
183 : : writer_data_(w)
184 : , bit_ih_(DDS::HANDLE_NIL)
185 : , participant_discovered_at_(monotonic_time_zero())
186 : , transport_context_(0)
187 : {
188 : }
189 :
190 : RepoIdSet matched_endpoints_;
191 : DiscoveredWriterData writer_data_;
192 : DDS::InstanceHandle_t bit_ih_;
193 : MonotonicTime_t participant_discovered_at_;
194 : ACE_CDR::ULong transport_context_;
195 : XTypes::TypeInformation type_info_;
196 :
197 0 : const char* get_topic_name() const
198 : {
199 0 : return writer_data_.ddsPublicationData.topic_name;
200 : }
201 :
202 0 : const char* get_type_name() const
203 : {
204 0 : return writer_data_.ddsPublicationData.type_name;
205 : }
206 : };
207 :
208 : typedef OPENDDS_MAP_CMP(GUID_t, DiscoveredPublication,
209 : GUID_tKeyLessThan) DiscoveredPublicationMap;
210 : typedef DiscoveredPublicationMap::iterator DiscoveredPublicationIter;
211 :
212 : struct LocalEndpoint {
213 0 : LocalEndpoint()
214 0 : : topic_id_(GUID_UNKNOWN)
215 0 : , participant_discovered_at_(monotonic_time_zero())
216 0 : , transport_context_(0)
217 0 : , sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
218 : {
219 0 : }
220 :
221 : GUID_t topic_id_;
222 : TransportLocatorSeq trans_info_;
223 : MonotonicTime_t participant_discovered_at_;
224 : ACE_CDR::ULong transport_context_;
225 : RepoIdSet matched_endpoints_;
226 : SequenceNumber sequence_;
227 : RepoIdSet remote_expectant_opendds_associations_;
228 : XTypes::TypeInformation type_info_;
229 : };
230 :
231 : struct LocalPublication : LocalEndpoint {
232 : DataWriterCallbacks_wrch publication_;
233 : DDS::DataWriterQos qos_;
234 : DDS::PublisherQos publisher_qos_;
235 : };
236 :
237 : struct LocalSubscription : LocalEndpoint {
238 : DataReaderCallbacks_wrch subscription_;
239 : DDS::DataReaderQos qos_;
240 : DDS::SubscriberQos subscriber_qos_;
241 : ContentFilterProperty_t filterProperties;
242 : };
243 :
244 : typedef OPENDDS_MAP_CMP(GUID_t, LocalPublication,
245 : GUID_tKeyLessThan) LocalPublicationMap;
246 : typedef LocalPublicationMap::iterator LocalPublicationIter;
247 : typedef LocalPublicationMap::const_iterator LocalPublicationCIter;
248 :
249 : typedef OPENDDS_MAP_CMP(GUID_t, LocalSubscription,
250 : GUID_tKeyLessThan) LocalSubscriptionMap;
251 : typedef LocalSubscriptionMap::iterator LocalSubscriptionIter;
252 : typedef LocalSubscriptionMap::const_iterator LocalSubscriptionCIter;
253 :
254 : typedef OPENDDS_MAP_CMP(GUID_t, String, GUID_tKeyLessThan) TopicNameMap;
255 :
256 : static DDS::BuiltinTopicKey_t get_key(const DiscoveredPublication& pub)
257 : {
258 : return pub.writer_data_.ddsPublicationData.key;
259 : }
260 : static DDS::BuiltinTopicKey_t get_key(const DiscoveredSubscription& sub)
261 : {
262 : return sub.reader_data_.ddsSubscriptionData.key;
263 : }
264 :
265 0 : virtual void remove_from_bit_i(const DiscoveredPublication& /*pub*/) { }
266 0 : virtual void remove_from_bit_i(const DiscoveredSubscription& /*sub*/) { }
267 :
268 0 : virtual DDS::ReturnCode_t write_publication_data(const GUID_t& /*rid*/,
269 : LocalPublication& /*pub*/,
270 : const GUID_t& reader = GUID_UNKNOWN)
271 : {
272 : ACE_UNUSED_ARG(reader);
273 0 : return DDS::RETCODE_OK;
274 : }
275 :
276 0 : virtual DDS::ReturnCode_t write_subscription_data(const GUID_t& /*rid*/,
277 : LocalSubscription& /*pub*/,
278 : const GUID_t& reader = GUID_UNKNOWN)
279 : {
280 : ACE_UNUSED_ARG(reader);
281 0 : return DDS::RETCODE_OK;
282 : }
283 :
284 0 : virtual bool send_type_lookup_request(const XTypes::TypeIdentifierSeq& /*type_ids*/,
285 : const GUID_t& /*endpoint*/,
286 : bool /*is_discovery_protected*/,
287 : bool /*send_get_types*/)
288 : {
289 0 : return true;
290 : }
291 :
292 : public:
293 : StaticEndpointManager(const GUID_t& participant_id,
294 : ACE_Thread_Mutex& lock,
295 : const EndpointRegistry& registry,
296 : StaticParticipant& participant);
297 :
298 : ~StaticEndpointManager();
299 :
300 : void init_bit();
301 :
302 : virtual void assign_publication_key(GUID_t& rid,
303 : const GUID_t& topicId,
304 : const DDS::DataWriterQos& qos);
305 : virtual void assign_subscription_key(GUID_t& rid,
306 : const GUID_t& topicId,
307 : const DDS::DataReaderQos& qos);
308 :
309 : virtual bool update_topic_qos(const GUID_t& /*topicId*/,
310 : const DDS::TopicQos& /*qos*/);
311 :
312 : virtual bool update_publication_qos(const GUID_t& /*publicationId*/,
313 : const DDS::DataWriterQos& /*qos*/,
314 : const DDS::PublisherQos& /*publisherQos*/);
315 :
316 : virtual bool update_subscription_qos(const GUID_t& /*subscriptionId*/,
317 : const DDS::DataReaderQos& /*qos*/,
318 : const DDS::SubscriberQos& /*subscriberQos*/);
319 :
320 : virtual bool update_subscription_params(const GUID_t& /*subId*/,
321 : const DDS::StringSeq& /*params*/);
322 :
323 : virtual bool disassociate();
324 :
325 : virtual DDS::ReturnCode_t add_publication_i(const GUID_t& /*rid*/,
326 : LocalPublication& /*pub*/);
327 :
328 : virtual DDS::ReturnCode_t remove_publication_i(const GUID_t& /*publicationId*/,
329 : LocalPublication& /*pub*/);
330 :
331 : virtual DDS::ReturnCode_t add_subscription_i(const GUID_t& /*rid*/,
332 : LocalSubscription& /*pub*/);
333 :
334 : virtual DDS::ReturnCode_t remove_subscription_i(const GUID_t& /*subscriptionId*/,
335 : LocalSubscription& /*sub*/);
336 :
337 : virtual bool is_expectant_opendds(const GUID_t& endpoint) const;
338 :
339 : virtual bool shutting_down() const;
340 :
341 : virtual void populate_transport_locator_sequence(TransportLocatorSeq*& /*tls*/,
342 : DiscoveredSubscriptionIter& /*iter*/,
343 : const GUID_t& /*reader*/);
344 :
345 : virtual void populate_transport_locator_sequence(TransportLocatorSeq*& /*tls*/,
346 : DiscoveredPublicationIter& /*iter*/,
347 : const GUID_t& /*reader*/);
348 :
349 : virtual void reader_exists(const GUID_t& readerid, const GUID_t& writerid);
350 : virtual void reader_does_not_exist(const GUID_t& readerid, const GUID_t& writerid);
351 : virtual void writer_exists(const GUID_t& writerid, const GUID_t& readerid);
352 : virtual void writer_does_not_exist(const GUID_t& writerid, const GUID_t& readerid);
353 : void cleanup_type_lookup_data(const GuidPrefix_t& prefix,
354 : const XTypes::TypeIdentifier& ti,
355 : bool secure);
356 :
357 : #ifndef DDS_HAS_MINIMUM_BIT
358 : PublicationBuiltinTopicDataDataReaderImpl* pub_bit();
359 : SubscriptionBuiltinTopicDataDataReaderImpl* sub_bit();
360 : #endif /* DDS_HAS_MINIMUM_BIT */
361 :
362 : void type_lookup_init(ReactorInterceptor_rch reactor_interceptor);
363 : void type_lookup_fini();
364 : void type_lookup_service(const XTypes::TypeLookupService_rch type_lookup_service);
365 :
366 : void purge_dead_topic(const String& topic_name);
367 :
368 : void ignore(const GUID_t& to_ignore);
369 : bool ignoring(const GUID_t& guid) const;
370 : bool ignoring(const char* topic_name) const;
371 :
372 : TopicStatus assert_topic(GUID_t& topicId, const char* topicName,
373 : const char* dataTypeName, const DDS::TopicQos& qos,
374 : bool hasDcpsKey, TopicCallbacks* topic_callbacks);
375 : TopicStatus find_topic(const char* topicName,
376 : CORBA::String_out dataTypeName,
377 : DDS::TopicQos_out qos,
378 : GUID_t& topicId);
379 : TopicStatus remove_topic(const GUID_t& topicId);
380 :
381 :
382 : GUID_t add_publication(const GUID_t& topicId,
383 : DataWriterCallbacks_rch publication,
384 : const DDS::DataWriterQos& qos,
385 : const TransportLocatorSeq& transInfo,
386 : const DDS::PublisherQos& publisherQos,
387 : const XTypes::TypeInformation& type_info);
388 : void remove_publication(const GUID_t& publicationId);
389 : void update_publication_locators(const GUID_t& publicationId,
390 : const TransportLocatorSeq& transInfo);
391 :
392 : GUID_t add_subscription(const GUID_t& topicId,
393 : DataReaderCallbacks_rch subscription,
394 : const DDS::DataReaderQos& qos,
395 : const TransportLocatorSeq& transInfo,
396 : const DDS::SubscriberQos& subscriberQos,
397 : const char* filterClassName,
398 : const char* filterExpr,
399 : const DDS::StringSeq& params,
400 : const XTypes::TypeInformation& type_info);
401 : void remove_subscription(const GUID_t& subscriptionId);
402 : void update_subscription_locators(const GUID_t& subscriptionId,
403 : const TransportLocatorSeq& transInfo);
404 :
405 : void match_endpoints(GUID_t repoId, const TopicDetails& td,
406 : bool remove = false);
407 :
408 : void remove_assoc(const GUID_t& remove_from, const GUID_t& removing);
409 :
410 0 : virtual void add_assoc_i(const GUID_t& /* local_guid */, const LocalPublication& /* lpub */,
411 0 : const GUID_t& /* remote_guid */, const DiscoveredSubscription& /* dsub */) {}
412 0 : virtual void remove_assoc_i(const GUID_t& /* local_guid */, const LocalPublication& /* lpub */,
413 0 : const GUID_t& /* remote_guid */) {}
414 0 : virtual void add_assoc_i(const GUID_t& /* local_guid */, const LocalSubscription& /* lsub */,
415 0 : const GUID_t& /* remote_guid */, const DiscoveredPublication& /* dpub */) {}
416 0 : virtual void remove_assoc_i(const GUID_t& /* local_guid */, const LocalSubscription& /* lsub */,
417 0 : const GUID_t& /* remote_guid */) {}
418 :
419 : private:
420 : void match(const GUID_t& writer, const GUID_t& reader);
421 : void need_minimal_and_or_complete_types(const XTypes::TypeInformation* type_info,
422 : bool& need_minimal,
423 : bool& need_complete) const;
424 : void remove_expired_endpoints(const MonotonicTimePoint& /*now*/);
425 : void match_continue(const GUID_t& writer, const GUID_t& reader);
426 :
427 0 : void remove_from_bit(const DiscoveredPublication& pub)
428 : {
429 0 : remove_from_bit_i(pub);
430 0 : }
431 :
432 0 : void remove_from_bit(const DiscoveredSubscription& sub)
433 : {
434 0 : remove_from_bit_i(sub);
435 0 : }
436 :
437 : GUID_t make_topic_guid();
438 :
439 : bool has_dcps_key(const GUID_t& topicId) const;
440 :
441 : ACE_Thread_Mutex& lock_;
442 : GUID_t participant_id_;
443 : RepoIdSet ignored_guids_;
444 : unsigned int topic_counter_;
445 : LocalPublicationMap local_publications_;
446 : LocalSubscriptionMap local_subscriptions_;
447 : DiscoveredPublicationMap discovered_publications_;
448 : DiscoveredSubscriptionMap discovered_subscriptions_;
449 : TopicDetailsMap topics_;
450 : TopicNameMap topic_names_;
451 : OPENDDS_SET(OPENDDS_STRING) ignored_topics_;
452 : OPENDDS_SET_CMP(GUID_t, GUID_tKeyLessThan) relay_only_readers_;
453 : const EndpointRegistry& registry_;
454 : #ifndef DDS_HAS_MINIMUM_BIT
455 : StaticParticipant& participant_;
456 : #endif
457 :
458 : XTypes::TypeLookupService_rch type_lookup_service_;
459 : typedef PmfSporadicTask<StaticEndpointManager> StaticEndpointManagerSporadic;
460 : RcHandle<StaticEndpointManagerSporadic> type_lookup_reply_deadline_processor_;
461 : TimeDuration max_type_lookup_service_reply_period_;
462 : SequenceNumber type_lookup_service_sequence_number_;
463 :
464 : struct TypeIdOrigSeqNumber {
465 : GuidPrefix_t participant; // Prefix of remote participant
466 : XTypes::TypeIdentifier type_id; // Remote type
467 : SequenceNumber seq_number; // Of the original request
468 : bool secure; // Communicate via secure endpoints or not
469 : MonotonicTimePoint time_started;
470 : };
471 :
472 : // Map from the sequence number of the most recent request for a type to its TypeIdentifier
473 : // and the sequence number of the first request sent for that type. Every time a new request
474 : // is sent for a type, a new entry must be stored.
475 : typedef OPENDDS_MAP(SequenceNumber, TypeIdOrigSeqNumber) OrigSeqNumberMap;
476 : OrigSeqNumberMap orig_seq_numbers_;
477 : };
478 :
479 : class StaticParticipant : public virtual RcObject {
480 : public:
481 0 : StaticParticipant(GUID_t& guid,
482 : const DDS::DomainParticipantQos& qos,
483 : const EndpointRegistry& registry)
484 0 : : qos_(qos)
485 0 : , endpoint_manager_(make_rch<StaticEndpointManager>(guid, ref(lock_), ref(registry), ref(*this)))
486 0 : {}
487 :
488 0 : void init_bit(const DDS::Subscriber_var& bit_subscriber)
489 : {
490 0 : bit_subscriber_ = bit_subscriber;
491 0 : endpoint_manager_->init_bit();
492 0 : }
493 :
494 0 : void fini_bit()
495 : {
496 0 : bit_subscriber_ = 0;
497 0 : }
498 :
/// Intentionally empty: this participant performs no work on shutdown.
void shutdown()
{
}
500 :
501 : DDS::Subscriber_ptr init_bit(DomainParticipantImpl* participant);
502 :
503 : void fini_bit(DCPS::DomainParticipantImpl* participant);
504 :
505 : bool attach_participant(DDS::DomainId_t domainId, const GUID_t& participantId);
506 :
507 : bool remove_domain_participant(DDS::DomainId_t domain_id, const GUID_t& participantId);
508 :
509 : bool ignore_domain_participant(DDS::DomainId_t domain, const GUID_t& myParticipantId,
510 : const GUID_t& ignoreId);
511 :
512 : bool update_domain_participant_qos(DDS::DomainId_t domain, const GUID_t& participant,
513 : const DDS::DomainParticipantQos& qos);
514 :
515 : DCPS::TopicStatus assert_topic(
516 : GUID_t& topicId,
517 : DDS::DomainId_t domainId,
518 : const GUID_t& participantId,
519 : const char* topicName,
520 : const char* dataTypeName,
521 : const DDS::TopicQos& qos,
522 : bool hasDcpsKey,
523 : DCPS::TopicCallbacks* topic_callbacks);
524 :
525 : DCPS::TopicStatus find_topic(
526 : DDS::DomainId_t domainId,
527 : const GUID_t& participantId,
528 : const char* topicName,
529 : CORBA::String_out dataTypeName,
530 : DDS::TopicQos_out qos,
531 : GUID_t& topicId);
532 :
533 : DCPS::TopicStatus remove_topic(
534 : DDS::DomainId_t domainId,
535 : const GUID_t& participantId,
536 : const GUID_t& topicId);
537 :
538 : bool ignore_topic(DDS::DomainId_t domainId,
539 : const GUID_t& myParticipantId, const GUID_t& ignoreId);
540 :
541 : bool update_topic_qos(const GUID_t& topicId, DDS::DomainId_t domainId,
542 : const GUID_t& participantId, const DDS::TopicQos& qos);
543 :
544 : GUID_t add_publication(
545 : DDS::DomainId_t domainId,
546 : const GUID_t& participantId,
547 : const GUID_t& topicId,
548 : DCPS::DataWriterCallbacks_rch publication,
549 : const DDS::DataWriterQos& qos,
550 : const DCPS::TransportLocatorSeq& transInfo,
551 : const DDS::PublisherQos& publisherQos,
552 : const XTypes::TypeInformation& type_info);
553 :
554 : bool remove_publication(DDS::DomainId_t domainId, const GUID_t& participantId,
555 : const GUID_t& publicationId);
556 :
557 : bool ignore_publication(DDS::DomainId_t domainId, const GUID_t& participantId,
558 : const GUID_t& ignoreId);
559 :
560 : bool update_publication_qos(
561 : DDS::DomainId_t domainId,
562 : const GUID_t& partId,
563 : const GUID_t& dwId,
564 : const DDS::DataWriterQos& qos,
565 : const DDS::PublisherQos& publisherQos);
566 :
567 : void update_publication_locators(
568 : DDS::DomainId_t domainId,
569 : const GUID_t& partId,
570 : const GUID_t& dwId,
571 : const DCPS::TransportLocatorSeq& transInfo);
572 :
573 : GUID_t add_subscription(
574 : DDS::DomainId_t domainId,
575 : const GUID_t& participantId,
576 : const GUID_t& topicId,
577 : DCPS::DataReaderCallbacks_rch subscription,
578 : const DDS::DataReaderQos& qos,
579 : const DCPS::TransportLocatorSeq& transInfo,
580 : const DDS::SubscriberQos& subscriberQos,
581 : const char* filterClassName,
582 : const char* filterExpr,
583 : const DDS::StringSeq& params,
584 : const XTypes::TypeInformation& type_info);
585 :
586 : bool remove_subscription(DDS::DomainId_t domainId, const GUID_t& participantId,
587 : const GUID_t& subscriptionId);
588 :
589 : bool ignore_subscription(DDS::DomainId_t domainId, const GUID_t& participantId,
590 : const GUID_t& ignoreId);
591 :
592 : bool update_subscription_qos(
593 : DDS::DomainId_t domainId,
594 : const GUID_t& partId,
595 : const GUID_t& drId,
596 : const DDS::DataReaderQos& qos,
597 : const DDS::SubscriberQos& subQos);
598 :
599 : bool update_subscription_params(
600 : DDS::DomainId_t domainId,
601 : const GUID_t& partId,
602 : const GUID_t& subId,
603 : const DDS::StringSeq& params);
604 :
605 : void update_subscription_locators(
606 : DDS::DomainId_t domainId,
607 : const GUID_t& partId,
608 : const GUID_t& subId,
609 : const DCPS::TransportLocatorSeq& transInfo);
610 :
611 0 : void ignore_domain_participant(const GUID_t& ignoreId)
612 : {
613 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
614 0 : endpoint_manager().ignore(ignoreId);
615 :
616 0 : DiscoveredParticipantIter iter = participants_.find(ignoreId);
617 0 : if (iter != participants_.end()) {
618 0 : remove_discovered_participant(iter);
619 : }
620 0 : }
621 :
622 0 : virtual bool announce_domain_participant_qos()
623 : {
624 0 : return true;
625 : }
626 :
627 0 : bool update_domain_participant_qos(const DDS::DomainParticipantQos& qos)
628 : {
629 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, false);
630 0 : qos_ = qos;
631 0 : return announce_domain_participant_qos();
632 0 : }
633 :
634 0 : TopicStatus assert_topic(
635 : GUID_t& topicId, const char* topicName,
636 : const char* dataTypeName, const DDS::TopicQos& qos,
637 : bool hasDcpsKey, TopicCallbacks* topic_callbacks)
638 : {
639 0 : if (std::strlen(topicName) > 256 || std::strlen(dataTypeName) > 256) {
640 0 : if (DCPS_debug_level) {
641 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR LocalParticipant::assert_topic() - ")
642 : ACE_TEXT("topic or type name length limit (256) exceeded\n")));
643 : }
644 0 : return PRECONDITION_NOT_MET;
645 : }
646 :
647 0 : return endpoint_manager().assert_topic(topicId, topicName, dataTypeName, qos, hasDcpsKey, topic_callbacks);
648 : }
649 :
650 0 : TopicStatus find_topic(
651 : const char* topicName,
652 : CORBA::String_out dataTypeName,
653 : DDS::TopicQos_out qos,
654 : GUID_t& topicId)
655 : {
656 0 : return endpoint_manager().find_topic(topicName, dataTypeName, qos, topicId);
657 : }
658 :
659 0 : TopicStatus remove_topic(const GUID_t& topicId)
660 : {
661 0 : return endpoint_manager().remove_topic(topicId);
662 : }
663 :
664 0 : void ignore_topic(const GUID_t& ignoreId)
665 : {
666 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
667 0 : endpoint_manager().ignore(ignoreId);
668 0 : }
669 :
670 0 : bool update_topic_qos(const GUID_t& topicId, const DDS::TopicQos& qos)
671 : {
672 0 : return endpoint_manager().update_topic_qos(topicId, qos);
673 : }
674 :
675 0 : GUID_t add_publication(
676 : const GUID_t& topicId,
677 : DataWriterCallbacks_rch publication,
678 : const DDS::DataWriterQos& qos,
679 : const TransportLocatorSeq& transInfo,
680 : const DDS::PublisherQos& publisherQos,
681 : const XTypes::TypeInformation& type_info)
682 : {
683 0 : return endpoint_manager().add_publication(topicId, publication, qos,
684 0 : transInfo, publisherQos, type_info);
685 : }
686 :
687 0 : void remove_publication(const GUID_t& publicationId)
688 : {
689 0 : endpoint_manager().remove_publication(publicationId);
690 0 : }
691 :
692 0 : void ignore_publication(const GUID_t& ignoreId)
693 : {
694 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
695 0 : return endpoint_manager().ignore(ignoreId);
696 0 : }
697 :
698 0 : bool update_publication_qos(
699 : const GUID_t& publicationId,
700 : const DDS::DataWriterQos& qos,
701 : const DDS::PublisherQos& publisherQos)
702 : {
703 0 : return endpoint_manager().update_publication_qos(publicationId, qos, publisherQos);
704 : }
705 :
706 0 : void update_publication_locators(const GUID_t& publicationId,
707 : const TransportLocatorSeq& transInfo)
708 : {
709 0 : endpoint_manager().update_publication_locators(publicationId, transInfo);
710 0 : }
711 :
712 0 : GUID_t add_subscription(
713 : const GUID_t& topicId,
714 : DataReaderCallbacks_rch subscription,
715 : const DDS::DataReaderQos& qos,
716 : const TransportLocatorSeq& transInfo,
717 : const DDS::SubscriberQos& subscriberQos,
718 : const char* filterClassName,
719 : const char* filterExpr,
720 : const DDS::StringSeq& params,
721 : const XTypes::TypeInformation& type_info)
722 : {
723 0 : return endpoint_manager().add_subscription(topicId, subscription, qos, transInfo,
724 0 : subscriberQos, filterClassName, filterExpr, params, type_info);
725 : }
726 :
727 0 : void remove_subscription(const GUID_t& subscriptionId)
728 : {
729 0 : endpoint_manager().remove_subscription(subscriptionId);
730 0 : }
731 :
732 0 : void ignore_subscription(const GUID_t& ignoreId)
733 : {
734 0 : ACE_GUARD(ACE_Thread_Mutex, g, lock_);
735 0 : return endpoint_manager().ignore(ignoreId);
736 0 : }
737 :
738 0 : bool update_subscription_qos(
739 : const GUID_t& subscriptionId,
740 : const DDS::DataReaderQos& qos,
741 : const DDS::SubscriberQos& subscriberQos)
742 : {
743 0 : return endpoint_manager().update_subscription_qos(subscriptionId, qos, subscriberQos);
744 : }
745 :
746 0 : bool update_subscription_params(const GUID_t& subId, const DDS::StringSeq& params)
747 : {
748 0 : return endpoint_manager().update_subscription_params(subId, params);
749 : }
750 :
751 0 : void update_subscription_locators(const GUID_t& subId, const TransportLocatorSeq& transInfo)
752 : {
753 0 : endpoint_manager().update_subscription_locators(subId, transInfo);
754 0 : }
755 :
756 0 : DDS::Subscriber_var bit_subscriber() const
757 : {
758 0 : return bit_subscriber_;
759 : }
760 :
761 0 : void type_lookup_service(const XTypes::TypeLookupService_rch type_lookup_service)
762 : {
763 0 : endpoint_manager().type_lookup_service(type_lookup_service);
764 0 : }
765 :
766 : private:
767 : struct DiscoveredParticipant {
768 :
769 : DiscoveredParticipant()
770 : : location_ih_(DDS::HANDLE_NIL)
771 : , bit_ih_(DDS::HANDLE_NIL)
772 : , seq_reset_count_(0)
773 : {
774 : }
775 :
776 : struct LocationUpdate {
777 : ParticipantLocation mask_;
778 : ACE_INET_Addr from_;
779 : SystemTimePoint timestamp_;
780 : LocationUpdate() {}
781 : LocationUpdate(ParticipantLocation mask,
782 : const ACE_INET_Addr& from,
783 : const SystemTimePoint& timestamp)
784 : : mask_(mask), from_(from), timestamp_(timestamp) {}
785 : };
786 : typedef OPENDDS_VECTOR(LocationUpdate) LocationUpdateList;
787 : LocationUpdateList location_updates_;
788 : ParticipantLocationBuiltinTopicData location_data_;
789 : DDS::InstanceHandle_t location_ih_;
790 :
791 : ACE_INET_Addr local_address_;
792 : MonotonicTimePoint discovered_at_;
793 : MonotonicTimePoint lease_expiration_;
794 : DDS::InstanceHandle_t bit_ih_;
795 : SequenceNumber max_seq_;
796 : ACE_UINT16 seq_reset_count_;
797 : };
798 :
799 : typedef OPENDDS_MAP_CMP(GUID_t, DiscoveredParticipant,
800 : GUID_tKeyLessThan) DiscoveredParticipantMap;
801 : typedef DiscoveredParticipantMap::iterator DiscoveredParticipantIter;
802 : typedef DiscoveredParticipantMap::const_iterator
803 : DiscoveredParticipantConstIter;
804 :
805 : void remove_discovered_participant(DiscoveredParticipantIter& iter);
806 :
807 0 : virtual void remove_discovered_participant_i(DiscoveredParticipantIter&) {}
808 :
809 : #ifndef DDS_HAS_MINIMUM_BIT
810 0 : ParticipantBuiltinTopicDataDataReaderImpl* part_bit()
811 : {
812 0 : DDS::Subscriber_var bit_sub(bit_subscriber());
813 0 : if (!bit_sub.in())
814 0 : return 0;
815 :
816 : DDS::DataReader_var d =
817 0 : bit_sub->lookup_datareader(BUILT_IN_PARTICIPANT_TOPIC);
818 0 : return dynamic_cast<ParticipantBuiltinTopicDataDataReaderImpl*>(d.in());
819 0 : }
820 :
821 0 : ParticipantLocationBuiltinTopicDataDataReaderImpl* part_loc_bit()
822 : {
823 0 : DDS::Subscriber_var bit_sub(bit_subscriber());
824 0 : if (!bit_sub.in())
825 0 : return 0;
826 :
827 : DDS::DataReader_var d =
828 0 : bit_sub->lookup_datareader(BUILT_IN_PARTICIPANT_LOCATION_TOPIC);
829 0 : return dynamic_cast<ParticipantLocationBuiltinTopicDataDataReaderImpl*>(d.in());
830 0 : }
831 :
832 : ConnectionRecordDataReaderImpl* connection_record_bit()
833 : {
834 : DDS::Subscriber_var bit_sub(bit_subscriber());
835 : if (!bit_sub.in())
836 : return 0;
837 :
838 : DDS::DataReader_var d =
839 : bit_sub->lookup_datareader(BUILT_IN_CONNECTION_RECORD_TOPIC);
840 : return dynamic_cast<ConnectionRecordDataReaderImpl*>(d.in());
841 : }
842 :
843 : InternalThreadBuiltinTopicDataDataReaderImpl* internal_thread_bit()
844 : {
845 : DDS::Subscriber_var bit_sub(bit_subscriber());
846 : if (!bit_sub.in())
847 : return 0;
848 :
849 : DDS::DataReader_var d =
850 : bit_sub->lookup_datareader(BUILT_IN_INTERNAL_THREAD_TOPIC);
851 : return dynamic_cast<InternalThreadBuiltinTopicDataDataReaderImpl*>(d.in());
852 : }
853 : #endif /* DDS_HAS_MINIMUM_BIT */
854 :
855 0 : StaticEndpointManager& endpoint_manager() { return *endpoint_manager_; }
856 :
857 : mutable ACE_Thread_Mutex lock_;
858 : DDS::Subscriber_var bit_subscriber_;
859 : DDS::DomainParticipantQos qos_;
860 : DiscoveredParticipantMap participants_;
861 :
862 : RcHandle<StaticEndpointManager> endpoint_manager_;
863 : };
864 :
// StaticDiscovery is a Discovery implementation (public base: Discovery)
// declared with the OpenDDS_Dcps_Export export macro. Only declarations are
// visible in this header; notes about behavior below are read off the
// signatures and are hedged where the signature alone does not establish them.
// NOTE(review): the name suggests endpoints come from static configuration
// rather than from a wire protocol -- confirm against the implementation.
865 : class OpenDDS_Dcps_Export StaticDiscovery : public Discovery {
866 : public:
// explicit: a RepoKey is never implicitly converted to a StaticDiscovery.
867 : explicit StaticDiscovery(const RepoKey& key);
868 :
// Takes an ACE configuration heap and returns an int status.
// NOTE(review): presumably drives the private parse_*() helpers declared
// below (they take the same config type); the meaning of the return value is
// not visible here -- confirm.
869 : int load_configuration(ACE_Configuration_Heap& config);
870 :
// Declared virtual; returns a GUID_t by value.
871 : virtual GUID_t generate_participant_guid();
872 :
// Declared virtual; takes the domain id, the participant QoS and a type
// lookup service handle, and returns an AddDomainStatus.
873 : virtual AddDomainStatus add_domain_participant(DDS::DomainId_t domain,
874 : const DDS::DomainParticipantQos& qos,
875 : XTypes::TypeLookupService_rch tls);
876 :
// Only declared when OPENDDS_SECURITY is defined. Same leading parameters as
// add_domain_participant(), plus a caller-supplied GUID and the identity,
// permissions and participant-crypto handles.
877 : #if defined(OPENDDS_SECURITY)
878 : virtual AddDomainStatus add_domain_participant_secure(
879 : DDS::DomainId_t domain,
880 : const DDS::DomainParticipantQos& qos,
881 : XTypes::TypeLookupService_rch tls,
882 : const GUID_t& guid,
883 : DDS::Security::IdentityHandle id,
884 : DDS::Security::PermissionsHandle perm,
885 : DDS::Security::ParticipantCryptoHandle part_crypto);
886 : #endif
887 :
// Public data member of type EndpointRegistry (the class declared earlier in
// this header that holds the topic map, the QoS maps and the endpoint
// entries).
888 : EndpointRegistry registry;
889 :
// Returns a copy of the static instance_ handle.
// NOTE(review): where instance_ is assigned is not visible in this header --
// confirm whether callers can observe an empty handle.
890 0 : static StaticDiscovery_rch instance() { return instance_; }
891 :
// init_bit() returns a BitSubscriber handle for the given participant;
// fini_bit() takes the same participant pointer and returns nothing.
// NOTE(review): "bit" presumably stands for built-in topics -- confirm.
892 : RcHandle<BitSubscriber> init_bit(DCPS::DomainParticipantImpl* participant);
893 :
894 : void fini_bit(DCPS::DomainParticipantImpl* participant);
895 :
// Participant-level operations. Each is addressed by a domain id plus a
// participant GUID and returns bool.
896 : bool attach_participant(DDS::DomainId_t domainId, const GUID_t& participantId);
897 :
898 : bool remove_domain_participant(DDS::DomainId_t domain_id, const GUID_t& participantId);
899 :
900 : bool ignore_domain_participant(DDS::DomainId_t domain, const GUID_t& myParticipantId,
901 : const GUID_t& ignoreId);
902 :
903 : bool update_domain_participant_qos(DDS::DomainId_t domain, const GUID_t& participant,
904 : const DDS::DomainParticipantQos& qos);
905 :
// Topic operations. assert_topic() and find_topic() take topicId by
// non-const reference (presumably an output -- confirm); find_topic() also
// reports the data type name and QoS through CORBA _out parameters.
906 : DCPS::TopicStatus assert_topic(
907 : GUID_t& topicId,
908 : DDS::DomainId_t domainId,
909 : const GUID_t& participantId,
910 : const char* topicName,
911 : const char* dataTypeName,
912 : const DDS::TopicQos& qos,
913 : bool hasDcpsKey,
914 : DCPS::TopicCallbacks* topic_callbacks);
915 :
916 : DCPS::TopicStatus find_topic(
917 : DDS::DomainId_t domainId,
918 : const GUID_t& participantId,
919 : const char* topicName,
920 : CORBA::String_out dataTypeName,
921 : DDS::TopicQos_out qos,
922 : GUID_t& topicId);
923 :
924 : DCPS::TopicStatus remove_topic(
925 : DDS::DomainId_t domainId,
926 : const GUID_t& participantId,
927 : const GUID_t& topicId);
928 :
929 : bool ignore_topic(DDS::DomainId_t domainId,
930 : const GUID_t& myParticipantId, const GUID_t& ignoreId);
931 :
932 : bool update_topic_qos(const GUID_t& topicId, DDS::DomainId_t domainId,
933 : const GUID_t& participantId, const DDS::TopicQos& qos);
934 :
// Publication (data writer) operations.
// NOTE(review): add_publication() returns a GUID_t, presumably the id of the
// newly added publication -- confirm.
935 : GUID_t add_publication(
936 : DDS::DomainId_t domainId,
937 : const GUID_t& participantId,
938 : const GUID_t& topicId,
939 : DCPS::DataWriterCallbacks_rch publication,
940 : const DDS::DataWriterQos& qos,
941 : const DCPS::TransportLocatorSeq& transInfo,
942 : const DDS::PublisherQos& publisherQos,
943 : const XTypes::TypeInformation& type_info);
944 :
945 : bool remove_publication(DDS::DomainId_t domainId, const GUID_t& participantId,
946 : const GUID_t& publicationId);
947 :
948 : bool ignore_publication(DDS::DomainId_t domainId, const GUID_t& participantId,
949 : const GUID_t& ignoreId);
950 :
951 : bool update_publication_qos(
952 : DDS::DomainId_t domainId,
953 : const GUID_t& partId,
954 : const GUID_t& dwId,
955 : const DDS::DataWriterQos& qos,
956 : const DDS::PublisherQos& publisherQos);
957 :
958 : void update_publication_locators(
959 : DDS::DomainId_t domainId,
960 : const GUID_t& partId,
961 : const GUID_t& dwId,
962 : const DCPS::TransportLocatorSeq& transInfo);
963 :
// Subscription (data reader) operations, mirroring the publication set above.
// NOTE(review): add_subscription() returns a GUID_t, presumably the id of the
// newly added subscription; filterClassName/filterExpr/params presumably
// describe a content filter -- confirm both.
964 : GUID_t add_subscription(
965 : DDS::DomainId_t domainId,
966 : const GUID_t& participantId,
967 : const GUID_t& topicId,
968 : DCPS::DataReaderCallbacks_rch subscription,
969 : const DDS::DataReaderQos& qos,
970 : const DCPS::TransportLocatorSeq& transInfo,
971 : const DDS::SubscriberQos& subscriberQos,
972 : const char* filterClassName,
973 : const char* filterExpr,
974 : const DDS::StringSeq& params,
975 : const XTypes::TypeInformation& type_info);
976 :
977 : bool remove_subscription(DDS::DomainId_t domainId, const GUID_t& participantId,
978 : const GUID_t& subscriptionId);
979 :
980 : bool ignore_subscription(DDS::DomainId_t domainId, const GUID_t& participantId,
981 : const GUID_t& ignoreId);
982 :
983 : bool update_subscription_qos(
984 : DDS::DomainId_t domainId,
985 : const GUID_t& partId,
986 : const GUID_t& drId,
987 : const DDS::DataReaderQos& qos,
988 : const DDS::SubscriberQos& subQos);
989 :
990 : bool update_subscription_params(
991 : DDS::DomainId_t domainId,
992 : const GUID_t& partId,
993 : const GUID_t& subId,
994 : const DDS::StringSeq& params);
995 :
996 : void update_subscription_locators(
997 : DDS::DomainId_t domainId,
998 : const GUID_t& partId,
999 : const GUID_t& subId,
1000 : const DCPS::TransportLocatorSeq& transInfo);
1001 :
1002 : private:
// Participants are held through reference-counted handles, keyed by GUID
// (ordered with GUID_tKeyLessThan) and grouped per domain id.
1003 : typedef RcHandle<StaticParticipant> ParticipantHandle;
1004 : typedef OPENDDS_MAP_CMP(GUID_t, ParticipantHandle, GUID_tKeyLessThan) ParticipantMap;
1005 : typedef OPENDDS_MAP(DDS::DomainId_t, ParticipantMap) DomainParticipantMap;
1006 :
// Const lookup returning the ParticipantHandle for (domain_id, part_id).
// NOTE(review): the result when nothing matches is not visible here -- confirm.
1007 : ParticipantHandle get_part(const DDS::DomainId_t domain_id, const GUID_t& part_id) const;
1008 :
// NOTE(review): judging by the name, creates a built-in-topic data reader on
// the given subscriber -- confirm against the implementation.
1009 : void create_bit_dr(DDS::TopicDescription_ptr topic, const char* type,
1010 : SubscriberImpl* sub,
1011 : const DDS::DataReaderQos& qos);
1012 :
// Parsing helpers: each takes the configuration heap and returns an int
// status. NOTE(review): presumably one per configuration section kind
// (topics, the four QoS kinds, endpoints) -- confirm.
1013 : int parse_topics(ACE_Configuration_Heap& cf);
1014 : int parse_datawriterqos(ACE_Configuration_Heap& cf);
1015 : int parse_datareaderqos(ACE_Configuration_Heap& cf);
1016 : int parse_publisherqos(ACE_Configuration_Heap& cf);
1017 : int parse_subscriberqos(ACE_Configuration_Heap& cf);
1018 : int parse_endpoints(ACE_Configuration_Heap& cf);
1019 :
// NOTE(review): the purpose of these hooks is not evident from the
// declarations -- see the implementation.
1020 : void pre_writer(DataWriterImpl* writer);
1021 : void pre_reader(DataReaderImpl* reader);
1022 :
// Static handle returned by instance().
1023 : static StaticDiscovery_rch instance_;
1024 :
// mutable so it can be acquired inside const member functions (get_part() is
// the only const member declared in this class).
// NOTE(review): which state it guards is not visible in this header.
1025 : mutable ACE_Thread_Mutex lock_;
1026 :
// Per-domain map of participant GUID to participant handle.
1027 : DomainParticipantMap participants_;
1028 : };
1029 :
1030 : } // namespace DCPS
1031 : } // namespace OpenDDS
1032 :
1033 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
1034 :
1035 : #endif /* OPENDDS_DCPS_STATICDISCOVERY_H */
|