OpenDDS  Snapshot(2023/04/28-20:55)
StaticDiscovery.h
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #ifndef OPENDDS_DCPS_STATICDISCOVERY_H
7 #define OPENDDS_DCPS_STATICDISCOVERY_H
8 
9 #include "WaitSet.h"
10 #include "PoolAllocator.h"
11 #include "TopicDetails.h"
12 #include "SporadicTask.h"
13 #include "GuidUtils.h"
14 #include "Marked_Default_Qos.h"
15 #include "DCPS_Utils.h"
17 #include "dcps_export.h"
18 
19 #include <ace/Configuration.h>
20 
21 #ifndef ACE_LACKS_PRAGMA_ONCE
22 # pragma once
23 #endif
24 
26 
27 namespace OpenDDS {
28 namespace DCPS {
29 
31 
33 
35 public:
36  struct Topic {
39  };
40  typedef OPENDDS_MAP(OPENDDS_STRING, Topic) TopicMapType;
41  TopicMapType topic_map;
42 
43  typedef OPENDDS_MAP(OPENDDS_STRING, DDS::DataReaderQos) DataReaderQosMapType;
44  DataReaderQosMapType datareaderqos_map;
45 
46  typedef OPENDDS_MAP(OPENDDS_STRING, DDS::DataWriterQos) DataWriterQosMapType;
47  DataWriterQosMapType datawriterqos_map;
48 
49  typedef OPENDDS_MAP(OPENDDS_STRING, DDS::SubscriberQos) SubscriberQosMapType;
50  SubscriberQosMapType subscriberqos_map;
51 
52  typedef OPENDDS_MAP(OPENDDS_STRING, DDS::PublisherQos) PublisherQosMapType;
53  PublisherQosMapType publisherqos_map;
54 
55  typedef OPENDDS_SET_CMP(GUID_t, GUID_tKeyLessThan) RepoIdSetType;
56  struct Reader {
62  RepoIdSetType best_effort_writers;
63  RepoIdSetType reliable_writers;
65  const DDS::DataReaderQos& q,
66  const DDS::SubscriberQos& sq,
67  const OPENDDS_STRING& transport_cfg,
68  const TransportLocatorSeq& ti)
69  : topic_name(tn)
70  , qos(q)
71  , subscriber_qos(sq)
72  , trans_cfg(transport_cfg)
73  , trans_info(ti)
74  {}
75  };
76  typedef OPENDDS_MAP_CMP(GUID_t, Reader, GUID_tKeyLessThan) ReaderMapType;
77  ReaderMapType reader_map;
78 
79  struct Writer {
85  RepoIdSetType best_effort_readers;
86  RepoIdSetType reliable_readers;
88  const DDS::DataWriterQos& q,
89  const DDS::PublisherQos& pq,
90  const OPENDDS_STRING& transport_cfg,
91  const TransportLocatorSeq& ti)
92  : topic_name(tn)
93  , qos(q)
94  , publisher_qos(pq)
95  , trans_cfg(transport_cfg)
96  , trans_info(ti)
97  {}
98  };
99  typedef OPENDDS_MAP_CMP(GUID_t, Writer, GUID_tKeyLessThan) WriterMapType;
100  WriterMapType writer_map;
101 
103 
// Equality predicate over two GUID prefixes: compares sizeof(DDS::DomainId_t)
// bytes starting at index 2 of each prefix and returns true when they match.
// NOTE(review): presumably bytes [2, 2 + sizeof(DomainId_t)) carry the domain
// id written by build_id() - confirm against the implementation file.
104  bool
105  operator() (const GuidPrefix_t& lhs, const GuidPrefix_t& rhs) const
106  {
107  return std::memcmp(&lhs[2], &rhs[2], sizeof(DDS::DomainId_t)) == 0;
108  }
109  };
111 
// Equality predicate over two GUID prefixes: compares the 6 bytes starting at
// index 6 of each prefix and returns true when they match.
// NOTE(review): presumably these are the 6 participant-id bytes that
// build_id() documents as "length of 6" - confirm against the implementation.
112  bool
113  operator() (const GuidPrefix_t& lhs, const GuidPrefix_t& rhs) const
114  {
115  return std::memcmp(&lhs[6], &rhs[6], 6) == 0;
116  }
117  };
118 
119  void match();
120 
121  static EntityId_t build_id(const unsigned char* entity_key /* length of 3 */,
122  const unsigned char entity_kind);
123 
124  static GUID_t build_id(DDS::DomainId_t domain,
125  const unsigned char* participant_id /* length of 6 */,
126  const EntityId_t& entity_id);
127 };
128 
129 class StaticParticipant;
131  : public virtual RcEventHandler
132  , public DiscoveryListener
133 {
134 protected:
137  : bit_ih_(DDS::HANDLE_NIL)
138  , participant_discovered_at_(monotonic_time_zero())
139  , transport_context_(0)
140  {
141  }
142 
144  : reader_data_(r)
145  , bit_ih_(DDS::HANDLE_NIL)
146  , participant_discovered_at_(monotonic_time_zero())
147  , transport_context_(0)
148  {
149  }
150 
157 
// Returns the topic name held in reader_data_.ddsSubscriptionData.
158  const char* get_topic_name() const
159  {
160  return reader_data_.ddsSubscriptionData.topic_name;
161  }
162 
// Returns the type name held in reader_data_.ddsSubscriptionData.
163  const char* get_type_name() const
164  {
165  return reader_data_.ddsSubscriptionData.type_name;
166  }
167  };
168 
170  GUID_tKeyLessThan) DiscoveredSubscriptionMap;
171 
172  typedef DiscoveredSubscriptionMap::iterator DiscoveredSubscriptionIter;
173 
176  : bit_ih_(DDS::HANDLE_NIL)
177  , participant_discovered_at_(monotonic_time_zero())
178  , transport_context_(0)
179  {
180  }
181 
183  : writer_data_(w)
184  , bit_ih_(DDS::HANDLE_NIL)
185  , participant_discovered_at_(monotonic_time_zero())
186  , transport_context_(0)
187  {
188  }
189 
196 
// Returns the topic name held in writer_data_.ddsPublicationData.
197  const char* get_topic_name() const
198  {
199  return writer_data_.ddsPublicationData.topic_name;
200  }
201 
// Returns the type name held in writer_data_.ddsPublicationData.
202  const char* get_type_name() const
203  {
204  return writer_data_.ddsPublicationData.type_name;
205  }
206  };
207 
209  GUID_tKeyLessThan) DiscoveredPublicationMap;
210  typedef DiscoveredPublicationMap::iterator DiscoveredPublicationIter;
211 
212  struct LocalEndpoint {
214  : topic_id_(GUID_UNKNOWN)
215  , participant_discovered_at_(monotonic_time_zero())
216  , transport_context_(0)
217  , sequence_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
218  {
219  }
220 
229  };
230 
235  };
236 
242  };
243 
245  GUID_tKeyLessThan) LocalPublicationMap;
246  typedef LocalPublicationMap::iterator LocalPublicationIter;
247  typedef LocalPublicationMap::const_iterator LocalPublicationCIter;
248 
250  GUID_tKeyLessThan) LocalSubscriptionMap;
251  typedef LocalSubscriptionMap::iterator LocalSubscriptionIter;
252  typedef LocalSubscriptionMap::const_iterator LocalSubscriptionCIter;
253 
254  typedef OPENDDS_MAP_CMP(GUID_t, String, GUID_tKeyLessThan) TopicNameMap;
255 
257  {
258  return pub.writer_data_.ddsPublicationData.key;
259  }
261  {
262  return sub.reader_data_.ddsSubscriptionData.key;
263  }
264 
// Virtual hook for a discovered publication; this default implementation does nothing.
265  virtual void remove_from_bit_i(const DiscoveredPublication& /*pub*/) { }
// Virtual hook for a discovered subscription; this default implementation does nothing.
266  virtual void remove_from_bit_i(const DiscoveredSubscription& /*sub*/) { }
267 
269  LocalPublication& /*pub*/,
270  const GUID_t& reader = GUID_UNKNOWN)
271  {
272  ACE_UNUSED_ARG(reader);
273  return DDS::RETCODE_OK;
274  }
275 
277  LocalSubscription& /*pub*/,
278  const GUID_t& reader = GUID_UNKNOWN)
279  {
280  ACE_UNUSED_ARG(reader);
281  return DDS::RETCODE_OK;
282  }
283 
// Virtual hook for issuing a type lookup request. This default implementation
// ignores all four arguments and unconditionally returns true.
284  virtual bool send_type_lookup_request(const XTypes::TypeIdentifierSeq& /*type_ids*/,
285  const GUID_t& /*endpoint*/,
286  bool /*is_discovery_protected*/,
287  bool /*send_get_types*/)
288  {
289  return true;
290  }
291 
292 public:
293  StaticEndpointManager(const GUID_t& participant_id,
294  ACE_Thread_Mutex& lock,
295  const EndpointRegistry& registry,
296  StaticParticipant& participant);
297 
299 
300  void init_bit();
301 
302  virtual void assign_publication_key(GUID_t& rid,
303  const GUID_t& topicId,
304  const DDS::DataWriterQos& qos);
305  virtual void assign_subscription_key(GUID_t& rid,
306  const GUID_t& topicId,
307  const DDS::DataReaderQos& qos);
308 
309  virtual bool update_topic_qos(const GUID_t& /*topicId*/,
310  const DDS::TopicQos& /*qos*/);
311 
312  virtual bool update_publication_qos(const GUID_t& /*publicationId*/,
313  const DDS::DataWriterQos& /*qos*/,
314  const DDS::PublisherQos& /*publisherQos*/);
315 
316  virtual bool update_subscription_qos(const GUID_t& /*subscriptionId*/,
317  const DDS::DataReaderQos& /*qos*/,
318  const DDS::SubscriberQos& /*subscriberQos*/);
319 
320  virtual bool update_subscription_params(const GUID_t& /*subId*/,
321  const DDS::StringSeq& /*params*/);
322 
323  virtual bool disassociate();
324 
325  virtual DDS::ReturnCode_t add_publication_i(const GUID_t& /*rid*/,
326  LocalPublication& /*pub*/);
327 
328  virtual DDS::ReturnCode_t remove_publication_i(const GUID_t& /*publicationId*/,
329  LocalPublication& /*pub*/);
330 
// Virtual hook taking the subscription's GUID and its LocalSubscription record.
// The second parameter is a subscription, so its placeholder name is "sub"
// (matching remove_subscription_i).
331  virtual DDS::ReturnCode_t add_subscription_i(const GUID_t& /*rid*/,
332  LocalSubscription& /*sub*/);
333 
334  virtual DDS::ReturnCode_t remove_subscription_i(const GUID_t& /*subscriptionId*/,
335  LocalSubscription& /*sub*/);
336 
337  virtual bool is_expectant_opendds(const GUID_t& endpoint) const;
338 
339  virtual bool shutting_down() const;
340 
341  virtual void populate_transport_locator_sequence(TransportLocatorSeq*& /*tls*/,
342  DiscoveredSubscriptionIter& /*iter*/,
343  const GUID_t& /*reader*/);
344 
345  virtual void populate_transport_locator_sequence(TransportLocatorSeq*& /*tls*/,
346  DiscoveredPublicationIter& /*iter*/,
347  const GUID_t& /*reader*/);
348 
349  virtual void reader_exists(const GUID_t& readerid, const GUID_t& writerid);
350  virtual void reader_does_not_exist(const GUID_t& readerid, const GUID_t& writerid);
351  virtual void writer_exists(const GUID_t& writerid, const GUID_t& readerid);
352  virtual void writer_does_not_exist(const GUID_t& writerid, const GUID_t& readerid);
353  void cleanup_type_lookup_data(const GuidPrefix_t& prefix,
354  const XTypes::TypeIdentifier& ti,
355  bool secure);
356 
357 #ifndef DDS_HAS_MINIMUM_BIT
360 #endif /* DDS_HAS_MINIMUM_BIT */
361 
362  void type_lookup_init(ReactorInterceptor_rch reactor_interceptor);
363  void type_lookup_fini();
364  void type_lookup_service(const XTypes::TypeLookupService_rch type_lookup_service);
365 
366  void purge_dead_topic(const String& topic_name);
367 
368  void ignore(const GUID_t& to_ignore);
369  bool ignoring(const GUID_t& guid) const;
370  bool ignoring(const char* topic_name) const;
371 
372  TopicStatus assert_topic(GUID_t& topicId, const char* topicName,
373  const char* dataTypeName, const DDS::TopicQos& qos,
374  bool hasDcpsKey, TopicCallbacks* topic_callbacks);
375  TopicStatus find_topic(const char* topicName,
376  CORBA::String_out dataTypeName,
377  DDS::TopicQos_out qos,
378  GUID_t& topicId);
379  TopicStatus remove_topic(const GUID_t& topicId);
380 
381 
382  GUID_t add_publication(const GUID_t& topicId,
383  DataWriterCallbacks_rch publication,
384  const DDS::DataWriterQos& qos,
385  const TransportLocatorSeq& transInfo,
386  const DDS::PublisherQos& publisherQos,
387  const XTypes::TypeInformation& type_info);
388  void remove_publication(const GUID_t& publicationId);
389  void update_publication_locators(const GUID_t& publicationId,
390  const TransportLocatorSeq& transInfo);
391 
392  GUID_t add_subscription(const GUID_t& topicId,
393  DataReaderCallbacks_rch subscription,
394  const DDS::DataReaderQos& qos,
395  const TransportLocatorSeq& transInfo,
396  const DDS::SubscriberQos& subscriberQos,
397  const char* filterClassName,
398  const char* filterExpr,
399  const DDS::StringSeq& params,
400  const XTypes::TypeInformation& type_info);
401  void remove_subscription(const GUID_t& subscriptionId);
402  void update_subscription_locators(const GUID_t& subscriptionId,
403  const TransportLocatorSeq& transInfo);
404 
405  void match_endpoints(GUID_t repoId, const TopicDetails& td,
406  bool remove = false);
407 
408  void remove_assoc(const GUID_t& remove_from, const GUID_t& removing);
409 
// Virtual hook for a local publication / discovered subscription pair; the default body is empty.
410  virtual void add_assoc_i(const GUID_t& /* local_guid */, const LocalPublication& /* lpub */,
411  const GUID_t& /* remote_guid */, const DiscoveredSubscription& /* dsub */) {}
// Virtual hook for a local publication and a remote GUID; the default body is empty.
412  virtual void remove_assoc_i(const GUID_t& /* local_guid */, const LocalPublication& /* lpub */,
413  const GUID_t& /* remote_guid */) {}
// Virtual hook for a local subscription / discovered publication pair; the default body is empty.
414  virtual void add_assoc_i(const GUID_t& /* local_guid */, const LocalSubscription& /* lsub */,
415  const GUID_t& /* remote_guid */, const DiscoveredPublication& /* dpub */) {}
// Virtual hook for a local subscription and a remote GUID; the default body is empty.
416  virtual void remove_assoc_i(const GUID_t& /* local_guid */, const LocalSubscription& /* lsub */,
417  const GUID_t& /* remote_guid */) {}
418 
419 private:
420  void match(const GUID_t& writer, const GUID_t& reader);
421  void need_minimal_and_or_complete_types(const XTypes::TypeInformation* type_info,
422  bool& need_minimal,
423  bool& need_complete) const;
424  void remove_expired_endpoints(const MonotonicTimePoint& /*now*/);
425  void match_continue(const GUID_t& writer, const GUID_t& reader);
426 
428  {
429  remove_from_bit_i(pub);
430  }
431 
433  {
434  remove_from_bit_i(sub);
435  }
436 
437  GUID_t make_topic_guid();
438 
439  bool has_dcps_key(const GUID_t& topicId) const;
440 
444  unsigned int topic_counter_;
445  LocalPublicationMap local_publications_;
446  LocalSubscriptionMap local_subscriptions_;
447  DiscoveredPublicationMap discovered_publications_;
448  DiscoveredSubscriptionMap discovered_subscriptions_;
449  TopicDetailsMap topics_;
450  TopicNameMap topic_names_;
451  OPENDDS_SET(OPENDDS_STRING) ignored_topics_;
452  OPENDDS_SET_CMP(GUID_t, GUID_tKeyLessThan) relay_only_readers_;
454 #ifndef DDS_HAS_MINIMUM_BIT
456 #endif
457 
463 
465  GuidPrefix_t participant; // Prefix of remote participant
467  SequenceNumber seq_number; // Of the original request
468  bool secure; // Communicate via secure endpoints or not
470  };
471 
472  // Map from the sequence number of the most recent request for a type to its TypeIdentifier
473  // and the sequence number of the first request sent for that type. Every time a new request
474  // is sent for a type, a new entry must be stored.
475  typedef OPENDDS_MAP(SequenceNumber, TypeIdOrigSeqNumber) OrigSeqNumberMap;
476  OrigSeqNumberMap orig_seq_numbers_;
477 };
478 
479 class StaticParticipant : public virtual RcObject {
480 public:
482  const DDS::DomainParticipantQos& qos,
483  const EndpointRegistry& registry)
484  : qos_(qos)
485  , endpoint_manager_(make_rch<StaticEndpointManager>(guid, ref(lock_), ref(registry), ref(*this)))
486  {}
487 
// Stores the given built-in-topic subscriber in bit_subscriber_, then calls
// init_bit() on the endpoint manager. The assignment happens first.
488  void init_bit(const DDS::Subscriber_var& bit_subscriber)
489  {
490  bit_subscriber_ = bit_subscriber;
491  endpoint_manager_->init_bit();
492  }
493 
// Clears the stored built-in-topic subscriber by assigning 0 to bit_subscriber_.
494  void fini_bit()
495  {
496  bit_subscriber_ = 0;
497  }
498 
// Intentionally empty: nothing is done on shutdown here.
499  void shutdown() {}
500 
501  DDS::Subscriber_ptr init_bit(DomainParticipantImpl* participant);
502 
503  void fini_bit(DCPS::DomainParticipantImpl* participant);
504 
505  bool attach_participant(DDS::DomainId_t domainId, const GUID_t& participantId);
506 
507  bool remove_domain_participant(DDS::DomainId_t domain_id, const GUID_t& participantId);
508 
509  bool ignore_domain_participant(DDS::DomainId_t domain, const GUID_t& myParticipantId,
510  const GUID_t& ignoreId);
511 
512  bool update_domain_participant_qos(DDS::DomainId_t domain, const GUID_t& participant,
513  const DDS::DomainParticipantQos& qos);
514 
515  DCPS::TopicStatus assert_topic(
516  GUID_t& topicId,
517  DDS::DomainId_t domainId,
518  const GUID_t& participantId,
519  const char* topicName,
520  const char* dataTypeName,
521  const DDS::TopicQos& qos,
522  bool hasDcpsKey,
523  DCPS::TopicCallbacks* topic_callbacks);
524 
525  DCPS::TopicStatus find_topic(
526  DDS::DomainId_t domainId,
527  const GUID_t& participantId,
528  const char* topicName,
529  CORBA::String_out dataTypeName,
530  DDS::TopicQos_out qos,
531  GUID_t& topicId);
532 
533  DCPS::TopicStatus remove_topic(
534  DDS::DomainId_t domainId,
535  const GUID_t& participantId,
536  const GUID_t& topicId);
537 
538  bool ignore_topic(DDS::DomainId_t domainId,
539  const GUID_t& myParticipantId, const GUID_t& ignoreId);
540 
541  bool update_topic_qos(const GUID_t& topicId, DDS::DomainId_t domainId,
542  const GUID_t& participantId, const DDS::TopicQos& qos);
543 
544  GUID_t add_publication(
545  DDS::DomainId_t domainId,
546  const GUID_t& participantId,
547  const GUID_t& topicId,
548  DCPS::DataWriterCallbacks_rch publication,
549  const DDS::DataWriterQos& qos,
550  const DCPS::TransportLocatorSeq& transInfo,
551  const DDS::PublisherQos& publisherQos,
552  const XTypes::TypeInformation& type_info);
553 
554  bool remove_publication(DDS::DomainId_t domainId, const GUID_t& participantId,
555  const GUID_t& publicationId);
556 
557  bool ignore_publication(DDS::DomainId_t domainId, const GUID_t& participantId,
558  const GUID_t& ignoreId);
559 
560  bool update_publication_qos(
561  DDS::DomainId_t domainId,
562  const GUID_t& partId,
563  const GUID_t& dwId,
564  const DDS::DataWriterQos& qos,
565  const DDS::PublisherQos& publisherQos);
566 
567  void update_publication_locators(
568  DDS::DomainId_t domainId,
569  const GUID_t& partId,
570  const GUID_t& dwId,
571  const DCPS::TransportLocatorSeq& transInfo);
572 
573  GUID_t add_subscription(
574  DDS::DomainId_t domainId,
575  const GUID_t& participantId,
576  const GUID_t& topicId,
577  DCPS::DataReaderCallbacks_rch subscription,
578  const DDS::DataReaderQos& qos,
579  const DCPS::TransportLocatorSeq& transInfo,
580  const DDS::SubscriberQos& subscriberQos,
581  const char* filterClassName,
582  const char* filterExpr,
583  const DDS::StringSeq& params,
584  const XTypes::TypeInformation& type_info);
585 
586  bool remove_subscription(DDS::DomainId_t domainId, const GUID_t& participantId,
587  const GUID_t& subscriptionId);
588 
589  bool ignore_subscription(DDS::DomainId_t domainId, const GUID_t& participantId,
590  const GUID_t& ignoreId);
591 
592  bool update_subscription_qos(
593  DDS::DomainId_t domainId,
594  const GUID_t& partId,
595  const GUID_t& drId,
596  const DDS::DataReaderQos& qos,
597  const DDS::SubscriberQos& subQos);
598 
599  bool update_subscription_params(
600  DDS::DomainId_t domainId,
601  const GUID_t& partId,
602  const GUID_t& subId,
603  const DDS::StringSeq& params);
604 
605  void update_subscription_locators(
606  DDS::DomainId_t domainId,
607  const GUID_t& partId,
608  const GUID_t& subId,
609  const DCPS::TransportLocatorSeq& transInfo);
610 
// Marks ignoreId as ignored in the endpoint manager, then looks it up in
// participants_ and, if present, removes it via remove_discovered_participant().
// NOTE(review): listing line 613 is absent from this extract; presumably it
// acquires a lock before the calls below - confirm against the original header.
611  void ignore_domain_participant(const GUID_t& ignoreId)
612  {
614  endpoint_manager().ignore(ignoreId);
615 
616  DiscoveredParticipantIter iter = participants_.find(ignoreId);
617  if (iter != participants_.end()) {
618  remove_discovered_participant(iter);
619  }
620  }
621 
623  {
624  return true;
625  }
626 
628  {
630  qos_ = qos;
631  return announce_domain_participant_qos();
632  }
633 
635  GUID_t& topicId, const char* topicName,
636  const char* dataTypeName, const DDS::TopicQos& qos,
637  bool hasDcpsKey, TopicCallbacks* topic_callbacks)
638  {
639  if (std::strlen(topicName) > 256 || std::strlen(dataTypeName) > 256) {
640  if (DCPS_debug_level) {
641  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR LocalParticipant::assert_topic() - ")
642  ACE_TEXT("topic or type name length limit (256) exceeded\n")));
643  }
644  return PRECONDITION_NOT_MET;
645  }
646 
647  return endpoint_manager().assert_topic(topicId, topicName, dataTypeName, qos, hasDcpsKey, topic_callbacks);
648  }
649 
651  const char* topicName,
652  CORBA::String_out dataTypeName,
653  DDS::TopicQos_out qos,
654  GUID_t& topicId)
655  {
656  return endpoint_manager().find_topic(topicName, dataTypeName, qos, topicId);
657  }
658 
660  {
661  return endpoint_manager().remove_topic(topicId);
662  }
663 
// Forwards ignoreId to the endpoint manager's ignore().
// NOTE(review): listing line 666 is absent from this extract; presumably a
// lock guard - confirm against the original header.
664  void ignore_topic(const GUID_t& ignoreId)
665  {
667  endpoint_manager().ignore(ignoreId);
668  }
669 
// Forwards to the endpoint manager's update_topic_qos() and returns its result.
670  bool update_topic_qos(const GUID_t& topicId, const DDS::TopicQos& qos)
671  {
672  return endpoint_manager().update_topic_qos(topicId, qos);
673  }
674 
676  const GUID_t& topicId,
677  DataWriterCallbacks_rch publication,
678  const DDS::DataWriterQos& qos,
679  const TransportLocatorSeq& transInfo,
680  const DDS::PublisherQos& publisherQos,
681  const XTypes::TypeInformation& type_info)
682  {
683  return endpoint_manager().add_publication(topicId, publication, qos,
684  transInfo, publisherQos, type_info);
685  }
686 
// Forwards publicationId to the endpoint manager's remove_publication().
687  void remove_publication(const GUID_t& publicationId)
688  {
689  endpoint_manager().remove_publication(publicationId);
690  }
691 
// Forwards ignoreId to the endpoint manager's ignore(). The `return` of a void
// expression is legal C++ and yields no value.
// NOTE(review): listing line 694 is absent from this extract; presumably a
// lock guard - confirm against the original header.
692  void ignore_publication(const GUID_t& ignoreId)
693  {
695  return endpoint_manager().ignore(ignoreId);
696  }
697 
699  const GUID_t& publicationId,
700  const DDS::DataWriterQos& qos,
701  const DDS::PublisherQos& publisherQos)
702  {
703  return endpoint_manager().update_publication_qos(publicationId, qos, publisherQos);
704  }
705 
// Forwards the publication id and its transport locators to the endpoint manager.
706  void update_publication_locators(const GUID_t& publicationId,
707  const TransportLocatorSeq& transInfo)
708  {
709  endpoint_manager().update_publication_locators(publicationId, transInfo);
710  }
711 
713  const GUID_t& topicId,
714  DataReaderCallbacks_rch subscription,
715  const DDS::DataReaderQos& qos,
716  const TransportLocatorSeq& transInfo,
717  const DDS::SubscriberQos& subscriberQos,
718  const char* filterClassName,
719  const char* filterExpr,
720  const DDS::StringSeq& params,
721  const XTypes::TypeInformation& type_info)
722  {
723  return endpoint_manager().add_subscription(topicId, subscription, qos, transInfo,
724  subscriberQos, filterClassName, filterExpr, params, type_info);
725  }
726 
// Forwards subscriptionId to the endpoint manager's remove_subscription().
727  void remove_subscription(const GUID_t& subscriptionId)
728  {
729  endpoint_manager().remove_subscription(subscriptionId);
730  }
731 
// Forwards ignoreId to the endpoint manager's ignore(). The `return` of a void
// expression is legal C++ and yields no value.
// NOTE(review): listing line 734 is absent from this extract; presumably a
// lock guard - confirm against the original header.
732  void ignore_subscription(const GUID_t& ignoreId)
733  {
735  return endpoint_manager().ignore(ignoreId);
736  }
737 
739  const GUID_t& subscriptionId,
740  const DDS::DataReaderQos& qos,
741  const DDS::SubscriberQos& subscriberQos)
742  {
743  return endpoint_manager().update_subscription_qos(subscriptionId, qos, subscriberQos);
744  }
745 
// Forwards to the endpoint manager's update_subscription_params() and returns its result.
746  bool update_subscription_params(const GUID_t& subId, const DDS::StringSeq& params)
747  {
748  return endpoint_manager().update_subscription_params(subId, params);
749  }
750 
// Forwards the subscription id and its transport locators to the endpoint manager.
751  void update_subscription_locators(const GUID_t& subId, const TransportLocatorSeq& transInfo)
752  {
753  endpoint_manager().update_subscription_locators(subId, transInfo);
754  }
755 
// Returns the stored built-in-topic subscriber (bit_subscriber_) by value.
756  DDS::Subscriber_var bit_subscriber() const
757  {
758  return bit_subscriber_;
759  }
760 
// Passes the given type lookup service handle on to the endpoint manager.
761  void type_lookup_service(const XTypes::TypeLookupService_rch type_lookup_service)
762  {
763  endpoint_manager().type_lookup_service(type_lookup_service);
764  }
765 
766 private:
768 
770  : location_ih_(DDS::HANDLE_NIL)
771  , bit_ih_(DDS::HANDLE_NIL)
772  , seq_reset_count_(0)
773  {
774  }
775 
776  struct LocationUpdate {
782  const ACE_INET_Addr& from,
783  const SystemTimePoint& timestamp)
784  : mask_(mask), from_(from), timestamp_(timestamp) {}
785  };
786  typedef OPENDDS_VECTOR(LocationUpdate) LocationUpdateList;
787  LocationUpdateList location_updates_;
790 
796  ACE_UINT16 seq_reset_count_;
797  };
798 
800  GUID_tKeyLessThan) DiscoveredParticipantMap;
801  typedef DiscoveredParticipantMap::iterator DiscoveredParticipantIter;
802  typedef DiscoveredParticipantMap::const_iterator
804 
805  void remove_discovered_participant(DiscoveredParticipantIter& iter);
806 
807  virtual void remove_discovered_participant_i(DiscoveredParticipantIter&) {}
808 
809 #ifndef DDS_HAS_MINIMUM_BIT
811  {
812  DDS::Subscriber_var bit_sub(bit_subscriber());
813  if (!bit_sub.in())
814  return 0;
815 
816  DDS::DataReader_var d =
817  bit_sub->lookup_datareader(BUILT_IN_PARTICIPANT_TOPIC);
818  return dynamic_cast<ParticipantBuiltinTopicDataDataReaderImpl*>(d.in());
819  }
820 
822  {
823  DDS::Subscriber_var bit_sub(bit_subscriber());
824  if (!bit_sub.in())
825  return 0;
826 
827  DDS::DataReader_var d =
828  bit_sub->lookup_datareader(BUILT_IN_PARTICIPANT_LOCATION_TOPIC);
829  return dynamic_cast<ParticipantLocationBuiltinTopicDataDataReaderImpl*>(d.in());
830  }
831 
833  {
834  DDS::Subscriber_var bit_sub(bit_subscriber());
835  if (!bit_sub.in())
836  return 0;
837 
838  DDS::DataReader_var d =
839  bit_sub->lookup_datareader(BUILT_IN_CONNECTION_RECORD_TOPIC);
840  return dynamic_cast<ConnectionRecordDataReaderImpl*>(d.in());
841  }
842 
844  {
845  DDS::Subscriber_var bit_sub(bit_subscriber());
846  if (!bit_sub.in())
847  return 0;
848 
849  DDS::DataReader_var d =
850  bit_sub->lookup_datareader(BUILT_IN_INTERNAL_THREAD_TOPIC);
851  return dynamic_cast<InternalThreadBuiltinTopicDataDataReaderImpl*>(d.in());
852  }
853 #endif /* DDS_HAS_MINIMUM_BIT */
854 
// Returns a reference obtained by dereferencing endpoint_manager_ (no null check is made here).
855  StaticEndpointManager& endpoint_manager() { return *endpoint_manager_; }
856 
858  DDS::Subscriber_var bit_subscriber_;
860  DiscoveredParticipantMap participants_;
861 
863 };
864 
866 public:
867  explicit StaticDiscovery(const RepoKey& key);
868 
869  int load_configuration(ACE_Configuration_Heap& config);
870 
871  virtual GUID_t generate_participant_guid();
872 
873  virtual AddDomainStatus add_domain_participant(DDS::DomainId_t domain,
874  const DDS::DomainParticipantQos& qos,
876 
877 #if defined(OPENDDS_SECURITY)
878  virtual AddDomainStatus add_domain_participant_secure(
879  DDS::DomainId_t domain,
880  const DDS::DomainParticipantQos& qos,
882  const GUID_t& guid,
886 #endif
887 
889 
// Returns the static instance_ handle.
890  static StaticDiscovery_rch instance() { return instance_; }
891 
893 
894  void fini_bit(DCPS::DomainParticipantImpl* participant);
895 
896  bool attach_participant(DDS::DomainId_t domainId, const GUID_t& participantId);
897 
898  bool remove_domain_participant(DDS::DomainId_t domain_id, const GUID_t& participantId);
899 
900  bool ignore_domain_participant(DDS::DomainId_t domain, const GUID_t& myParticipantId,
901  const GUID_t& ignoreId);
902 
903  bool update_domain_participant_qos(DDS::DomainId_t domain, const GUID_t& participant,
904  const DDS::DomainParticipantQos& qos);
905 
906  DCPS::TopicStatus assert_topic(
907  GUID_t& topicId,
908  DDS::DomainId_t domainId,
909  const GUID_t& participantId,
910  const char* topicName,
911  const char* dataTypeName,
912  const DDS::TopicQos& qos,
913  bool hasDcpsKey,
914  DCPS::TopicCallbacks* topic_callbacks);
915 
916  DCPS::TopicStatus find_topic(
917  DDS::DomainId_t domainId,
918  const GUID_t& participantId,
919  const char* topicName,
920  CORBA::String_out dataTypeName,
921  DDS::TopicQos_out qos,
922  GUID_t& topicId);
923 
924  DCPS::TopicStatus remove_topic(
925  DDS::DomainId_t domainId,
926  const GUID_t& participantId,
927  const GUID_t& topicId);
928 
929  bool ignore_topic(DDS::DomainId_t domainId,
930  const GUID_t& myParticipantId, const GUID_t& ignoreId);
931 
932  bool update_topic_qos(const GUID_t& topicId, DDS::DomainId_t domainId,
933  const GUID_t& participantId, const DDS::TopicQos& qos);
934 
935  GUID_t add_publication(
936  DDS::DomainId_t domainId,
937  const GUID_t& participantId,
938  const GUID_t& topicId,
939  DCPS::DataWriterCallbacks_rch publication,
940  const DDS::DataWriterQos& qos,
941  const DCPS::TransportLocatorSeq& transInfo,
942  const DDS::PublisherQos& publisherQos,
943  const XTypes::TypeInformation& type_info);
944 
945  bool remove_publication(DDS::DomainId_t domainId, const GUID_t& participantId,
946  const GUID_t& publicationId);
947 
948  bool ignore_publication(DDS::DomainId_t domainId, const GUID_t& participantId,
949  const GUID_t& ignoreId);
950 
951  bool update_publication_qos(
952  DDS::DomainId_t domainId,
953  const GUID_t& partId,
954  const GUID_t& dwId,
955  const DDS::DataWriterQos& qos,
956  const DDS::PublisherQos& publisherQos);
957 
958  void update_publication_locators(
959  DDS::DomainId_t domainId,
960  const GUID_t& partId,
961  const GUID_t& dwId,
962  const DCPS::TransportLocatorSeq& transInfo);
963 
964  GUID_t add_subscription(
965  DDS::DomainId_t domainId,
966  const GUID_t& participantId,
967  const GUID_t& topicId,
968  DCPS::DataReaderCallbacks_rch subscription,
969  const DDS::DataReaderQos& qos,
970  const DCPS::TransportLocatorSeq& transInfo,
971  const DDS::SubscriberQos& subscriberQos,
972  const char* filterClassName,
973  const char* filterExpr,
974  const DDS::StringSeq& params,
975  const XTypes::TypeInformation& type_info);
976 
977  bool remove_subscription(DDS::DomainId_t domainId, const GUID_t& participantId,
978  const GUID_t& subscriptionId);
979 
980  bool ignore_subscription(DDS::DomainId_t domainId, const GUID_t& participantId,
981  const GUID_t& ignoreId);
982 
983  bool update_subscription_qos(
984  DDS::DomainId_t domainId,
985  const GUID_t& partId,
986  const GUID_t& drId,
987  const DDS::DataReaderQos& qos,
988  const DDS::SubscriberQos& subQos);
989 
990  bool update_subscription_params(
991  DDS::DomainId_t domainId,
992  const GUID_t& partId,
993  const GUID_t& subId,
994  const DDS::StringSeq& params);
995 
996  void update_subscription_locators(
997  DDS::DomainId_t domainId,
998  const GUID_t& partId,
999  const GUID_t& subId,
1000  const DCPS::TransportLocatorSeq& transInfo);
1001 
1002 private:
1004  typedef OPENDDS_MAP_CMP(GUID_t, ParticipantHandle, GUID_tKeyLessThan) ParticipantMap;
1005  typedef OPENDDS_MAP(DDS::DomainId_t, ParticipantMap) DomainParticipantMap;
1006 
1007  ParticipantHandle get_part(const DDS::DomainId_t domain_id, const GUID_t& part_id) const;
1008 
1009  void create_bit_dr(DDS::TopicDescription_ptr topic, const char* type,
1010  SubscriberImpl* sub,
1011  const DDS::DataReaderQos& qos);
1012 
1013  int parse_topics(ACE_Configuration_Heap& cf);
1014  int parse_datawriterqos(ACE_Configuration_Heap& cf);
1015  int parse_datareaderqos(ACE_Configuration_Heap& cf);
1016  int parse_publisherqos(ACE_Configuration_Heap& cf);
1017  int parse_subscriberqos(ACE_Configuration_Heap& cf);
1018  int parse_endpoints(ACE_Configuration_Heap& cf);
1019 
1020  void pre_writer(DataWriterImpl* writer);
1021  void pre_reader(DataReaderImpl* reader);
1022 
1023  static StaticDiscovery_rch instance_;
1024 
1026 
1027  DomainParticipantMap participants_;
1028 };
1029 
1030 } // namespace DCPS
1031 } // namespace OpenDDS
1032 
1034 
1035 #endif /* OPENDDS_DCPS_STATICDISCOVERY_H */
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
InternalThreadBuiltinTopicDataDataReaderImpl * internal_thread_bit()
DiscoveredPublicationMap::iterator DiscoveredPublicationIter
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const InstanceHandle_t HANDLE_NIL
std::string String
DiscoveredSubscriptionMap::iterator DiscoveredSubscriptionIter
void update_publication_locators(const GUID_t &publicationId, const TransportLocatorSeq &transInfo)
static DDS::BuiltinTopicKey_t get_key(const DiscoveredPublication &pub)
bool update_domain_participant_qos(const DDS::DomainParticipantQos &qos)
LocalSubscriptionMap::iterator LocalSubscriptionIter
const char *const BUILT_IN_CONNECTION_RECORD_TOPIC
LocalSubscriptionMap local_subscriptions_
void ignore_publication(const GUID_t &ignoreId)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
TopicStatus assert_topic(GUID_t &topicId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey, TopicCallbacks *topic_callbacks)
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
sequence< octet > key
static StaticDiscovery_rch instance_
GuidSet RepoIdSet
Definition: GuidUtils.h:113
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
ParticipantLocationBuiltinTopicData location_data_
ParticipantBuiltinTopicDataDataReaderImpl * part_bit()
ConnectionRecordDataReaderImpl * connection_record_bit()
virtual void remove_assoc_i(const GUID_t &, const LocalPublication &, const GUID_t &)
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
sequence< TransportLocator > TransportLocatorSeq
Reader(const OPENDDS_STRING &tn, const DDS::DataReaderQos &q, const DDS::SubscriberQos &sq, const OPENDDS_STRING &transport_cfg, const TransportLocatorSeq &ti)
RcHandle< StaticParticipant > ParticipantHandle
ACE_Guard< ACE_Thread_Mutex > lock_
DDS::PublicationBuiltinTopicData ddsPublicationData
LocalPublicationMap::iterator LocalPublicationIter
LocalPublicationMap::const_iterator LocalPublicationCIter
virtual void remove_assoc_i(const GUID_t &, const LocalSubscription &, const GUID_t &)
GUID_t add_publication(const GUID_t &topicId, DataWriterCallbacks_rch publication, const DDS::DataWriterQos &qos, const TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const XTypes::TypeInformation &type_info)
DDS::SubscriptionBuiltinTopicData ddsSubscriptionData
void remove_publication(const GUID_t &publicationId)
DiscoveredSubscriptionMap discovered_subscriptions_
void remove_subscription(const GUID_t &subscriptionId)
Writer(const OPENDDS_STRING &tn, const DDS::DataWriterQos &q, const DDS::PublisherQos &pq, const OPENDDS_STRING &transport_cfg, const TransportLocatorSeq &ti)
#define OPENDDS_STRING
DDS::Subscriber_var bit_subscriber_
virtual bool send_type_lookup_request(const XTypes::TypeIdentifierSeq &, const GUID_t &, bool, bool)
virtual void remove_discovered_participant_i(DiscoveredParticipantIter &)
virtual void remove_from_bit_i(const DiscoveredPublication &)
DOMAINID_TYPE_NATIVE DomainId_t
DataWriterQosMapType datawriterqos_map
TopicStatus find_topic(const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, GUID_t &topicId)
void type_lookup_service(const XTypes::TypeLookupService_rch type_lookup_service)
DiscoveredParticipantMap::iterator DiscoveredParticipantIter
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
virtual void add_assoc_i(const GUID_t &, const LocalSubscription &, const GUID_t &, const DiscoveredPublication &)
TopicStatus remove_topic(const GUID_t &topicId)
unsigned long ParticipantLocation
DomainParticipantMap participants_
RcHandle< StaticEndpointManagerSporadic > type_lookup_reply_deadline_processor_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void update_subscription_locators(const GUID_t &subId, const TransportLocatorSeq &transInfo)
DataReaderQosMapType datareaderqos_map
Implements the DDS::DataReader interface.
bool update_subscription_params(const GUID_t &subId, const DDS::StringSeq &params)
typedef OPENDDS_MAP_CMP(GUID_t, WriterCoherentSample, GUID_tKeyLessThan) GroupCoherentSamples
virtual void remove_from_bit_i(const DiscoveredSubscription &)
SubscriberQosMapType subscriberqos_map
ACE_INLINE OpenDDS_Dcps_Export const MonotonicTime_t & monotonic_time_zero()
typedef OPENDDS_SET_CMP(GUID_t, GUID_tKeyLessThan) GuidSet
ParticipantLocationBuiltinTopicDataDataReaderImpl * part_loc_bit()
StaticEndpointManager & endpoint_manager()
XTypes::TypeLookupService_rch type_lookup_service_
void ignore_domain_participant(const GUID_t &ignoreId)
const char *const BUILT_IN_PARTICIPANT_TOPIC
void init_bit(const DDS::Subscriber_var &bit_subscriber)
void ignore_subscription(const GUID_t &ignoreId)
const char *const BUILT_IN_PARTICIPANT_LOCATION_TOPIC
ACE_UINT32 ULong
bool update_topic_qos(const GUID_t &topicId, const DDS::TopicQos &qos)
The End User API.
RcHandle< StaticEndpointManager > endpoint_manager_
static DDS::BuiltinTopicKey_t get_key(const DiscoveredSubscription &sub)
GUID_t add_subscription(const GUID_t &topicId, DataReaderCallbacks_rch subscription, const DDS::DataReaderQos &qos, const TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpr, const DDS::StringSeq &params, const XTypes::TypeInformation &type_info)
virtual void add_assoc_i(const GUID_t &, const LocalPublication &, const GUID_t &, const DiscoveredSubscription &)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
void ignore_topic(const GUID_t &ignoreId)
ACE_TEXT("TCP_Factory")
virtual DDS::ReturnCode_t write_publication_data(const GUID_t &, LocalPublication &, const GUID_t &reader=GUID_UNKNOWN)
void remove_from_bit(const DiscoveredPublication &pub)
bool update_publication_qos(const GUID_t &publicationId, const DDS::DataWriterQos &qos, const DDS::PublisherQos &publisherQos)
const char *const BUILT_IN_INTERNAL_THREAD_TOPIC
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DDS::Subscriber_var bit_subscriber() const
const SequenceNumber_t SEQUENCENUMBER_UNKNOWN
Definition: MessageTypes.h:50
Sequence number abstraction. Only allows positive 64 bit values.
octet GuidPrefix_t[12]
Definition: DdsDcpsGuid.idl:19
StaticParticipant(GUID_t &guid, const DDS::DomainParticipantQos &qos, const EndpointRegistry &registry)
PmfSporadicTask< StaticEndpointManager > StaticEndpointManagerSporadic
DiscoveredParticipantMap participants_
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
Discovery Strategy interface class.
Definition: Discovery.h:76
RcHandle< StaticDiscovery > StaticDiscovery_rch
const ReturnCode_t RETCODE_OK
virtual bool announce_domain_participant_qos()
PublisherQosMapType publisherqos_map
virtual DDS::ReturnCode_t write_subscription_data(const GUID_t &, LocalSubscription &, const GUID_t &reader=GUID_UNKNOWN)
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
const EndpointRegistry & registry_
bool update_subscription_qos(const GUID_t &subscriptionId, const DDS::DataReaderQos &qos, const DDS::SubscriberQos &subscriberQos)
Defines the interface for Discovery callbacks into the Topic.
OPENDDS_STRING RepoKey
Definition: Discovery.h:80
LocalSubscriptionMap::const_iterator LocalSubscriptionCIter
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Defines the interface that allows DataWriters (and lower levels) to inform discovery.
void remove_from_bit(const DiscoveredSubscription &sub)
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
DiscoveredPublicationMap discovered_publications_
DDS::DomainParticipantQos qos_
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
DiscoveredParticipantMap::const_iterator DiscoveredParticipantConstIter
typedef OPENDDS_SET(NetworkAddress) AddrSet
static StaticDiscovery_rch instance()
LocationUpdate(ParticipantLocation mask, const ACE_INET_Addr &from, const SystemTimePoint &timestamp)