OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Protected Types | Protected Member Functions | Static Protected Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::StaticEndpointManager Class Reference

#include <StaticDiscovery.h>

Inheritance diagram for OpenDDS::DCPS::StaticEndpointManager:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::StaticEndpointManager:
Collaboration graph
[legend]

Classes

struct  DiscoveredPublication
 
struct  DiscoveredSubscription
 
struct  LocalEndpoint
 
struct  LocalPublication
 
struct  LocalSubscription
 
struct  TypeIdOrigSeqNumber
 

Public Member Functions

 StaticEndpointManager (const GUID_t &participant_id, ACE_Thread_Mutex &lock, const EndpointRegistry &registry, StaticParticipant &participant)
 
 ~StaticEndpointManager ()
 
void init_bit ()
 
virtual void assign_publication_key (GUID_t &rid, const GUID_t &topicId, const DDS::DataWriterQos &qos)
 
virtual void assign_subscription_key (GUID_t &rid, const GUID_t &topicId, const DDS::DataReaderQos &qos)
 
virtual bool update_topic_qos (const GUID_t &, const DDS::TopicQos &)
 
virtual bool update_publication_qos (const GUID_t &, const DDS::DataWriterQos &, const DDS::PublisherQos &)
 
virtual bool update_subscription_qos (const GUID_t &, const DDS::DataReaderQos &, const DDS::SubscriberQos &)
 
virtual bool update_subscription_params (const GUID_t &, const DDS::StringSeq &)
 
virtual bool disassociate ()
 
virtual DDS::ReturnCode_t add_publication_i (const GUID_t &, LocalPublication &)
 
virtual DDS::ReturnCode_t remove_publication_i (const GUID_t &, LocalPublication &)
 
virtual DDS::ReturnCode_t add_subscription_i (const GUID_t &, LocalSubscription &)
 
virtual DDS::ReturnCode_t remove_subscription_i (const GUID_t &, LocalSubscription &)
 
virtual bool is_expectant_opendds (const GUID_t &endpoint) const
 
virtual bool shutting_down () const
 
virtual void populate_transport_locator_sequence (TransportLocatorSeq *&, DiscoveredSubscriptionIter &, const GUID_t &)
 
virtual void populate_transport_locator_sequence (TransportLocatorSeq *&, DiscoveredPublicationIter &, const GUID_t &)
 
virtual void reader_exists (const GUID_t &readerid, const GUID_t &writerid)
 
virtual void reader_does_not_exist (const GUID_t &readerid, const GUID_t &writerid)
 
virtual void writer_exists (const GUID_t &writerid, const GUID_t &readerid)
 
virtual void writer_does_not_exist (const GUID_t &writerid, const GUID_t &readerid)
 
void cleanup_type_lookup_data (const GuidPrefix_t &prefix, const XTypes::TypeIdentifier &ti, bool secure)
 
PublicationBuiltinTopicDataDataReaderImplpub_bit ()
 
SubscriptionBuiltinTopicDataDataReaderImplsub_bit ()
 
void type_lookup_init (ReactorInterceptor_rch reactor_interceptor)
 
void type_lookup_fini ()
 
void type_lookup_service (const XTypes::TypeLookupService_rch type_lookup_service)
 
void purge_dead_topic (const String &topic_name)
 
void ignore (const GUID_t &to_ignore)
 
bool ignoring (const GUID_t &guid) const
 
bool ignoring (const char *topic_name) const
 
TopicStatus assert_topic (GUID_t &topicId, const char *topicName, const char *dataTypeName, const DDS::TopicQos &qos, bool hasDcpsKey, TopicCallbacks *topic_callbacks)
 
TopicStatus find_topic (const char *topicName, CORBA::String_out dataTypeName, DDS::TopicQos_out qos, GUID_t &topicId)
 
TopicStatus remove_topic (const GUID_t &topicId)
 
GUID_t add_publication (const GUID_t &topicId, DataWriterCallbacks_rch publication, const DDS::DataWriterQos &qos, const TransportLocatorSeq &transInfo, const DDS::PublisherQos &publisherQos, const XTypes::TypeInformation &type_info)
 
void remove_publication (const GUID_t &publicationId)
 
void update_publication_locators (const GUID_t &publicationId, const TransportLocatorSeq &transInfo)
 
GUID_t add_subscription (const GUID_t &topicId, DataReaderCallbacks_rch subscription, const DDS::DataReaderQos &qos, const TransportLocatorSeq &transInfo, const DDS::SubscriberQos &subscriberQos, const char *filterClassName, const char *filterExpr, const DDS::StringSeq &params, const XTypes::TypeInformation &type_info)
 
void remove_subscription (const GUID_t &subscriptionId)
 
void update_subscription_locators (const GUID_t &subscriptionId, const TransportLocatorSeq &transInfo)
 
void match_endpoints (GUID_t repoId, const TopicDetails &td, bool remove=false)
 
void remove_assoc (const GUID_t &remove_from, const GUID_t &removing)
 
virtual void add_assoc_i (const GUID_t &, const LocalPublication &, const GUID_t &, const DiscoveredSubscription &)
 
virtual void remove_assoc_i (const GUID_t &, const LocalPublication &, const GUID_t &)
 
virtual void add_assoc_i (const GUID_t &, const LocalSubscription &, const GUID_t &, const DiscoveredPublication &)
 
virtual void remove_assoc_i (const GUID_t &, const LocalSubscription &, const GUID_t &)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcEventHandler
 RcEventHandler ()
 
ACE_Event_Handler::Reference_Count add_reference ()
 
ACE_Event_Handler::Reference_Count remove_reference ()
 
- Public Member Functions inherited from ACE_Event_Handler
virtual ~ACE_Event_Handler (void)
 
virtual ACE_HANDLE get_handle (void) const
 
virtual void set_handle (ACE_HANDLE)
 
virtual int priority (void) const
 
virtual void priority (int priority)
 
virtual int handle_input (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_output (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_exception (ACE_HANDLE fd=ACE_INVALID_HANDLE)
 
virtual int handle_timeout (const ACE_Time_Value &current_time, const void *act=0)
 
virtual int handle_exit (ACE_Process *)
 
virtual int handle_close (ACE_HANDLE handle, ACE_Reactor_Mask close_mask)
 
virtual int handle_signal (int signum, siginfo_t *=0, ucontext_t *=0)
 
virtual int resume_handler (void)
 
virtual int handle_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual int handle_group_qos (ACE_HANDLE=ACE_INVALID_HANDLE)
 
virtual void reactor (ACE_Reactor *reactor)
 
virtual ACE_Reactorreactor (void) const
 
virtual ACE_Reactor_Timer_Interfacereactor_timer_interface (void) const
 
Reference_Counting_Policyreference_counting_policy (void)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::DiscoveryListener
 DiscoveryListener ()
 
virtual ~DiscoveryListener ()
 

Protected Types

typedef DiscoveredSubscriptionMap::iterator DiscoveredSubscriptionIter
 
typedef DiscoveredPublicationMap::iterator DiscoveredPublicationIter
 
typedef LocalPublicationMap::iterator LocalPublicationIter
 
typedef LocalPublicationMap::const_iterator LocalPublicationCIter
 
typedef LocalSubscriptionMap::iterator LocalSubscriptionIter
 
typedef LocalSubscriptionMap::const_iterator LocalSubscriptionCIter
 
- Protected Types inherited from ACE_Event_Handler
typedef ACE_Atomic_Op< ACE_SYNCH_MUTEX, Reference_CountAtomic_Reference_Count
 

Protected Member Functions

typedef OPENDDS_MAP_CMP (GUID_t, DiscoveredSubscription, GUID_tKeyLessThan) DiscoveredSubscriptionMap
 
typedef OPENDDS_MAP_CMP (GUID_t, DiscoveredPublication, GUID_tKeyLessThan) DiscoveredPublicationMap
 
typedef OPENDDS_MAP_CMP (GUID_t, LocalPublication, GUID_tKeyLessThan) LocalPublicationMap
 
typedef OPENDDS_MAP_CMP (GUID_t, LocalSubscription, GUID_tKeyLessThan) LocalSubscriptionMap
 
typedef OPENDDS_MAP_CMP (GUID_t, String, GUID_tKeyLessThan) TopicNameMap
 
virtual void remove_from_bit_i (const DiscoveredPublication &)
 
virtual void remove_from_bit_i (const DiscoveredSubscription &)
 
virtual DDS::ReturnCode_t write_publication_data (const GUID_t &, LocalPublication &, const GUID_t &reader=GUID_UNKNOWN)
 
virtual DDS::ReturnCode_t write_subscription_data (const GUID_t &, LocalSubscription &, const GUID_t &reader=GUID_UNKNOWN)
 
virtual bool send_type_lookup_request (const XTypes::TypeIdentifierSeq &, const GUID_t &, bool, bool)
 
- Protected Member Functions inherited from ACE_Event_Handler
 ACE_Event_Handler (ACE_Reactor *=0, int priority=ACE_Event_Handler::LO_PRIORITY)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Static Protected Member Functions

static DDS::BuiltinTopicKey_t get_key (const DiscoveredPublication &pub)
 
static DDS::BuiltinTopicKey_t get_key (const DiscoveredSubscription &sub)
 

Private Types

typedef PmfSporadicTask< StaticEndpointManagerStaticEndpointManagerSporadic
 

Private Member Functions

void match (const GUID_t &writer, const GUID_t &reader)
 
void need_minimal_and_or_complete_types (const XTypes::TypeInformation *type_info, bool &need_minimal, bool &need_complete) const
 
void remove_expired_endpoints (const MonotonicTimePoint &)
 
void match_continue (const GUID_t &writer, const GUID_t &reader)
 
void remove_from_bit (const DiscoveredPublication &pub)
 
void remove_from_bit (const DiscoveredSubscription &sub)
 
GUID_t make_topic_guid ()
 
bool has_dcps_key (const GUID_t &topicId) const
 
 OPENDDS_SET (OPENDDS_STRING) ignored_topics_
 
 OPENDDS_SET_CMP (GUID_t, GUID_tKeyLessThan) relay_only_readers_
 
typedef OPENDDS_MAP (SequenceNumber, TypeIdOrigSeqNumber) OrigSeqNumberMap
 

Private Attributes

ACE_Thread_Mutexlock_
 
GUID_t participant_id_
 
RepoIdSet ignored_guids_
 
unsigned int topic_counter_
 
LocalPublicationMap local_publications_
 
LocalSubscriptionMap local_subscriptions_
 
DiscoveredPublicationMap discovered_publications_
 
DiscoveredSubscriptionMap discovered_subscriptions_
 
TopicDetailsMap topics_
 
TopicNameMap topic_names_
 
const EndpointRegistryregistry_
 
StaticParticipantparticipant_
 
XTypes::TypeLookupService_rch type_lookup_service_
 
RcHandle< StaticEndpointManagerSporadictype_lookup_reply_deadline_processor_
 
TimeDuration max_type_lookup_service_reply_period_
 
SequenceNumber type_lookup_service_sequence_number_
 
OrigSeqNumberMap orig_seq_numbers_
 

Additional Inherited Members

- Public Types inherited from ACE_Event_Handler
typedef long Reference_Count
 
- Static Public Member Functions inherited from ACE_Event_Handler
static ACE_THR_FUNC_RETURN read_adapter (void *event_handler)
 
static int register_stdin_handler (ACE_Event_Handler *eh, ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr, int flags=THR_DETACHED)
 
static int remove_stdin_handler (ACE_Reactor *reactor, ACE_Thread_Manager *thr_mgr)
 
- Public Attributes inherited from ACE_Event_Handler
 LO_PRIORITY
 
 HI_PRIORITY
 
 NULL_MASK
 
 READ_MASK
 
 WRITE_MASK
 
 EXCEPT_MASK
 
 ACCEPT_MASK
 
 CONNECT_MASK
 
 TIMER_MASK
 
 QOS_MASK
 
 GROUP_QOS_MASK
 
 SIGNAL_MASK
 
 ALL_EVENTS_MASK
 
 RWE_MASK
 
 DONT_CALL
 
 ACE_EVENT_HANDLER_NOT_RESUMED
 
 ACE_REACTOR_RESUMES_HANDLER
 
 ACE_APPLICATION_RESUMES_HANDLER
 
- Protected Attributes inherited from ACE_Event_Handler
Atomic_Reference_Count reference_count_
 

Detailed Description

Definition at line 130 of file StaticDiscovery.h.

Member Typedef Documentation

◆ DiscoveredPublicationIter

typedef DiscoveredPublicationMap::iterator OpenDDS::DCPS::StaticEndpointManager::DiscoveredPublicationIter
protected

Definition at line 210 of file StaticDiscovery.h.

◆ DiscoveredSubscriptionIter

typedef DiscoveredSubscriptionMap::iterator OpenDDS::DCPS::StaticEndpointManager::DiscoveredSubscriptionIter
protected

Definition at line 172 of file StaticDiscovery.h.

◆ LocalPublicationCIter

typedef LocalPublicationMap::const_iterator OpenDDS::DCPS::StaticEndpointManager::LocalPublicationCIter
protected

Definition at line 247 of file StaticDiscovery.h.

◆ LocalPublicationIter

typedef LocalPublicationMap::iterator OpenDDS::DCPS::StaticEndpointManager::LocalPublicationIter
protected

Definition at line 246 of file StaticDiscovery.h.

◆ LocalSubscriptionCIter

typedef LocalSubscriptionMap::const_iterator OpenDDS::DCPS::StaticEndpointManager::LocalSubscriptionCIter
protected

Definition at line 252 of file StaticDiscovery.h.

◆ LocalSubscriptionIter

typedef LocalSubscriptionMap::iterator OpenDDS::DCPS::StaticEndpointManager::LocalSubscriptionIter
protected

Definition at line 251 of file StaticDiscovery.h.

◆ StaticEndpointManagerSporadic

Definition at line 459 of file StaticDiscovery.h.

Constructor & Destructor Documentation

◆ StaticEndpointManager()

OpenDDS::DCPS::StaticEndpointManager::StaticEndpointManager ( const GUID_t participant_id,
ACE_Thread_Mutex lock,
const EndpointRegistry registry,
StaticParticipant participant 
)

Definition at line 77 of file StaticDiscovery.cpp.

References TheServiceParticipant, and type_lookup_init().

81  : lock_(lock)
82  , participant_id_(participant_id)
83  , topic_counter_(0)
84  , registry_(registry)
85 #ifndef DDS_HAS_MINIMUM_BIT
86  , participant_(participant)
87 #endif
90 {
91 #ifdef DDS_HAS_MINIMUM_BIT
92  ACE_UNUSED_ARG(participant);
93 #endif
95 }
void type_lookup_init(ReactorInterceptor_rch reactor_interceptor)
const EndpointRegistry & registry_
#define TheServiceParticipant

◆ ~StaticEndpointManager()

OpenDDS::DCPS::StaticEndpointManager::~StaticEndpointManager ( )

Definition at line 97 of file StaticDiscovery.cpp.

References type_lookup_fini().

Member Function Documentation

◆ add_assoc_i() [1/2]

virtual void OpenDDS::DCPS::StaticEndpointManager::add_assoc_i ( const GUID_t ,
const LocalPublication ,
const GUID_t ,
const DiscoveredSubscription  
)
inlinevirtual

Definition at line 410 of file StaticDiscovery.h.

Referenced by match_continue().

411  {}

◆ add_assoc_i() [2/2]

virtual void OpenDDS::DCPS::StaticEndpointManager::add_assoc_i ( const GUID_t ,
const LocalSubscription ,
const GUID_t ,
const DiscoveredPublication  
)
inlinevirtual

Definition at line 414 of file StaticDiscovery.h.

415  {}

◆ add_publication()

GUID_t OpenDDS::DCPS::StaticEndpointManager::add_publication ( const GUID_t topicId,
DataWriterCallbacks_rch  publication,
const DDS::DataWriterQos qos,
const TransportLocatorSeq transInfo,
const DDS::PublisherQos publisherQos,
const XTypes::TypeInformation type_info 
)

Definition at line 760 of file StaticDiscovery.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::TopicDetails::add_local_publication(), add_publication_i(), assign_publication_key(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, local_publications_, lock_, match_endpoints(), OPENDDS_STRING, participant_id_, OpenDDS::DCPS::StaticEndpointManager::LocalPublication::publication_, OpenDDS::DCPS::StaticEndpointManager::LocalPublication::publisher_qos_, OpenDDS::DCPS::StaticEndpointManager::LocalPublication::qos_, DDS::RETCODE_OK, OpenDDS::DCPS::StaticEndpointManager::LocalEndpoint::topic_id_, topic_names_, topics_, OpenDDS::DCPS::StaticEndpointManager::LocalEndpoint::trans_info_, OpenDDS::DCPS::StaticEndpointManager::LocalEndpoint::type_info_, and write_publication_data().

767 {
768  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, GUID_t());
769 
770  GUID_t rid = participant_id_;
771  assign_publication_key(rid, topicId, qos);
772  LocalPublication& pb = local_publications_[rid];
773  pb.topic_id_ = topicId;
774  pb.publication_ = publication;
775  pb.qos_ = qos;
776  pb.trans_info_ = transInfo;
777  pb.publisher_qos_ = publisherQos;
778  pb.type_info_ = type_info;
779  const OPENDDS_STRING& topic_name = topic_names_[topicId];
780 
781  TopicDetails& td = topics_[topic_name];
782  td.add_local_publication(rid);
783 
784  if (DDS::RETCODE_OK != add_publication_i(rid, pb)) {
785  return GUID_t();
786  }
787 
788  if (DDS::RETCODE_OK != write_publication_data(rid, pb)) {
789  return GUID_t();
790  }
791 
792  if (DCPS_debug_level > 3) {
793  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::add_publication - ")
794  ACE_TEXT("calling match_endpoints\n")));
795  }
796  match_endpoints(rid, td);
797 
798  return rid;
799 }
#define ACE_DEBUG(X)
const ReturnCode_t RETCODE_OK
void match_endpoints(GUID_t repoId, const TopicDetails &td, bool remove=false)
virtual DDS::ReturnCode_t add_publication_i(const GUID_t &, LocalPublication &)
#define OPENDDS_STRING
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual DDS::ReturnCode_t write_publication_data(const GUID_t &, LocalPublication &, const GUID_t &reader=GUID_UNKNOWN)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
virtual void assign_publication_key(GUID_t &rid, const GUID_t &topicId, const DDS::DataWriterQos &qos)

◆ add_publication_i()

DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::add_publication_i ( const GUID_t writerid,
LocalPublication pub 
)
virtual

Definition at line 311 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::EndpointRegistry::Writer::best_effort_readers, OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_id_, OpenDDS::DCPS::StaticEndpointManager::LocalPublication::publication_, OpenDDS::DCPS::EndpointRegistry::Reader::qos, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, OpenDDS::DCPS::EndpointRegistry::Writer::reliable_readers, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EndpointRegistry::Reader::subscriber_qos, OpenDDS::DCPS::EndpointRegistry::Reader::trans_info, and OpenDDS::DCPS::EndpointRegistry::writer_map.

Referenced by add_publication().

313 {
314  /*
315  Find all matching remote readers.
316  If the reader is best effort, then associate immediately.
317  If the reader is reliable (we are reliable by implication), register with the transport to receive notification that the remote reader is up.
318  */
319  EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
320  if (pos == registry_.writer_map.end()) {
321  return DDS::RETCODE_ERROR;
322  }
323  const EndpointRegistry::Writer& writer = pos->second;
324 
325  for (RepoIdSet::const_iterator pos = writer.best_effort_readers.begin(), limit = writer.best_effort_readers.end();
326  pos != limit;
327  ++pos) {
328  const GUID_t& readerid = *pos;
329  const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
330 
331  const ReaderAssociation ra =
332  {reader.trans_info, TransportLocator(), 0, readerid, reader.subscriber_qos, reader.qos, "", "", 0, 0, {0, 0}};
333  DataWriterCallbacks_rch pl = pub.publication_.lock();
334  if (pl) {
335  pl->add_association(writerid, ra, true);
336  }
337  }
338 
339  for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
340  pos != limit;
341  ++pos) {
342  const GUID_t& readerid = *pos;
343  const EndpointRegistry::Reader& reader = registry_.reader_map.find(readerid)->second;
344  DataWriterCallbacks_rch pl = pub.publication_.lock();
345  if (pl) {
346  pl->register_for_reader(participant_id_, writerid, readerid, reader.trans_info, this);
347  }
348  }
349 
350  return DDS::RETCODE_OK;
351 }
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_ERROR
const EndpointRegistry & registry_
RcHandle< DataWriterCallbacks > DataWriterCallbacks_rch

◆ add_subscription()

GUID_t OpenDDS::DCPS::StaticEndpointManager::add_subscription ( const GUID_t topicId,
DataReaderCallbacks_rch  subscription,
const DDS::DataReaderQos qos,
const TransportLocatorSeq transInfo,
const DDS::SubscriberQos subscriberQos,
const char *  filterClassName,
const char *  filterExpr,
const DDS::StringSeq params,
const XTypes::TypeInformation type_info 
)

Definition at line 839 of file StaticDiscovery.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::TopicDetails::add_local_subscription(), add_subscription_i(), assign_subscription_key(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::ContentFilterProperty_t::expressionParameters, OpenDDS::DCPS::ContentFilterProperty_t::filterClassName, OpenDDS::DCPS::ContentFilterProperty_t::filterExpression, OpenDDS::DCPS::StaticEndpointManager::LocalSubscription::filterProperties, LM_DEBUG, local_subscriptions_, lock_, match_endpoints(), OPENDDS_STRING, participant_id_, OpenDDS::DCPS::StaticEndpointManager::LocalSubscription::qos_, DDS::RETCODE_OK, OpenDDS::DCPS::StaticEndpointManager::LocalSubscription::subscriber_qos_, OpenDDS::DCPS::StaticEndpointManager::LocalSubscription::subscription_, OpenDDS::DCPS::StaticEndpointManager::LocalEndpoint::topic_id_, topic_names_, topics_, OpenDDS::DCPS::StaticEndpointManager::LocalEndpoint::trans_info_, OpenDDS::DCPS::StaticEndpointManager::LocalEndpoint::type_info_, and write_subscription_data().

849 {
850  ACE_GUARD_RETURN(ACE_Thread_Mutex, g, lock_, GUID_t());
851 
852  GUID_t rid = participant_id_;
853  assign_subscription_key(rid, topicId, qos);
854  LocalSubscription& sb = local_subscriptions_[rid];
855  sb.topic_id_ = topicId;
856  sb.subscription_ = subscription;
857  sb.qos_ = qos;
858  sb.trans_info_ = transInfo;
859  sb.subscriber_qos_ = subscriberQos;
860  sb.filterProperties.filterClassName = filterClassName;
861  sb.filterProperties.filterExpression = filterExpr;
862  sb.filterProperties.expressionParameters = params;
863  sb.type_info_ = type_info;
864  const OPENDDS_STRING& topic_name = topic_names_[topicId];
865 
866  TopicDetails& td = topics_[topic_name];
867  td.add_local_subscription(rid);
868 
869  if (DDS::RETCODE_OK != add_subscription_i(rid, sb)) {
870  return GUID_t();
871  }
872 
873  if (DDS::RETCODE_OK != write_subscription_data(rid, sb)) {
874  return GUID_t();
875  }
876 
877  if (DCPS_debug_level > 3) {
878  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::add_subscription - ")
879  ACE_TEXT("calling match_endpoints\n")));
880  }
881  match_endpoints(rid, td);
882 
883  return rid;
884 }
#define ACE_DEBUG(X)
LocalSubscriptionMap local_subscriptions_
const ReturnCode_t RETCODE_OK
void match_endpoints(GUID_t repoId, const TopicDetails &td, bool remove=false)
#define OPENDDS_STRING
virtual DDS::ReturnCode_t write_subscription_data(const GUID_t &, LocalSubscription &, const GUID_t &reader=GUID_UNKNOWN)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual void assign_subscription_key(GUID_t &rid, const GUID_t &topicId, const DDS::DataReaderQos &qos)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
virtual DDS::ReturnCode_t add_subscription_i(const GUID_t &, LocalSubscription &)

◆ add_subscription_i()

DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::add_subscription_i ( const GUID_t readerid,
LocalSubscription sub 
)
virtual

Definition at line 381 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::EndpointRegistry::Reader::best_effort_writers, OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_id_, OpenDDS::DCPS::EndpointRegistry::Writer::publisher_qos, OpenDDS::DCPS::EndpointRegistry::Writer::qos, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, OpenDDS::DCPS::EndpointRegistry::Reader::reliable_writers, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::StaticEndpointManager::LocalSubscription::subscription_, OpenDDS::DCPS::EndpointRegistry::Writer::trans_info, and OpenDDS::DCPS::EndpointRegistry::writer_map.

Referenced by add_subscription().

383 {
384  /*
385  Find all matching remote writers.
386  If we (the reader) is best effort, then associate immediately.
387  If we (the reader) are reliable, then register with the transport to receive notification that the remote writer is up.
388  */
389  EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
390  if (pos == registry_.reader_map.end()) {
391  return DDS::RETCODE_ERROR;
392  }
393  const EndpointRegistry::Reader& reader = pos->second;
394 
395  for (RepoIdSet::const_iterator pos = reader.best_effort_writers.begin(), limit = reader.best_effort_writers.end();
396  pos != limit;
397  ++pos) {
398  const GUID_t& writerid = *pos;
399  const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
400 
401  DDS::OctetSeq type_info;
402  const WriterAssociation wa = {
403  writer.trans_info, TransportLocator(), 0, writerid, writer.publisher_qos, writer.qos, type_info, {0, 0}
404  };
405  DataReaderCallbacks_rch sl = sub.subscription_.lock();
406  if (sl) {
407  sl->add_association(readerid, wa, false);
408  }
409  }
410 
411  for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
412  pos != limit;
413  ++pos) {
414  const GUID_t& writerid = *pos;
415  const EndpointRegistry::Writer& writer = registry_.writer_map.find(writerid)->second;
416  DataReaderCallbacks_rch sl = sub.subscription_.lock();
417  if (sl) {
418  sl->register_for_writer(participant_id_, readerid, writerid, writer.trans_info, this);
419  }
420  }
421 
422  return DDS::RETCODE_OK;
423 }
const ReturnCode_t RETCODE_OK
RcHandle< DataReaderCallbacks > DataReaderCallbacks_rch
const ReturnCode_t RETCODE_ERROR
const EndpointRegistry & registry_
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64

◆ assert_topic()

TopicStatus OpenDDS::DCPS::StaticEndpointManager::assert_topic ( GUID_t topicId,
const char *  topicName,
const char *  dataTypeName,
const DDS::TopicQos qos,
bool  hasDcpsKey,
TopicCallbacks topic_callbacks 
)

Definition at line 698 of file StaticDiscovery.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::CONFLICTING_TYPENAME, OpenDDS::DCPS::CREATED, OpenDDS::DCPS::FOUND, OpenDDS::DCPS::TopicDetails::init(), OpenDDS::DCPS::INTERNAL_ERROR, lock_, make_topic_guid(), OpenDDS::DCPS::TopicDetails::set_local(), topic_names_, and topics_.

702 {
704  TopicDetailsMap::iterator iter = topics_.find(topicName);
705  if (iter != topics_.end()) {
706  if (iter->second.local_is_set() && iter->second.local_data_type_name() != dataTypeName) {
707  return CONFLICTING_TYPENAME;
708  }
709  topicId = iter->second.topic_id();
710  iter->second.set_local(dataTypeName, qos, hasDcpsKey, topic_callbacks);
711  return FOUND;
712  }
713 
714  TopicDetails& td = topics_[topicName];
715  topicId = make_topic_guid();
716  td.init(topicName, topicId);
717  topic_names_[topicId] = topicName;
718  td.set_local(dataTypeName, qos, hasDcpsKey, topic_callbacks);
719 
720  return CREATED;
721 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ assign_publication_key()

void OpenDDS::DCPS::StaticEndpointManager::assign_publication_key ( GUID_t rid,
const GUID_t topicId,
const DDS::DataWriterQos qos 
)
virtual

Definition at line 197 of file StaticDiscovery.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::EntityId_t::entityKey, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::DCPS::ENTITYKIND_USER_WRITER_WITH_KEY, LM_ERROR, LM_INFO, LM_WARNING, registry_, DDS::DataWriterQos::user_data, DDS::UserDataQosPolicy::value, and OpenDDS::DCPS::EndpointRegistry::writer_map.

Referenced by add_publication().

200 {
201  if (qos.user_data.value.length() != BYTES_IN_ENTITY) {
202  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_publication_key: no user data to identify writer\n")));
203  return;
204  }
205 
206  rid.entityId.entityKey[0] = qos.user_data.value[0];
207  rid.entityId.entityKey[1] = qos.user_data.value[1];
208  rid.entityId.entityKey[2] = qos.user_data.value[2];
209  rid.entityId.entityKind = ENTITYKIND_USER_WRITER_WITH_KEY;
210 
211  if (DCPS_debug_level > 8) {
212  ACE_DEBUG((LM_INFO, "(%P|%t) looking up writer ID %C\n",
213  LogGuid(rid).c_str()));
214  }
215 
216  EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(rid);
217  if (pos == registry_.writer_map.end()) {
218  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_publication_key: unknown writer: %C\n"), LogGuid(rid).c_str()));
219  return;
220  }
221 
222  DDS::DataWriterQos qos2(qos);
223  // Qos in registry will not have the user data so overwrite.
224  qos2.user_data = pos->second.qos.user_data;
225 
226  DDS::DataWriterQos qos3(pos->second.qos);
227 
228  if (qos2 != qos3) {
229  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_publication_key: dynamic and static QoS differ\n")));
230  }
231 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
UserDataQosPolicy user_data
const EndpointRegistry & registry_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
const octet ENTITYKIND_USER_WRITER_WITH_KEY
Definition: DdsDcpsGuid.idl:40

◆ assign_subscription_key()

void OpenDDS::DCPS::StaticEndpointManager::assign_subscription_key ( GUID_t rid,
const GUID_t topicId,
const DDS::DataReaderQos qos 
)
virtual

Definition at line 233 of file StaticDiscovery.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::EntityId_t::entityKey, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::DCPS::ENTITYKIND_USER_READER_WITH_KEY, LM_ERROR, LM_WARNING, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, DDS::DataReaderQos::user_data, and DDS::UserDataQosPolicy::value.

Referenced by add_subscription().

236 {
237  if (qos.user_data.value.length() != BYTES_IN_ENTITY) {
238  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_subscription_key: no user data to identify reader\n")));
239  return;
240  }
241 
242  rid.entityId.entityKey[0] = qos.user_data.value[0];
243  rid.entityId.entityKey[1] = qos.user_data.value[1];
244  rid.entityId.entityKey[2] = qos.user_data.value[2];
245  rid.entityId.entityKind = ENTITYKIND_USER_READER_WITH_KEY;
246 
247  EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(rid);
248  if (pos == registry_.reader_map.end()) {
249  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: StaticEndpointManager::assign_subscription_key: unknown reader: %C\n"), LogGuid(rid).c_str()));
250  return;
251  }
252 
253  DDS::DataReaderQos qos2(qos);
254  // Qos in registry will not have the user data so overwrite.
255  qos2.user_data = pos->second.qos.user_data;
256 
257  DDS::DataReaderQos qos3(pos->second.qos);
258 
259  if (qos2 != qos3) {
260  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::assign_subscription_key: dynamic and static QoS differ\n")));
261  }
262 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
const octet ENTITYKIND_USER_READER_WITH_KEY
Definition: DdsDcpsGuid.idl:43
UserDataQosPolicy user_data
const EndpointRegistry & registry_
ACE_TEXT("TCP_Factory")

◆ cleanup_type_lookup_data()

void OpenDDS::DCPS::StaticEndpointManager::cleanup_type_lookup_data ( const GuidPrefix_t prefix,
const XTypes::TypeIdentifier ti,
bool  secure 
)

Definition at line 557 of file StaticDiscovery.cpp.

Referenced by remove_expired_endpoints().

560 {
561  // Do nothing.
562 }

◆ disassociate()

bool OpenDDS::DCPS::StaticEndpointManager::disassociate ( )
virtual

Definition at line 303 of file StaticDiscovery.cpp.

References ACE_DEBUG, ACE_TEXT(), and LM_NOTICE.

304 {
305  ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::disassociate TODO\n")));
306  // TODO
307  return false;
308 }
#define ACE_DEBUG(X)
ACE_TEXT("TCP_Factory")

◆ find_topic()

TopicStatus OpenDDS::DCPS::StaticEndpointManager::find_topic ( const char *  topicName,
CORBA::String_out  dataTypeName,
DDS::TopicQos_out  qos,
GUID_t topicId 
)

Definition at line 723 of file StaticDiscovery.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::FOUND, OpenDDS::DCPS::INTERNAL_ERROR, OpenDDS::DCPS::TopicDetails::local_data_type_name(), OpenDDS::DCPS::TopicDetails::local_qos(), lock_, OpenDDS::DCPS::NOT_FOUND, OpenDDS::DCPS::TopicDetails::topic_id(), Update::TopicQos, and topics_.

728 {
730  TopicDetailsMap::const_iterator iter = topics_.find(topicName);
731  if (iter == topics_.end()) {
732  return NOT_FOUND;
733  }
734 
735  const TopicDetails& td = iter->second;
736 
737  dataTypeName = td.local_data_type_name().c_str();
738  qos = new DDS::TopicQos(td.local_qos());
739  topicId = td.topic_id();
740  return FOUND;
741 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ get_key() [1/2]

static DDS::BuiltinTopicKey_t OpenDDS::DCPS::StaticEndpointManager::get_key ( const DiscoveredPublication pub)
inlinestaticprotected

Definition at line 256 of file StaticDiscovery.h.

References OpenDDS::DCPS::DiscoveredWriterData::ddsPublicationData, and OpenDDS::DCPS::StaticEndpointManager::DiscoveredPublication::writer_data_.

257  {
258  return pub.writer_data_.ddsPublicationData.key;
259  }

◆ get_key() [2/2]

static DDS::BuiltinTopicKey_t OpenDDS::DCPS::StaticEndpointManager::get_key ( const DiscoveredSubscription sub)
inlinestaticprotected

Definition at line 260 of file StaticDiscovery.h.

References OpenDDS::DCPS::DiscoveredReaderData::ddsSubscriptionData, and OpenDDS::DCPS::StaticEndpointManager::DiscoveredSubscription::reader_data_.

261  {
262  return sub.reader_data_.ddsSubscriptionData.key;
263  }

◆ has_dcps_key()

bool OpenDDS::DCPS::StaticEndpointManager::has_dcps_key ( const GUID_t topicId) const
private

Definition at line 1487 of file StaticDiscovery.cpp.

References OPENDDS_MAP_CMP(), OPENDDS_STRING, topic_names_, and topics_.

1488 {
1489  typedef OPENDDS_MAP_CMP(GUID_t, OPENDDS_STRING, GUID_tKeyLessThan) TNMap;
1490  TNMap::const_iterator tn = topic_names_.find(topicId);
1491  if (tn == topic_names_.end()) return false;
1492 
1493  TopicDetailsMap::const_iterator td = topics_.find(tn->second);
1494  if (td == topics_.end()) return false;
1495 
1496  return td->second.has_dcps_key();
1497 }
#define OPENDDS_STRING
typedef OPENDDS_MAP_CMP(GUID_t, DiscoveredSubscription, GUID_tKeyLessThan) DiscoveredSubscriptionMap

◆ ignore()

void OpenDDS::DCPS::StaticEndpointManager::ignore ( const GUID_t to_ignore)

Definition at line 618 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::TopicDetails::discovered_publications(), discovered_publications_, OpenDDS::DCPS::TopicDetails::discovered_subscriptions(), discovered_subscriptions_, ignored_guids_, OpenDDS::DCPS::TopicDetails::is_dead(), match_endpoints(), OPENDDS_MAP_CMP(), OPENDDS_STRING, purge_dead_topic(), OpenDDS::DCPS::TopicDetails::remove_discovered_publication(), OpenDDS::DCPS::TopicDetails::remove_discovered_subscription(), remove_from_bit(), shutting_down(), topic_names_, and topics_.

619 {
620  // Locked prior to call from Spdp.
621  ignored_guids_.insert(to_ignore);
622  {
623  const DiscoveredPublicationIter iter = discovered_publications_.find(to_ignore);
624  if (iter != discovered_publications_.end()) {
625  // clean up tracking info
626  const String topic_name = iter->second.get_topic_name();
627  TopicDetails& td = topics_[topic_name];
628  td.remove_discovered_publication(to_ignore);
629  remove_from_bit(iter->second);
630  discovered_publications_.erase(iter);
631  // break associations
632  match_endpoints(to_ignore, td, true /*remove*/);
633  if (td.is_dead()) {
634  purge_dead_topic(topic_name);
635  }
636  return;
637  }
638  }
639  {
640  const DiscoveredSubscriptionIter iter =
641  discovered_subscriptions_.find(to_ignore);
642  if (iter != discovered_subscriptions_.end()) {
643  // clean up tracking info
644  const String topic_name = iter->second.get_topic_name();
645  TopicDetails& td = topics_[topic_name];
646  td.remove_discovered_publication(to_ignore);
647  remove_from_bit(iter->second);
648  discovered_subscriptions_.erase(iter);
649  // break associations
650  match_endpoints(to_ignore, td, true /*remove*/);
651  if (td.is_dead()) {
652  purge_dead_topic(topic_name);
653  }
654  return;
655  }
656  }
657  {
658  const OPENDDS_MAP_CMP(GUID_t, OPENDDS_STRING, GUID_tKeyLessThan)::iterator
659  iter = topic_names_.find(to_ignore);
660  if (iter != topic_names_.end()) {
661  ignored_topics_.insert(iter->second);
662  // Remove all publications and subscriptions on this topic
663  TopicDetails& td = topics_[iter->second];
664  {
665  const RepoIdSet ids = td.discovered_publications();
666  for (RepoIdSet::const_iterator ep = ids.begin(); ep!= ids.end(); ++ep) {
667  match_endpoints(*ep, td, true /*remove*/);
668  td.remove_discovered_publication(*ep);
669  // TODO: Do we need to remove from discovered_subscriptions?
670  if (shutting_down()) { return; }
671  }
672  }
673  {
674  const RepoIdSet ids = td.discovered_subscriptions();
675  for (RepoIdSet::const_iterator ep = ids.begin(); ep!= ids.end(); ++ep) {
676  match_endpoints(*ep, td, true /*remove*/);
677  td.remove_discovered_subscription(*ep);
678  // TODO: Do we need to remove from discovered_publications?
679  if (shutting_down()) { return; }
680  }
681  }
682  if (td.is_dead()) {
683  purge_dead_topic(iter->second);
684  }
685  }
686  }
687 }
void remove_from_bit(const DiscoveredPublication &pub)
DiscoveredSubscriptionMap discovered_subscriptions_
GuidSet RepoIdSet
Definition: GuidUtils.h:113
void match_endpoints(GUID_t repoId, const TopicDetails &td, bool remove=false)
void purge_dead_topic(const String &topic_name)
#define OPENDDS_STRING
DiscoveredPublicationMap::iterator DiscoveredPublicationIter
typedef OPENDDS_MAP_CMP(GUID_t, DiscoveredSubscription, GUID_tKeyLessThan) DiscoveredSubscriptionMap
DiscoveredSubscriptionMap::iterator DiscoveredSubscriptionIter
int find(Container &c, const Key &key, typename Container::mapped_type *&value)
Definition: Util.h:71
DiscoveredPublicationMap discovered_publications_
std::string String

◆ ignoring() [1/2]

bool OpenDDS::DCPS::StaticEndpointManager::ignoring ( const GUID_t guid) const

Definition at line 689 of file StaticDiscovery.cpp.

References ignored_guids_.

690 {
691  return ignored_guids_.count(guid);
692 }

◆ ignoring() [2/2]

bool OpenDDS::DCPS::StaticEndpointManager::ignoring ( const char *  topic_name) const

Definition at line 693 of file StaticDiscovery.cpp.

694 {
695  return ignored_topics_.count(topic_name);
696 }

◆ init_bit()

void OpenDDS::DCPS::StaticEndpointManager::init_bit ( )

Definition at line 102 of file StaticDiscovery.cpp.

References DDS::DataWriterQos::deadline, DDS::DataReaderQos::deadline, DDS::DataWriterQos::destination_order, DDS::DataReaderQos::destination_order, DDS::DataWriterQos::durability, DDS::DataReaderQos::durability, DDS::DataWriterQos::durability_service, OpenDDS::DCPS::equal_guid_prefixes(), DDS::PublisherQos::group_data, DDS::SubscriberQos::group_data, OpenDDS::DCPS::guid_to_bit_key(), DDS::DataWriterQos::latency_budget, DDS::DataReaderQos::latency_budget, DDS::DataWriterQos::lifespan, DDS::DataWriterQos::liveliness, DDS::DataReaderQos::liveliness, DDS::NEW_VIEW_STATE, OPENDDS_STRING, DDS::DataWriterQos::ownership, DDS::DataReaderQos::ownership, DDS::DataWriterQos::ownership_strength, participant_id_, DDS::PublisherQos::partition, DDS::SubscriberQos::partition, DDS::PublisherQos::presentation, DDS::SubscriberQos::presentation, pub_bit(), OpenDDS::DCPS::EndpointRegistry::Writer::publisher_qos, OpenDDS::DCPS::EndpointRegistry::Reader::qos, OpenDDS::DCPS::EndpointRegistry::Writer::qos, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, DDS::DataWriterQos::reliability, DDS::DataReaderQos::reliability, DDS::DataWriterQos::representation, DDS::DataReaderQos::representation, OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data(), sub_bit(), OpenDDS::DCPS::EndpointRegistry::Reader::subscriber_qos, DDS::DataReaderQos::time_based_filter, OpenDDS::DCPS::EndpointRegistry::topic_map, OpenDDS::DCPS::EndpointRegistry::Reader::topic_name, OpenDDS::DCPS::EndpointRegistry::Writer::topic_name, OpenDDS::DCPS::EndpointRegistry::Topic::type_name, DDS::DataWriterQos::user_data, DDS::DataReaderQos::user_data, and OpenDDS::DCPS::EndpointRegistry::writer_map.

103 {
104  // Discover all remote publications and subscriptions.
105 
106  for (EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.begin(),
107  limit = registry_.writer_map.end();
108  pos != limit;
109  ++pos) {
110  const GUID_t& remoteid = pos->first;
111  const EndpointRegistry::Writer& writer = pos->second;
112 
113  if (!equal_guid_prefixes(participant_id_, remoteid)) {
114  const DDS::BuiltinTopicKey_t key = guid_to_bit_key(remoteid);
115 
116  // pos represents a remote.
117  // Populate data.
119 
120  data.key = key;
121  OPENDDS_STRING topic_name = writer.topic_name;
122  data.topic_name = topic_name.c_str();
123  const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
124  data.type_name = topic.type_name.c_str();
125  data.durability = writer.qos.durability;
126  data.durability_service = writer.qos.durability_service;
127  data.deadline = writer.qos.deadline;
128  data.latency_budget = writer.qos.latency_budget;
129  data.liveliness = writer.qos.liveliness;
130  data.reliability = writer.qos.reliability;
131  data.lifespan = writer.qos.lifespan;
132  data.user_data = writer.qos.user_data;
133  data.ownership = writer.qos.ownership;
134  data.ownership_strength = writer.qos.ownership_strength;
135  data.destination_order = writer.qos.destination_order;
136  data.presentation = writer.publisher_qos.presentation;
137  data.partition = writer.publisher_qos.partition;
138  // If the TopicQos becomes available, this can be populated.
139  //data.topic_data = topic_details.qos_.topic_data;
140  data.group_data = writer.publisher_qos.group_data;
141  data.representation = writer.qos.representation;
142 
143 #ifndef DDS_HAS_MINIMUM_BIT
145  if (bit) { // bit may be null if the DomainParticipant is shutting down
147  }
148 #endif /* DDS_HAS_MINIMUM_BIT */
149  }
150  }
151 
152  for (EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.begin(),
153  limit = registry_.reader_map.end();
154  pos != limit;
155  ++pos) {
156  const GUID_t& remoteid = pos->first;
157  const EndpointRegistry::Reader& reader = pos->second;
158 
159  if (!equal_guid_prefixes(participant_id_, remoteid)) {
160  const DDS::BuiltinTopicKey_t key = guid_to_bit_key(remoteid);
161 
162  // pos represents a remote.
163  // Populate data.
165 
166  data.key = key;
167  OPENDDS_STRING topic_name = reader.topic_name;
168  data.topic_name = topic_name.c_str();
169  const EndpointRegistry::Topic& topic = registry_.topic_map.find(topic_name)->second;
170  data.type_name = topic.type_name.c_str();
171  data.durability = reader.qos.durability;
172  data.deadline = reader.qos.deadline;
173  data.latency_budget = reader.qos.latency_budget;
174  data.liveliness = reader.qos.liveliness;
175  data.reliability = reader.qos.reliability;
176  data.ownership = reader.qos.ownership;
177  data.destination_order = reader.qos.destination_order;
178  data.user_data = reader.qos.user_data;
179  data.time_based_filter = reader.qos.time_based_filter;
180  data.presentation = reader.subscriber_qos.presentation;
181  data.partition = reader.subscriber_qos.partition;
182  // // If the TopicQos becomes available, this can be populated.
183  //data.topic_data = topic_details.qos_.topic_data;
184  data.group_data = reader.subscriber_qos.group_data;
185  data.representation = reader.qos.representation;
186 
187 #ifndef DDS_HAS_MINIMUM_BIT
189  if (bit) { // bit may be null if the DomainParticipant is shutting down
191  }
192 #endif /* DDS_HAS_MINIMUM_BIT */
193  }
194  }
195 }
sequence< octet > key
const ViewStateKind NEW_VIEW_STATE
DDS::InstanceHandle_t store_synthetic_data(const MessageType &sample, DDS::ViewStateKind view, const SystemTimePoint &timestamp=SystemTimePoint::now())
PublicationBuiltinTopicDataDataReaderImpl * pub_bit()
#define OPENDDS_STRING
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
const EndpointRegistry & registry_
OpenDDS_Dcps_Export DDS::BuiltinTopicKey_t guid_to_bit_key(const GUID_t &guid)
Definition: GuidUtils.h:243
SubscriptionBuiltinTopicDataDataReaderImpl * sub_bit()

◆ is_expectant_opendds()

bool OpenDDS::DCPS::StaticEndpointManager::is_expectant_opendds ( const GUID_t endpoint) const
virtual

Definition at line 453 of file StaticDiscovery.cpp.

454 {
455  // We can't propagate associated writers via SEDP announcments if we're
456  // using static discovery, so nobody ought to be "expecting" them
457  return false;
458 }

◆ make_topic_guid()

GUID_t OpenDDS::DCPS::StaticEndpointManager::make_topic_guid ( )
private

Definition at line 1469 of file StaticDiscovery.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::assign(), OpenDDS::DCPS::EntityId_t::entityKey, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::DCPS::ENTITYKIND_OPENDDS_TOPIC, LM_ERROR, OpenDDS::DCPS::make_id(), participant_id_, and topic_counter_.

Referenced by assert_topic().

1470 {
1471  EntityId_t entity_id;
1472  assign(entity_id.entityKey, topic_counter_);
1473  ++topic_counter_;
1474  entity_id.entityKind = ENTITYKIND_OPENDDS_TOPIC;
1475 
1476  if (topic_counter_ == 0x1000000) {
1477  ACE_ERROR((LM_ERROR,
1478  ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::make_topic_guid: ")
1479  ACE_TEXT("Exceeded Maximum number of topic entity keys!")
1480  ACE_TEXT("Next key will be a duplicate!\n")));
1481  topic_counter_ = 0;
1482  }
1483 
1484  return make_id(participant_id_, entity_id);
1485 }
#define ACE_ERROR(X)
const octet ENTITYKIND_OPENDDS_TOPIC
Definition: DdsDcpsGuid.idl:49
void assign(EntityId_t &dest, const EntityId_t &src)
Definition: GuidUtils.h:157
ACE_TEXT("TCP_Factory")
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ match()

void OpenDDS::DCPS::StaticEndpointManager::match ( const GUID_t writer,
const GUID_t reader 
)
private

Definition at line 1028 of file StaticDiscovery.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and match_continue().

Referenced by match_endpoints().

1029 {
1030  if (DCPS_debug_level >= 4) {
1031  ACE_DEBUG((LM_DEBUG, "(%P|%t) StaticEndpointManager::match: w: %C r: %C\n",
1032  LogGuid(writer).c_str(), LogGuid(reader).c_str()));
1033  }
1034 
1035  match_continue(writer, reader);
1036 }
#define ACE_DEBUG(X)
void match_continue(const GUID_t &writer, const GUID_t &reader)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ match_continue()

void OpenDDS::DCPS::StaticEndpointManager::match_continue ( const GUID_t writer,
const GUID_t reader 
)
private

Definition at line 1060 of file StaticDiscovery.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), ACE_Task_Base::activate(), add_assoc_i(), DDS::ALLOW_TYPE_COERCION, OpenDDS::XTypes::TypeAssignability::assignable(), OpenDDS::DCPS::compatibleQOS(), OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, OpenDDS::DCPS::DCPS_debug_level, DDS::DataWriterQos::deadline, DDS::DataReaderQos::deadline, DDS::DataWriterQos::destination_order, DDS::DataReaderQos::destination_order, discovered_publications_, discovered_subscriptions_, DDS::DataWriterQos::durability, DDS::DataReaderQos::durability, DDS::DataWriterQos::durability_service, DDS::PublisherQos::entity_factory, DDS::SubscriberQos::entity_factory, OpenDDS::DCPS::ContentFilterProperty_t::expressionParameters, OpenDDS::DCPS::ContentFilterProperty_t::filterClassName, OpenDDS::DCPS::ContentFilterProperty_t::filterExpression, DDS::TypeConsistencyEnforcementQosPolicy::force_type_validation, DDS::PublisherQos::group_data, DDS::SubscriberQos::group_data, DDS::DataWriterQos::history, DDS::DataReaderQos::history, OpenDDS::XTypes::TypeConsistencyAttributes::ignore_member_names, DDS::TypeConsistencyEnforcementQosPolicy::ignore_member_names, OpenDDS::XTypes::TypeConsistencyAttributes::ignore_sequence_bounds, DDS::TypeConsistencyEnforcementQosPolicy::ignore_sequence_bounds, OpenDDS::XTypes::TypeConsistencyAttributes::ignore_string_bounds, DDS::TypeConsistencyEnforcementQosPolicy::ignore_string_bounds, OpenDDS::XTypes::IS_APPENDABLE, DDS::TypeConsistencyEnforcementQosPolicy::kind, OpenDDS::XTypes::TypeIdentifier::kind(), OpenDDS::DCPS::Encoding::KIND_XCDR1, DDS::DataWriterQos::latency_budget, DDS::DataReaderQos::latency_budget, DDS::DataWriterQos::lifespan, DDS::DataWriterQos::liveliness, DDS::DataReaderQos::liveliness, LM_DEBUG, LM_ERROR, LM_WARNING, local_publications_, local_subscriptions_, OpenDDS::DCPS::WeakRcHandle< T >::lock(), lock_, OpenDDS::XTypes::TypeInformation::minimal, OPENDDS_ASSERT, OPENDDS_STRING, DDS::DataWriterQos::ownership, DDS::DataReaderQos::ownership, DDS::DataWriterQos::ownership_strength, DDS::PublisherQos::partition, DDS::SubscriberQos::partition, populate_transport_locator_sequence(), DDS::PublisherQos::presentation, DDS::SubscriberQos::presentation, OpenDDS::XTypes::TypeConsistencyAttributes::prevent_type_widening, DDS::TypeConsistencyEnforcementQosPolicy::prevent_type_widening, DDS::DataReaderQos::reader_data_lifecycle, DDS::DataWriterQos::reliability, DDS::DataReaderQos::reliability, remove_assoc_i(), OpenDDS::DCPS::repr_to_encoding_kind(), DDS::DataWriterQos::representation, DDS::DataReaderQos::representation, DDS::DataWriterQos::resource_limits, DDS::DataReaderQos::resource_limits, OpenDDS::XTypes::serialize_type_info(), TheServiceParticipant, DDS::DataReaderQos::time_based_filter, OpenDDS::XTypes::TK_NONE, topic_names_, topics_, DDS::DataWriterQos::transport_priority, DDS::DataReaderQos::type_consistency, OpenDDS::XTypes::TypeIdentifierWithSize::type_id, type_lookup_service_, OpenDDS::XTypes::TypeIdentifierWithDependencies::typeid_with_size, DDS::DataWriterQos::user_data, DDS::DataReaderQos::user_data, DDS::DataRepresentationQosPolicy::value, DDS::DataWriterQos::writer_data_lifecycle, and OpenDDS::DCPS::DcpsUpcalls::writer_done().

Referenced by match().

1061 {
1062  if (DCPS_debug_level >= 4) {
1063  ACE_DEBUG((LM_DEBUG, "(%P|%t) StaticEndpointManager::match_continue: w: %C r: %C\n",
1064  LogGuid(writer).c_str(), LogGuid(reader).c_str()));
1065  }
1066 
1067  // 0. For discovered endpoints, we'll have the QoS info in the form of the
1068  // publication or subscription BIT data which doesn't use the same structures
1069  // for QoS. In those cases we can copy the individual QoS policies to temp
1070  // QoS structs:
1071  DDS::DataWriterQos tempDwQos;
1072  DDS::PublisherQos tempPubQos;
1073  DDS::DataReaderQos tempDrQos;
1074  DDS::SubscriberQos tempSubQos;
1075  ContentFilterProperty_t tempCfp;
1076 
1079  if (dpi != discovered_publications_.end() && dsi != discovered_subscriptions_.end()) {
1080  // This is a discovered/discovered match, nothing for us to do
1081  return;
1082  }
1083 
1084  // 1. Collect details about the writer, which may be local or discovered
1085  const DDS::DataWriterQos* dwQos = 0;
1086  const DDS::PublisherQos* pubQos = 0;
1087  TransportLocatorSeq* wTls = 0;
1088  ACE_CDR::ULong wTransportContext = 0;
1089  XTypes::TypeInformation* writer_type_info = 0;
1090  OPENDDS_STRING topic_name;
1091  MonotonicTime_t writer_participant_discovered_at;
1092 
1093  const LocalPublicationIter lpi = local_publications_.find(writer);
1094  bool writer_local = false, already_matched = false;
1095  if (lpi != local_publications_.end()) {
1096  writer_local = true;
1097  dwQos = &lpi->second.qos_;
1098  pubQos = &lpi->second.publisher_qos_;
1099  wTls = &lpi->second.trans_info_;
1100  wTransportContext = lpi->second.transport_context_;
1101  already_matched = lpi->second.matched_endpoints_.count(reader);
1102  writer_type_info = &lpi->second.type_info_;
1103  topic_name = topic_names_[lpi->second.topic_id_];
1104  writer_participant_discovered_at = lpi->second.participant_discovered_at_;
1105  } else if (dpi != discovered_publications_.end()) {
1106  wTls = &dpi->second.writer_data_.writerProxy.allLocators;
1107  wTransportContext = dpi->second.transport_context_;
1108  writer_type_info = &dpi->second.type_info_;
1109  topic_name = dpi->second.get_topic_name();
1110  writer_participant_discovered_at = dpi->second.participant_discovered_at_;
1111 
1113  dpi->second.writer_data_.ddsPublicationData;
1114  tempDwQos.durability = bit.durability;
1115  tempDwQos.durability_service = bit.durability_service;
1116  tempDwQos.deadline = bit.deadline;
1117  tempDwQos.latency_budget = bit.latency_budget;
1118  tempDwQos.liveliness = bit.liveliness;
1119  tempDwQos.reliability = bit.reliability;
1120  tempDwQos.destination_order = bit.destination_order;
1121  tempDwQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
1122  tempDwQos.resource_limits =
1123  TheServiceParticipant->initial_ResourceLimitsQosPolicy();
1124  tempDwQos.transport_priority =
1125  TheServiceParticipant->initial_TransportPriorityQosPolicy();
1126  tempDwQos.lifespan = bit.lifespan;
1127  tempDwQos.user_data = bit.user_data;
1128  tempDwQos.ownership = bit.ownership;
1129  tempDwQos.ownership_strength = bit.ownership_strength;
1130  tempDwQos.writer_data_lifecycle =
1131  TheServiceParticipant->initial_WriterDataLifecycleQosPolicy();
1132  tempDwQos.representation = bit.representation;
1133  dwQos = &tempDwQos;
1134 
1135  tempPubQos.presentation = bit.presentation;
1136  tempPubQos.partition = bit.partition;
1137  tempPubQos.group_data = bit.group_data;
1138  tempPubQos.entity_factory =
1139  TheServiceParticipant->initial_EntityFactoryQosPolicy();
1140  pubQos = &tempPubQos;
1141 
1142  populate_transport_locator_sequence(wTls, dpi, writer);
1143  } else {
1144  return; // Possible and ok, since lock is released
1145  }
1146 
1147  // 2. Collect details about the reader, which may be local or discovered
1148  const DDS::DataReaderQos* drQos = 0;
1149  const DDS::SubscriberQos* subQos = 0;
1150  TransportLocatorSeq* rTls = 0;
1151  ACE_CDR::ULong rTransportContext = 0;
1152  const ContentFilterProperty_t* cfProp = 0;
1153  XTypes::TypeInformation* reader_type_info = 0;
1154  MonotonicTime_t reader_participant_discovered_at;
1155 
1156  const LocalSubscriptionIter lsi = local_subscriptions_.find(reader);
1157  bool reader_local = false;
1158  if (lsi != local_subscriptions_.end()) {
1159  reader_local = true;
1160  drQos = &lsi->second.qos_;
1161  subQos = &lsi->second.subscriber_qos_;
1162  rTls = &lsi->second.trans_info_;
1163  rTransportContext = lsi->second.transport_context_;
1164  reader_type_info = &lsi->second.type_info_;
1165  if (lsi->second.filterProperties.filterExpression[0] != 0) {
1166  tempCfp.filterExpression = lsi->second.filterProperties.filterExpression;
1167  tempCfp.expressionParameters = lsi->second.filterProperties.expressionParameters;
1168  }
1169  cfProp = &tempCfp;
1170  if (!already_matched) {
1171  already_matched = lsi->second.matched_endpoints_.count(writer);
1172  }
1173  reader_participant_discovered_at = lsi->second.participant_discovered_at_;
1174  } else if (dsi != discovered_subscriptions_.end()) {
1175  rTls = &dsi->second.reader_data_.readerProxy.allLocators;
1176 
1177  populate_transport_locator_sequence(rTls, dsi, reader);
1178  rTransportContext = dsi->second.transport_context_;
1179 
1181  dsi->second.reader_data_.ddsSubscriptionData;
1182  tempDrQos.durability = bit.durability;
1183  tempDrQos.deadline = bit.deadline;
1184  tempDrQos.latency_budget = bit.latency_budget;
1185  tempDrQos.liveliness = bit.liveliness;
1186  tempDrQos.reliability = bit.reliability;
1187  tempDrQos.destination_order = bit.destination_order;
1188  tempDrQos.history = TheServiceParticipant->initial_HistoryQosPolicy();
1189  tempDrQos.resource_limits =
1190  TheServiceParticipant->initial_ResourceLimitsQosPolicy();
1191  tempDrQos.user_data = bit.user_data;
1192  tempDrQos.ownership = bit.ownership;
1193  tempDrQos.time_based_filter = bit.time_based_filter;
1194  tempDrQos.reader_data_lifecycle =
1195  TheServiceParticipant->initial_ReaderDataLifecycleQosPolicy();
1196  tempDrQos.representation = bit.representation;
1197  tempDrQos.type_consistency = bit.type_consistency;
1198  drQos = &tempDrQos;
1199 
1200  tempSubQos.presentation = bit.presentation;
1201  tempSubQos.partition = bit.partition;
1202  tempSubQos.group_data = bit.group_data;
1203  tempSubQos.entity_factory =
1204  TheServiceParticipant->initial_EntityFactoryQosPolicy();
1205  subQos = &tempSubQos;
1206 
1207  cfProp = &dsi->second.reader_data_.contentFilterProperty;
1208  reader_type_info = &dsi->second.type_info_;
1209  reader_participant_discovered_at = dsi->second.participant_discovered_at_;
1210  } else {
1211  return; // Possible and ok, since lock is released
1212  }
1213 
1214  // 3. Perform type consistency check (XTypes 1.3, Section 7.6.3.4.2)
1215  bool consistent = false;
1216 
1217  TopicDetailsMap::iterator td_iter = topics_.find(topic_name);
1218  if (td_iter == topics_.end()) {
1219  ACE_ERROR((LM_ERROR,
1220  ACE_TEXT("(%P|%t) StaticEndpointManager::match_continue - ERROR ")
1221  ACE_TEXT("Didn't find topic for consistency check\n")));
1222  return;
1223  } else {
1224  const XTypes::TypeIdentifier& writer_type_id = writer_type_info->minimal.typeid_with_size.type_id;
1225  const XTypes::TypeIdentifier& reader_type_id = reader_type_info->minimal.typeid_with_size.type_id;
1226  if (writer_type_id.kind() != XTypes::TK_NONE && reader_type_id.kind() != XTypes::TK_NONE) {
1227  if (!writer_local || !reader_local) {
1228  Encoding::Kind encoding_kind;
1229  if (tempDwQos.representation.value.length() > 0 &&
1230  repr_to_encoding_kind(tempDwQos.representation.value[0], encoding_kind) &&
1231  encoding_kind == Encoding::KIND_XCDR1) {
1232  const XTypes::TypeFlag extensibility_mask = XTypes::IS_APPENDABLE;
1233  if (type_lookup_service_->extensibility(extensibility_mask, writer_type_id)) {
1234  if (DCPS_debug_level) {
1235  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
1236  ACE_TEXT("StaticEndpointManager::match_continue: ")
1237  ACE_TEXT("Encountered unsupported combination of XCDR1 encoding and appendable extensibility\n")));
1238  }
1239  }
1240  }
1241  }
1242 
1243  XTypes::TypeConsistencyAttributes type_consistency;
1244  type_consistency.ignore_sequence_bounds = drQos->type_consistency.ignore_sequence_bounds;
1245  type_consistency.ignore_string_bounds = drQos->type_consistency.ignore_string_bounds;
1246  type_consistency.ignore_member_names = drQos->type_consistency.ignore_member_names;
1247  type_consistency.prevent_type_widening = drQos->type_consistency.prevent_type_widening;
1248  XTypes::TypeAssignability ta(type_lookup_service_, type_consistency);
1249 
1251  consistent = ta.assignable(reader_type_id, writer_type_id);
1252  } else {
1253  // The two types must be equivalent for DISALLOW_TYPE_COERCION
1254  consistent = reader_type_id == writer_type_id;
1255  }
1256  } else {
1258  // Cannot do type validation since not both TypeObjects are available
1259  consistent = false;
1260  } else {
1261  // Fall back to matching type names
1262  OPENDDS_STRING writer_type_name;
1263  OPENDDS_STRING reader_type_name;
1264  if (writer_local) {
1265  writer_type_name = td_iter->second.local_data_type_name();
1266  } else {
1267  writer_type_name = dpi->second.get_type_name();
1268  }
1269  if (reader_local) {
1270  reader_type_name = td_iter->second.local_data_type_name();
1271  } else {
1272  reader_type_name = dsi->second.get_type_name();
1273  }
1274  consistent = writer_type_name == reader_type_name;
1275  }
1276  }
1277 
1278  if (!consistent) {
1279  td_iter->second.increment_inconsistent();
1280  if (DCPS::DCPS_debug_level) {
1281  ACE_DEBUG((LM_WARNING,
1282  ACE_TEXT("(%P|%t) StaticEndpointManager::match_continue - WARNING ")
1283  ACE_TEXT("Data types of topic %C does not match (inconsistent)\n"),
1284  topic_name.c_str()));
1285  }
1286  return;
1287  }
1288  }
1289 
1290  // Need to release lock, below, for callbacks into DCPS which could
1291  // call into Spdp/Sedp. Note that this doesn't unlock, it just constructs
1292  // an ACE object which will be used below for unlocking.
1294 
1295  // 4. Check transport and QoS compatibility
1296 
1297  // Copy entries from local publication and local subscription maps
1298  // prior to releasing lock
1301  if (writer_local) {
1302  dwr = lpi->second.publication_;
1303  OPENDDS_ASSERT(lpi->second.publication_);
1304  OPENDDS_ASSERT(dwr);
1305  }
1306  if (reader_local) {
1307  drr = lsi->second.subscription_;
1308  OPENDDS_ASSERT(lsi->second.subscription_);
1309  OPENDDS_ASSERT(drr);
1310  }
1311 
1312  IncompatibleQosStatus writerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
1313  IncompatibleQosStatus readerStatus = {0, 0, 0, DDS::QosPolicyCountSeq()};
1314 
1315  if (compatibleQOS(&writerStatus, &readerStatus, *wTls, *rTls,
1316  dwQos, drQos, pubQos, subQos)) {
1317 
1318  bool call_writer = false, call_reader = false;
1319 
1320  if (writer_local) {
1321  call_writer = lpi->second.matched_endpoints_.insert(reader).second;
1322  dwr = lpi->second.publication_;
1323  if (!reader_local) {
1324  dsi->second.matched_endpoints_.insert(writer);
1325  }
1326  }
1327  if (reader_local) {
1328  call_reader = lsi->second.matched_endpoints_.insert(writer).second;
1329  drr = lsi->second.subscription_;
1330  if (!writer_local) {
1331  dpi->second.matched_endpoints_.insert(reader);
1332  }
1333  }
1334 
1335  if (writer_local && !reader_local) {
1336  add_assoc_i(writer, lpi->second, reader, dsi->second);
1337  }
1338  if (reader_local && !writer_local) {
1339  add_assoc_i(reader, lsi->second, writer, dpi->second);
1340  }
1341 
1342  if (!call_writer && !call_reader) {
1343  return; // nothing more to do
1344  }
1345 
1346  // Copy reader and writer association data prior to releasing lock
1347  DDS::OctetSeq octet_seq_type_info_reader;
1348  XTypes::serialize_type_info(*reader_type_info, octet_seq_type_info_reader);
1349  const ReaderAssociation ra = {
1350  *rTls, TransportLocator(), rTransportContext, reader, *subQos, *drQos,
1351 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1352  cfProp->filterClassName, cfProp->filterExpression,
1353 #else
1354  "", "",
1355 #endif
1356  cfProp->expressionParameters,
1357  octet_seq_type_info_reader,
1358  reader_participant_discovered_at
1359  };
1360 
1361  DDS::OctetSeq octet_seq_type_info_writer;
1362  XTypes::serialize_type_info(*writer_type_info, octet_seq_type_info_writer);
1363  const WriterAssociation wa = {
1364  *wTls, TransportLocator(), wTransportContext, writer, *pubQos, *dwQos,
1365  octet_seq_type_info_writer,
1366  writer_participant_discovered_at
1367  };
1368 
1370  static const bool writer_active = true;
1371 
1372  if (call_writer) {
1373  if (DCPS_debug_level > 3) {
1374  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::match_continue - ")
1375  ACE_TEXT("adding writer %C association for reader %C\n"), LogGuid(writer).c_str(),
1376  LogGuid(reader).c_str()));
1377  }
1378  DataWriterCallbacks_rch dwr_lock = dwr.lock();
1379  if (dwr_lock) {
1380  if (call_reader) {
1381  DataReaderCallbacks_rch drr_lock = drr.lock();
1382  if (drr_lock) {
1383  DcpsUpcalls thr(drr_lock, reader, wa, !writer_active, dwr_lock);
1384  thr.activate();
1385  dwr_lock->add_association(writer, ra, writer_active);
1386  thr.writer_done();
1387  }
1388  } else {
1389  dwr_lock->add_association(writer, ra, writer_active);
1390  }
1391  }
1392  } else if (call_reader) {
1393  if (DCPS_debug_level > 3) {
1394  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::match_continue - ")
1395  ACE_TEXT("adding reader %C association for writer %C\n"),
1396  LogGuid(reader).c_str(), LogGuid(writer).c_str()));
1397  }
1398  DataReaderCallbacks_rch drr_lock = drr.lock();
1399  if (drr_lock) {
1400  drr_lock->add_association(reader, wa, !writer_active);
1401  }
1402  }
1403 
1404  } else if (already_matched) { // break an existing associtaion
1405  if (writer_local) {
1406  lpi->second.matched_endpoints_.erase(reader);
1407  lpi->second.remote_expectant_opendds_associations_.erase(reader);
1408  if (dsi != discovered_subscriptions_.end()) {
1409  dsi->second.matched_endpoints_.erase(writer);
1410  }
1411  }
1412  if (reader_local) {
1413  lsi->second.matched_endpoints_.erase(writer);
1414  lsi->second.remote_expectant_opendds_associations_.erase(writer);
1415  if (dpi != discovered_publications_.end()) {
1416  dpi->second.matched_endpoints_.erase(reader);
1417  }
1418  }
1419  if (writer_local && !reader_local) {
1420  remove_assoc_i(writer, lpi->second, reader);
1421  }
1422  if (reader_local && !writer_local) {
1423  remove_assoc_i(reader, lsi->second, writer);
1424  }
1426  if (writer_local) {
1427  ReaderIdSeq reader_seq(1);
1428  reader_seq.length(1);
1429  reader_seq[0] = reader;
1430  DataWriterCallbacks_rch dwr_lock = dwr.lock();
1431  if (dwr_lock) {
1432  dwr_lock->remove_associations(reader_seq, false /*notify_lost*/);
1433  }
1434  }
1435  if (reader_local) {
1436  WriterIdSeq writer_seq(1);
1437  writer_seq.length(1);
1438  writer_seq[0] = writer;
1439  DataReaderCallbacks_rch drr_lock = drr.lock();
1440  if (drr_lock) {
1441  drr_lock->remove_associations(writer_seq, false /*notify_lost*/);
1442  }
1443  }
1444  } else { // something was incompatible
1446  if (writer_local && writerStatus.count_since_last_send) {
1447  if (DCPS_debug_level > 3) {
1448  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::match - ")
1449  ACE_TEXT("writer incompatible\n")));
1450  }
1451  DataWriterCallbacks_rch dwr_lock = dwr.lock();
1452  if (dwr_lock) {
1453  dwr_lock->update_incompatible_qos(writerStatus);
1454  }
1455  }
1456  if (reader_local && readerStatus.count_since_last_send) {
1457  if (DCPS_debug_level > 3) {
1458  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) StaticEndpointManager::match - ")
1459  ACE_TEXT("reader incompatible\n")));
1460  }
1461  DataReaderCallbacks_rch drr_lock = drr.lock();
1462  if (drr_lock) {
1463  drr_lock->update_incompatible_qos(readerStatus);
1464  }
1465  }
1466  }
1467 }
#define ACE_DEBUG(X)
bool repr_to_encoding_kind(DDS::DataRepresentationId_t repr, Encoding::Kind &kind)
Definition: DCPS_Utils.cpp:455
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
LocalSubscriptionMap local_subscriptions_
GroupDataQosPolicy group_data
DiscoveredSubscriptionMap discovered_subscriptions_
const TypeConsistencyEnforcementQosPolicyKind_t ALLOW_TYPE_COERCION
WeakRcHandle< DataReaderCallbacks > DataReaderCallbacks_wrch
DestinationOrderQosPolicy destination_order
LivelinessQosPolicy liveliness
ResourceLimitsQosPolicy resource_limits
sequence< TransportLocator > TransportLocatorSeq
PartitionQosPolicy partition
UserDataQosPolicy user_data
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
GroupDataQosPolicy group_data
ResourceLimitsQosPolicy resource_limits
virtual void populate_transport_locator_sequence(TransportLocatorSeq *&, DiscoveredSubscriptionIter &, const GUID_t &)
TimeBasedFilterQosPolicy time_based_filter
ReliabilityQosPolicy reliability
EntityFactoryQosPolicy entity_factory
const TypeKind TK_NONE
Definition: TypeObject.h:213
ReaderDataLifecycleQosPolicy reader_data_lifecycle
LatencyBudgetQosPolicy latency_budget
virtual void remove_assoc_i(const GUID_t &, const LocalPublication &, const GUID_t &)
#define OPENDDS_STRING
DataRepresentationQosPolicy representation
DurabilityServiceQosPolicy durability_service
DiscoveredPublicationMap::iterator DiscoveredPublicationIter
LatencyBudgetQosPolicy latency_budget
RcHandle< DataReaderCallbacks > DataReaderCallbacks_rch
ReliabilityQosPolicy reliability
OwnershipStrengthQosPolicy ownership_strength
const TypeFlag IS_APPENDABLE
Definition: TypeObject.h:401
DurabilityQosPolicy durability
TypeConsistencyEnforcementQosPolicy type_consistency
XTypes::TypeLookupService_rch type_lookup_service_
UserDataQosPolicy user_data
OwnershipQosPolicy ownership
DataRepresentationIdSeq value
bool compatibleQOS(OpenDDS::DCPS::IncompatibleQosStatus *writerStatus, OpenDDS::DCPS::IncompatibleQosStatus *readerStatus, const OpenDDS::DCPS::TransportLocatorSeq &pubTLS, const OpenDDS::DCPS::TransportLocatorSeq &subTLS, DDS::DataWriterQos const *const writerQos, DDS::DataReaderQos const *const readerQos, DDS::PublisherQos const *const pubQos, DDS::SubscriberQos const *const subQos)
Definition: DCPS_Utils.cpp:237
HistoryQosPolicy history
WeakRcHandle< DataWriterCallbacks > DataWriterCallbacks_wrch
LifespanQosPolicy lifespan
DurabilityQosPolicy durability
LocalSubscriptionMap::iterator LocalSubscriptionIter
ACE_UINT32 ULong
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
PresentationQosPolicy presentation
void serialize_type_info(const TypeInformation &type_info, T &seq, const DCPS::Encoding *encoding_option=0)
Definition: TypeObject.h:3382
sequence< GUID_t > WriterIdSeq
ACE_TEXT("TCP_Factory")
sequence< GUID_t > ReaderIdSeq
DiscoveredSubscriptionMap::iterator DiscoveredSubscriptionIter
PresentationQosPolicy presentation
TypeConsistencyEnforcementQosPolicyKind_t kind
virtual void add_assoc_i(const GUID_t &, const LocalPublication &, const GUID_t &, const DiscoveredSubscription &)
DataRepresentationQosPolicy representation
LocalPublicationMap::iterator LocalPublicationIter
DiscoveredPublicationMap discovered_publications_
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
DeadlineQosPolicy deadline
WriterDataLifecycleQosPolicy writer_data_lifecycle
sequence< QosPolicyCount > QosPolicyCountSeq
Definition: DdsDcpsCore.idl:62
EntityFactoryQosPolicy entity_factory
RcHandle< DataWriterCallbacks > DataWriterCallbacks_rch
OwnershipQosPolicy ownership
PartitionQosPolicy partition
DeadlineQosPolicy deadline
#define TheServiceParticipant
TransportPriorityQosPolicy transport_priority
ACE_CDR::UShort TypeFlag
Definition: TypeObject.h:399
LivelinessQosPolicy liveliness
DestinationOrderQosPolicy destination_order
HistoryQosPolicy history

◆ match_endpoints()

void OpenDDS::DCPS::StaticEndpointManager::match_endpoints ( GUID_t  repoId,
const TopicDetails td,
bool  remove = false 
)

Definition at line 926 of file StaticDiscovery.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TopicDetails::discovered_publications(), OpenDDS::DCPS::TopicDetails::discovered_subscriptions(), OpenDDS::DCPS::equal_guid_prefixes(), OpenDDS::DCPS::GuidConverter::isReader(), LM_DEBUG, OpenDDS::DCPS::TopicDetails::local_publications(), OpenDDS::DCPS::TopicDetails::local_subscriptions(), match(), participant_id_, and remove_assoc().

Referenced by add_publication(), add_subscription(), ignore(), remove_publication(), and remove_subscription().

928 {
929  if (DCPS_debug_level >= 4) {
930  ACE_DEBUG((LM_DEBUG, "(%P|%t) StaticEndpointManager::match_endpoints %C%C\n",
931  remove ? "remove " : "", LogGuid(repoId).c_str()));
932  }
933 
934  const bool reader = GuidConverter(repoId).isReader();
935  // Copy the endpoint set - lock can be released in match()
936  RepoIdSet local_endpoints;
937  RepoIdSet discovered_endpoints;
938  if (reader) {
939  local_endpoints = td.local_publications();
940  discovered_endpoints = td.discovered_publications();
941  } else {
942  local_endpoints = td.local_subscriptions();
943  discovered_endpoints = td.discovered_subscriptions();
944  }
945 
946  const bool is_remote = !equal_guid_prefixes(repoId, participant_id_);
947  if (is_remote && local_endpoints.empty()) {
948  // Nothing to match.
949  return;
950  }
951 
952  for (RepoIdSet::const_iterator iter = local_endpoints.begin();
953  iter != local_endpoints.end(); ++iter) {
954  // check to make sure it's a Reader/Writer or Writer/Reader match
955  if (GuidConverter(*iter).isReader() != reader) {
956  if (remove) {
957  remove_assoc(*iter, repoId);
958  } else {
959  match(reader ? *iter : repoId, reader ? repoId : *iter);
960  }
961  }
962  }
963 
964  // Remote/remote matches are a waste of time
965  if (is_remote) {
966  return;
967  }
968 
969  for (RepoIdSet::const_iterator iter = discovered_endpoints.begin();
970  iter != discovered_endpoints.end(); ++iter) {
971  // check to make sure it's a Reader/Writer or Writer/Reader match
972  if (GuidConverter(*iter).isReader() != reader) {
973  if (remove) {
974  remove_assoc(*iter, repoId);
975  } else {
976  match(reader ? *iter : repoId, reader ? repoId : *iter);
977  }
978  }
979  }
980 }
#define ACE_DEBUG(X)
GuidSet RepoIdSet
Definition: GuidUtils.h:113
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void match(const GUID_t &writer, const GUID_t &reader)
void remove_assoc(const GUID_t &remove_from, const GUID_t &removing)

◆ need_minimal_and_or_complete_types()

void OpenDDS::DCPS::StaticEndpointManager::need_minimal_and_or_complete_types ( const XTypes::TypeInformation type_info,
bool &  need_minimal,
bool &  need_complete 
) const
private

◆ OPENDDS_MAP()

typedef OpenDDS::DCPS::StaticEndpointManager::OPENDDS_MAP ( SequenceNumber  ,
TypeIdOrigSeqNumber   
)
private

◆ OPENDDS_MAP_CMP() [1/5]

typedef OpenDDS::DCPS::StaticEndpointManager::OPENDDS_MAP_CMP ( GUID_t  ,
DiscoveredSubscription  ,
GUID_tKeyLessThan   
)
protected

Referenced by has_dcps_key(), and ignore().

◆ OPENDDS_MAP_CMP() [2/5]

typedef OpenDDS::DCPS::StaticEndpointManager::OPENDDS_MAP_CMP ( GUID_t  ,
DiscoveredPublication  ,
GUID_tKeyLessThan   
)
protected

◆ OPENDDS_MAP_CMP() [3/5]

typedef OpenDDS::DCPS::StaticEndpointManager::OPENDDS_MAP_CMP ( GUID_t  ,
LocalPublication  ,
GUID_tKeyLessThan   
)
protected

◆ OPENDDS_MAP_CMP() [4/5]

typedef OpenDDS::DCPS::StaticEndpointManager::OPENDDS_MAP_CMP ( GUID_t  ,
LocalSubscription  ,
GUID_tKeyLessThan   
)
protected

◆ OPENDDS_MAP_CMP() [5/5]

typedef OpenDDS::DCPS::StaticEndpointManager::OPENDDS_MAP_CMP ( GUID_t  ,
String  ,
GUID_tKeyLessThan   
)
protected

◆ OPENDDS_SET()

OpenDDS::DCPS::StaticEndpointManager::OPENDDS_SET ( OPENDDS_STRING  )
private

◆ OPENDDS_SET_CMP()

OpenDDS::DCPS::StaticEndpointManager::OPENDDS_SET_CMP ( GUID_t  ,
GUID_tKeyLessThan   
)
private

◆ populate_transport_locator_sequence() [1/2]

void OpenDDS::DCPS::StaticEndpointManager::populate_transport_locator_sequence ( TransportLocatorSeq *&  ,
DiscoveredSubscriptionIter ,
const GUID_t  
)
virtual

Definition at line 469 of file StaticDiscovery.cpp.

References ACE_DEBUG, ACE_TEXT(), and LM_NOTICE.

Referenced by match_continue().

472 {
473  ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
474  // TODO
475 }
#define ACE_DEBUG(X)
ACE_TEXT("TCP_Factory")

◆ populate_transport_locator_sequence() [2/2]

void OpenDDS::DCPS::StaticEndpointManager::populate_transport_locator_sequence ( TransportLocatorSeq *&  ,
DiscoveredPublicationIter ,
const GUID_t  
)
virtual

Definition at line 478 of file StaticDiscovery.cpp.

References ACE_DEBUG, ACE_TEXT(), and LM_NOTICE.

481 {
482  ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::populate_transport_locator_sequence TODO\n")));
483  // TODO
484 }
#define ACE_DEBUG(X)
ACE_TEXT("TCP_Factory")

◆ pub_bit()

OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl * OpenDDS::DCPS::StaticEndpointManager::pub_bit ( )

Definition at line 566 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::StaticParticipant::bit_subscriber(), OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, and participant_.

Referenced by init_bit().

567 {
568  DDS::Subscriber_var sub = participant_.bit_subscriber();
569  if (!sub.in())
570  return 0;
571 
572  DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
573  return dynamic_cast<OpenDDS::DCPS::PublicationBuiltinTopicDataDataReaderImpl*>(d.in());
574 }
const char *const BUILT_IN_PUBLICATION_TOPIC
DDS::Subscriber_var bit_subscriber() const

◆ purge_dead_topic()

void OpenDDS::DCPS::StaticEndpointManager::purge_dead_topic ( const String topic_name)

Definition at line 611 of file StaticDiscovery.cpp.

References topic_names_, and topics_.

Referenced by ignore(), and remove_topic().

612 {
613  TopicDetailsMap::iterator top_it = topics_.find(topic_name);
614  topic_names_.erase(top_it->second.topic_id());
615  topics_.erase(top_it);
616 }

◆ reader_does_not_exist()

void OpenDDS::DCPS::StaticEndpointManager::reader_does_not_exist ( const GUID_t readerid,
const GUID_t writerid 
)
virtual

Implements OpenDDS::DCPS::DiscoveryListener.

Definition at line 505 of file StaticDiscovery.cpp.

References ACE_GUARD, local_publications_, lock_, OpenDDS::DCPS::EndpointRegistry::reader_map, and registry_.

506 {
508  LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
509  EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
510  if (lp_pos != local_publications_.end() &&
511  reader_pos != registry_.reader_map.end()) {
512  DataWriterCallbacks_rch dwr = lp_pos->second.publication_.lock();
513  if (dwr) {
514  ReaderIdSeq ids;
515  ids.length(1);
516  ids[0] = readerid;
517  dwr->remove_associations(ids, true);
518  }
519  }
520 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const EndpointRegistry & registry_
sequence< GUID_t > ReaderIdSeq
RcHandle< DataWriterCallbacks > DataWriterCallbacks_rch

◆ reader_exists()

void OpenDDS::DCPS::StaticEndpointManager::reader_exists ( const GUID_t readerid,
const GUID_t writerid 
)
virtual

Implements OpenDDS::DCPS::DiscoveryListener.

Definition at line 487 of file StaticDiscovery.cpp.

References ACE_GUARD, local_publications_, lock_, OpenDDS::DCPS::EndpointRegistry::reader_map, and registry_.

488 {
490  LocalPublicationMap::const_iterator lp_pos = local_publications_.find(writerid);
491  EndpointRegistry::ReaderMapType::const_iterator reader_pos = registry_.reader_map.find(readerid);
492  if (lp_pos != local_publications_.end() &&
493  reader_pos != registry_.reader_map.end()) {
494  DataWriterCallbacks_rch dwr = lp_pos->second.publication_.lock();
495  if (dwr) {
496  const ReaderAssociation ra =
497  {reader_pos->second.trans_info, TransportLocator(), 0, readerid, reader_pos->second.subscriber_qos, reader_pos->second.qos,
498  "", "", DDS::StringSeq(), DDS::OctetSeq(), {0, 0}};
499  dwr->add_association(writerid, ra, true);
500  }
501  }
502 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
const EndpointRegistry & registry_
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64
RcHandle< DataWriterCallbacks > DataWriterCallbacks_rch

◆ remove_assoc()

void OpenDDS::DCPS::StaticEndpointManager::remove_assoc ( const GUID_t remove_from,
const GUID_t removing 
)

Definition at line 982 of file StaticDiscovery.cpp.

References discovered_publications_, discovered_subscriptions_, local_publications_, local_subscriptions_, remove_assoc_i(), and write_subscription_data().

Referenced by match_endpoints().

983 {
984  if (GuidConverter(remove_from).isReader()) {
985  const LocalSubscriptionIter lsi = local_subscriptions_.find(remove_from);
986  if (lsi != local_subscriptions_.end()) {
987  lsi->second.matched_endpoints_.erase(removing);
988  const DiscoveredPublicationIter dpi = discovered_publications_.find(removing);
989  if (dpi != discovered_publications_.end()) {
990  dpi->second.matched_endpoints_.erase(remove_from);
991  }
992  WriterIdSeq writer_seq(1);
993  writer_seq.length(1);
994  writer_seq[0] = removing;
995  const size_t count = lsi->second.remote_expectant_opendds_associations_.erase(removing);
996  DataReaderCallbacks_rch drr = lsi->second.subscription_.lock();
997  if (drr) {
998  drr->remove_associations(writer_seq, false /*notify_lost*/);
999  }
1000  remove_assoc_i(remove_from, lsi->second, removing);
1001  // Update writer
1002  if (count) {
1003  write_subscription_data(remove_from, lsi->second);
1004  }
1005  }
1006 
1007  } else {
1008  const LocalPublicationIter lpi = local_publications_.find(remove_from);
1009  if (lpi != local_publications_.end()) {
1010  lpi->second.matched_endpoints_.erase(removing);
1011  const DiscoveredSubscriptionIter dsi = discovered_subscriptions_.find(removing);
1012  if (dsi != discovered_subscriptions_.end()) {
1013  dsi->second.matched_endpoints_.erase(remove_from);
1014  }
1015  ReaderIdSeq reader_seq(1);
1016  reader_seq.length(1);
1017  reader_seq[0] = removing;
1018  lpi->second.remote_expectant_opendds_associations_.erase(removing);
1019  DataWriterCallbacks_rch dwr = lpi->second.publication_.lock();
1020  if (dwr) {
1021  dwr->remove_associations(reader_seq, false /*notify_lost*/);
1022  }
1023  remove_assoc_i(remove_from, lpi->second, removing);
1024  }
1025  }
1026 }
LocalSubscriptionMap local_subscriptions_
DiscoveredSubscriptionMap discovered_subscriptions_
virtual void remove_assoc_i(const GUID_t &, const LocalPublication &, const GUID_t &)
virtual DDS::ReturnCode_t write_subscription_data(const GUID_t &, LocalSubscription &, const GUID_t &reader=GUID_UNKNOWN)
DiscoveredPublicationMap::iterator DiscoveredPublicationIter
RcHandle< DataReaderCallbacks > DataReaderCallbacks_rch
LocalSubscriptionMap::iterator LocalSubscriptionIter
sequence< GUID_t > WriterIdSeq
sequence< GUID_t > ReaderIdSeq
DiscoveredSubscriptionMap::iterator DiscoveredSubscriptionIter
LocalPublicationMap::iterator LocalPublicationIter
DiscoveredPublicationMap discovered_publications_
RcHandle< DataWriterCallbacks > DataWriterCallbacks_rch

◆ remove_assoc_i() [1/2]

virtual void OpenDDS::DCPS::StaticEndpointManager::remove_assoc_i ( const GUID_t ,
const LocalPublication ,
const GUID_t  
)
inlinevirtual

Definition at line 412 of file StaticDiscovery.h.

Referenced by match_continue(), and remove_assoc().

413  {}

◆ remove_assoc_i() [2/2]

virtual void OpenDDS::DCPS::StaticEndpointManager::remove_assoc_i ( const GUID_t ,
const LocalSubscription ,
const GUID_t  
)
inlinevirtual

Definition at line 416 of file StaticDiscovery.h.

417  {}

◆ remove_expired_endpoints()

void OpenDDS::DCPS::StaticEndpointManager::remove_expired_endpoints ( const MonotonicTimePoint )
private

Definition at line 1038 of file StaticDiscovery.cpp.

References ACE_DEBUG, ACE_GUARD, OpenDDS::DCPS::LogGuid::c_str(), cleanup_type_lookup_data(), OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, lock_, max_type_lookup_service_reply_period_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), and orig_seq_numbers_.

Referenced by type_lookup_init().

1040 {
1043 
1044  // Clean up internal data used by getTypeDependencies
1045  for (OrigSeqNumberMap::iterator it = orig_seq_numbers_.begin(); it != orig_seq_numbers_.end();) {
1046  if (now - it->second.time_started >= max_type_lookup_service_reply_period_) {
1047  if (DCPS_debug_level >= 4) {
1048  ACE_DEBUG((LM_DEBUG, "(%P|%t) StaticEndpointManager::remove_expired_endpoints: "
1049  "clean up type lookup data for %C\n",
1050  LogGuid(it->second.participant).c_str()));
1051  }
1052  cleanup_type_lookup_data(it->second.participant, it->second.type_id, it->second.secure);
1053  orig_seq_numbers_.erase(it++);
1054  } else {
1055  ++it;
1056  }
1057  }
1058 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void cleanup_type_lookup_data(const GuidPrefix_t &prefix, const XTypes::TypeIdentifier &ti, bool secure)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51

◆ remove_from_bit() [1/2]

void OpenDDS::DCPS::StaticEndpointManager::remove_from_bit ( const DiscoveredPublication pub)
inlineprivate

Definition at line 427 of file StaticDiscovery.h.

Referenced by ignore().

428  {
429  remove_from_bit_i(pub);
430  }
virtual void remove_from_bit_i(const DiscoveredPublication &)

◆ remove_from_bit() [2/2]

void OpenDDS::DCPS::StaticEndpointManager::remove_from_bit ( const DiscoveredSubscription sub)
inlineprivate

Definition at line 432 of file StaticDiscovery.h.

433  {
434  remove_from_bit_i(sub);
435  }
virtual void remove_from_bit_i(const DiscoveredPublication &)

◆ remove_from_bit_i() [1/2]

virtual void OpenDDS::DCPS::StaticEndpointManager::remove_from_bit_i ( const DiscoveredPublication )
inlineprotectedvirtual

Definition at line 265 of file StaticDiscovery.h.

265 { }

◆ remove_from_bit_i() [2/2]

virtual void OpenDDS::DCPS::StaticEndpointManager::remove_from_bit_i ( const DiscoveredSubscription )
inlineprotectedvirtual

Definition at line 266 of file StaticDiscovery.h.

266 { }

◆ remove_publication()

void OpenDDS::DCPS::StaticEndpointManager::remove_publication ( const GUID_t publicationId)

Definition at line 801 of file StaticDiscovery.cpp.

References ACE_ERROR, ACE_GUARD, ACE_TEXT(), LM_ERROR, local_publications_, lock_, match_endpoints(), OPENDDS_STRING, remove_publication_i(), DDS::RETCODE_OK, topic_names_, and topics_.

802 {
804  LocalPublicationIter iter = local_publications_.find(publicationId);
805  if (iter != local_publications_.end()) {
806  if (DDS::RETCODE_OK == remove_publication_i(publicationId, iter->second)) {
807  OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
808  local_publications_.erase(publicationId);
809  TopicDetailsMap::iterator top_it = topics_.find(topic_name);
810  if (top_it != topics_.end()) {
811  match_endpoints(publicationId, top_it->second, true /*remove*/);
812  top_it->second.remove_local_publication(publicationId);
813  // Local, no need to check for dead topic.
814  }
815  } else {
816  ACE_ERROR((LM_ERROR,
817  ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::remove_publication - ")
818  ACE_TEXT("Failed to publish dispose msg\n")));
819  }
820  }
821 }
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const ReturnCode_t RETCODE_OK
void match_endpoints(GUID_t repoId, const TopicDetails &td, bool remove=false)
#define OPENDDS_STRING
ACE_TEXT("TCP_Factory")
LocalPublicationMap::iterator LocalPublicationIter
virtual DDS::ReturnCode_t remove_publication_i(const GUID_t &, LocalPublication &)

◆ remove_publication_i()

DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::remove_publication_i ( const GUID_t writerid,
LocalPublication pub 
)
virtual

Definition at line 354 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_id_, OpenDDS::DCPS::StaticEndpointManager::LocalPublication::publication_, registry_, OpenDDS::DCPS::EndpointRegistry::Writer::reliable_readers, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and OpenDDS::DCPS::EndpointRegistry::writer_map.

Referenced by remove_publication().

355 {
356  EndpointRegistry::WriterMapType::const_iterator pos = registry_.writer_map.find(writerid);
357  if (pos == registry_.writer_map.end()) {
358  return DDS::RETCODE_ERROR;
359  }
360 
361  const EndpointRegistry::Writer& writer = pos->second;
362 
363  ReaderIdSeq ids;
364  ids.length((CORBA::ULong)writer.reliable_readers.size());
365  CORBA::ULong idx = 0;
366  for (RepoIdSet::const_iterator pos = writer.reliable_readers.begin(), limit = writer.reliable_readers.end();
367  pos != limit;
368  ++pos, ++idx) {
369  const GUID_t& readerid = *pos;
370  ids[idx] = readerid;
371  DataWriterCallbacks_rch pl = pub.publication_.lock();
372  if (pl) {
373  pl->unregister_for_reader(participant_id_, writerid, readerid);
374  }
375  }
376 
377  return DDS::RETCODE_OK;
378 }
const ReturnCode_t RETCODE_OK
ACE_CDR::ULong ULong
const ReturnCode_t RETCODE_ERROR
const EndpointRegistry & registry_
sequence< GUID_t > ReaderIdSeq
RcHandle< DataWriterCallbacks > DataWriterCallbacks_rch

◆ remove_subscription()

void OpenDDS::DCPS::StaticEndpointManager::remove_subscription ( const GUID_t subscriptionId)

Definition at line 886 of file StaticDiscovery.cpp.

References ACE_ERROR, ACE_GUARD, ACE_TEXT(), LM_ERROR, local_subscriptions_, lock_, match_endpoints(), OPENDDS_STRING, remove_subscription_i(), DDS::RETCODE_OK, topic_names_, and topics_.

887 {
889  LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
890  if (iter != local_subscriptions_.end()) {
891  if (DDS::RETCODE_OK == remove_subscription_i(subscriptionId, iter->second)) {
892  OPENDDS_STRING topic_name = topic_names_[iter->second.topic_id_];
893  local_subscriptions_.erase(subscriptionId);
894  TopicDetailsMap::iterator top_it = topics_.find(topic_name);
895  if (top_it != topics_.end()) {
896  match_endpoints(subscriptionId, top_it->second, true /*remove*/);
897  top_it->second.remove_local_subscription(subscriptionId);
898  // Local, no need to check for dead topic.
899  }
900  } else {
901  ACE_ERROR((LM_ERROR,
902  ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::remove_subscription - ")
903  ACE_TEXT("Failed to publish dispose msg\n")));
904  }
905  }
906 }
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
LocalSubscriptionMap local_subscriptions_
const ReturnCode_t RETCODE_OK
void match_endpoints(GUID_t repoId, const TopicDetails &td, bool remove=false)
#define OPENDDS_STRING
LocalSubscriptionMap::iterator LocalSubscriptionIter
ACE_TEXT("TCP_Factory")
virtual DDS::ReturnCode_t remove_subscription_i(const GUID_t &, LocalSubscription &)

◆ remove_subscription_i()

DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::remove_subscription_i ( const GUID_t readerid,
LocalSubscription sub 
)
virtual

Definition at line 425 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_id_, OpenDDS::DCPS::EndpointRegistry::reader_map, registry_, OpenDDS::DCPS::EndpointRegistry::Reader::reliable_writers, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and OpenDDS::DCPS::StaticEndpointManager::LocalSubscription::subscription_.

Referenced by remove_subscription().

427 {
428  EndpointRegistry::ReaderMapType::const_iterator pos = registry_.reader_map.find(readerid);
429  if (pos == registry_.reader_map.end()) {
430  return DDS::RETCODE_ERROR;
431  }
432 
433  const EndpointRegistry::Reader& reader = pos->second;
434 
435  WriterIdSeq ids;
436  ids.length((CORBA::ULong)reader.reliable_writers.size());
437  CORBA::ULong idx = 0;
438  for (RepoIdSet::const_iterator pos = reader.reliable_writers.begin(), limit = reader.reliable_writers.end();
439  pos != limit;
440  ++pos, ++idx) {
441  const GUID_t& writerid = *pos;
442  ids[idx] = writerid;
443  DataReaderCallbacks_rch sl = sub.subscription_.lock();
444  if (sl) {
445  sl->unregister_for_writer(participant_id_, readerid, writerid);
446  }
447  }
448 
449  return DDS::RETCODE_OK;
450 }
const ReturnCode_t RETCODE_OK
RcHandle< DataReaderCallbacks > DataReaderCallbacks_rch
ACE_CDR::ULong ULong
const ReturnCode_t RETCODE_ERROR
const EndpointRegistry & registry_
sequence< GUID_t > WriterIdSeq

◆ remove_topic()

TopicStatus OpenDDS::DCPS::StaticEndpointManager::remove_topic ( const GUID_t topicId)

Definition at line 743 of file StaticDiscovery.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::INTERNAL_ERROR, OpenDDS::DCPS::TopicDetails::is_dead(), lock_, name, OpenDDS::DCPS::NOT_FOUND, purge_dead_topic(), OpenDDS::DCPS::REMOVED, topic_names_, topics_, and OpenDDS::DCPS::TopicDetails::unset_local().

744 {
746  TopicNameMap::iterator name_iter = topic_names_.find(topicId);
747  if (name_iter == topic_names_.end()) {
748  return NOT_FOUND;
749  }
750  const String& name = name_iter->second;
751  TopicDetails& td = topics_[name];
752  td.unset_local();
753  if (td.is_dead()) {
754  purge_dead_topic(name);
755  }
756 
757  return REMOVED;
758 }
void purge_dead_topic(const String &topic_name)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const char *const name
Definition: debug.cpp:60
std::string String

◆ send_type_lookup_request()

virtual bool OpenDDS::DCPS::StaticEndpointManager::send_type_lookup_request ( const XTypes::TypeIdentifierSeq ,
const GUID_t ,
bool  ,
bool   
)
inlineprotectedvirtual

Definition at line 284 of file StaticDiscovery.h.

288  {
289  return true;
290  }

◆ shutting_down()

bool OpenDDS::DCPS::StaticEndpointManager::shutting_down ( void  ) const
virtual

Definition at line 461 of file StaticDiscovery.cpp.

References ACE_DEBUG, ACE_TEXT(), and LM_NOTICE.

Referenced by ignore().

462 {
463  ACE_DEBUG((LM_NOTICE, ACE_TEXT("(%P|%t) StaticEndpointManager::shutting_down TODO\n")));
464  // TODO
465  return false;
466 }
#define ACE_DEBUG(X)
ACE_TEXT("TCP_Factory")

◆ sub_bit()

OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl * OpenDDS::DCPS::StaticEndpointManager::sub_bit ( )

Definition at line 577 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::StaticParticipant::bit_subscriber(), OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, and participant_.

Referenced by init_bit().

578 {
579  DDS::Subscriber_var sub = participant_.bit_subscriber();
580  if (!sub.in())
581  return 0;
582 
583  DDS::DataReader_var d = sub->lookup_datareader(BUILT_IN_SUBSCRIPTION_TOPIC);
584  return dynamic_cast<OpenDDS::DCPS::SubscriptionBuiltinTopicDataDataReaderImpl*>(d.in());
585 }
DDS::Subscriber_var bit_subscriber() const
const char *const BUILT_IN_SUBSCRIPTION_TOPIC

◆ type_lookup_fini()

void OpenDDS::DCPS::StaticEndpointManager::type_lookup_fini ( )

Definition at line 597 of file StaticDiscovery.cpp.

References type_lookup_reply_deadline_processor_.

Referenced by ~StaticEndpointManager().

598 {
602  }
603 }
RcHandle< StaticEndpointManagerSporadic > type_lookup_reply_deadline_processor_

◆ type_lookup_init()

void OpenDDS::DCPS::StaticEndpointManager::type_lookup_init ( ReactorInterceptor_rch  reactor_interceptor)

Definition at line 588 of file StaticDiscovery.cpp.

References OpenDDS::DCPS::rchandle_from(), remove_expired_endpoints(), TheServiceParticipant, and type_lookup_reply_deadline_processor_.

Referenced by StaticEndpointManager().

589 {
592  DCPS::make_rch<StaticEndpointManagerSporadic>(TheServiceParticipant->time_source(), reactor_interceptor,
594  }
595 }
void remove_expired_endpoints(const MonotonicTimePoint &)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define TheServiceParticipant
RcHandle< StaticEndpointManagerSporadic > type_lookup_reply_deadline_processor_

◆ type_lookup_service()

void OpenDDS::DCPS::StaticEndpointManager::type_lookup_service ( const XTypes::TypeLookupService_rch  type_lookup_service)

Definition at line 605 of file StaticDiscovery.cpp.

References type_lookup_service_.

607 {
609 }
XTypes::TypeLookupService_rch type_lookup_service_
void type_lookup_service(const XTypes::TypeLookupService_rch type_lookup_service)

◆ update_publication_locators()

void OpenDDS::DCPS::StaticEndpointManager::update_publication_locators ( const GUID_t publicationId,
const TransportLocatorSeq transInfo 
)

Definition at line 823 of file StaticDiscovery.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_INFO, local_publications_, lock_, and write_publication_data().

825 {
827  LocalPublicationIter iter = local_publications_.find(publicationId);
828  if (iter != local_publications_.end()) {
829  if (DCPS_debug_level > 3) {
830  ACE_DEBUG((LM_INFO,
831  ACE_TEXT("(%P|%t) StaticEndpointManager::update_publication_locators - updating locators for %C\n"),
832  LogGuid(publicationId).c_str()));
833  }
834  iter->second.trans_info_ = transInfo;
835  write_publication_data(publicationId, iter->second);
836  }
837 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual DDS::ReturnCode_t write_publication_data(const GUID_t &, LocalPublication &, const GUID_t &reader=GUID_UNKNOWN)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
LocalPublicationMap::iterator LocalPublicationIter

◆ update_publication_qos()

bool OpenDDS::DCPS::StaticEndpointManager::update_publication_qos ( const GUID_t ,
const DDS::DataWriterQos ,
const DDS::PublisherQos  
)
virtual

Definition at line 274 of file StaticDiscovery.cpp.

References ACE_ERROR, ACE_TEXT(), and LM_ERROR.

277 {
278  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_publication_qos - ")
279  ACE_TEXT("Not allowed\n")));
280  return false;
281 }
#define ACE_ERROR(X)
ACE_TEXT("TCP_Factory")

◆ update_subscription_locators()

void OpenDDS::DCPS::StaticEndpointManager::update_subscription_locators ( const GUID_t subscriptionId,
const TransportLocatorSeq transInfo 
)

Definition at line 908 of file StaticDiscovery.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_INFO, local_subscriptions_, lock_, and write_subscription_data().

911 {
913  LocalSubscriptionIter iter = local_subscriptions_.find(subscriptionId);
914  if (iter != local_subscriptions_.end()) {
915  if (DCPS_debug_level > 3) {
916  ACE_DEBUG((LM_INFO,
917  ACE_TEXT("(%P|%t) StaticEndpointManager::update_subscription_locators updating locators for %C\n"),
918  LogGuid(subscriptionId).c_str()));
919  }
920  iter->second.trans_info_ = transInfo;
921  write_subscription_data(subscriptionId, iter->second);
922  }
923 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
LocalSubscriptionMap local_subscriptions_
virtual DDS::ReturnCode_t write_subscription_data(const GUID_t &, LocalSubscription &, const GUID_t &reader=GUID_UNKNOWN)
LocalSubscriptionMap::iterator LocalSubscriptionIter
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")

◆ update_subscription_params()

bool OpenDDS::DCPS::StaticEndpointManager::update_subscription_params ( const GUID_t ,
const DDS::StringSeq  
)
virtual

Definition at line 294 of file StaticDiscovery.cpp.

References ACE_ERROR, ACE_TEXT(), and LM_ERROR.

296 {
297  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
298  ACE_TEXT("Not allowed\n")));
299  return false;
300 }
#define ACE_ERROR(X)
ACE_TEXT("TCP_Factory")

◆ update_subscription_qos()

bool OpenDDS::DCPS::StaticEndpointManager::update_subscription_qos ( const GUID_t ,
const DDS::DataReaderQos ,
const DDS::SubscriberQos  
)
virtual

Definition at line 284 of file StaticDiscovery.cpp.

References ACE_ERROR, ACE_TEXT(), and LM_ERROR.

287 {
288  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_subscription_qos - ")
289  ACE_TEXT("Not allowed\n")));
290  return false;
291 }
#define ACE_ERROR(X)
ACE_TEXT("TCP_Factory")

◆ update_topic_qos()

bool OpenDDS::DCPS::StaticEndpointManager::update_topic_qos ( const GUID_t ,
const DDS::TopicQos  
)
virtual

Definition at line 265 of file StaticDiscovery.cpp.

References ACE_ERROR, ACE_TEXT(), and LM_ERROR.

267 {
268  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: StaticEndpointManager::update_topic_qos - ")
269  ACE_TEXT("Not allowed\n")));
270  return false;
271 }
#define ACE_ERROR(X)
ACE_TEXT("TCP_Factory")

◆ write_publication_data()

virtual DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::write_publication_data ( const GUID_t ,
LocalPublication ,
const GUID_t reader = GUID_UNKNOWN 
)
inlineprotectedvirtual

Definition at line 268 of file StaticDiscovery.h.

References DDS::RETCODE_OK.

Referenced by add_publication(), and update_publication_locators().

271  {
272  ACE_UNUSED_ARG(reader);
273  return DDS::RETCODE_OK;
274  }
const ReturnCode_t RETCODE_OK

◆ write_subscription_data()

virtual DDS::ReturnCode_t OpenDDS::DCPS::StaticEndpointManager::write_subscription_data ( const GUID_t ,
LocalSubscription ,
const GUID_t reader = GUID_UNKNOWN 
)
inlineprotectedvirtual

Definition at line 276 of file StaticDiscovery.h.

References DDS::RETCODE_OK.

Referenced by add_subscription(), remove_assoc(), and update_subscription_locators().

279  {
280  ACE_UNUSED_ARG(reader);
281  return DDS::RETCODE_OK;
282  }
const ReturnCode_t RETCODE_OK

◆ writer_does_not_exist()

void OpenDDS::DCPS::StaticEndpointManager::writer_does_not_exist ( const GUID_t writerid,
const GUID_t readerid 
)
virtual

Implements OpenDDS::DCPS::DiscoveryListener.

Definition at line 540 of file StaticDiscovery.cpp.

References ACE_GUARD, local_subscriptions_, lock_, registry_, and OpenDDS::DCPS::EndpointRegistry::writer_map.

541 {
543  LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
544  EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
545  if (ls_pos != local_subscriptions_.end() &&
546  writer_pos != registry_.writer_map.end()) {
547  DataReaderCallbacks_rch drr = ls_pos->second.subscription_.lock();
548  if (drr) {
549  WriterIdSeq ids;
550  ids.length(1);
551  ids[0] = writerid;
552  drr->remove_associations(ids, true);
553  }
554  }
555 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
LocalSubscriptionMap local_subscriptions_
RcHandle< DataReaderCallbacks > DataReaderCallbacks_rch
const EndpointRegistry & registry_
sequence< GUID_t > WriterIdSeq

◆ writer_exists()

void OpenDDS::DCPS::StaticEndpointManager::writer_exists ( const GUID_t writerid,
const GUID_t readerid 
)
virtual

Implements OpenDDS::DCPS::DiscoveryListener.

Definition at line 523 of file StaticDiscovery.cpp.

References ACE_GUARD, local_subscriptions_, lock_, registry_, and OpenDDS::DCPS::EndpointRegistry::writer_map.

524 {
526  LocalSubscriptionMap::const_iterator ls_pos = local_subscriptions_.find(readerid);
527  EndpointRegistry::WriterMapType::const_iterator writer_pos = registry_.writer_map.find(writerid);
528  if (ls_pos != local_subscriptions_.end() &&
529  writer_pos != registry_.writer_map.end()) {
530  DataReaderCallbacks_rch drr = ls_pos->second.subscription_.lock();
531  if (drr) {
532  const WriterAssociation wa =
533  {writer_pos->second.trans_info, TransportLocator(), 0, writerid, writer_pos->second.publisher_qos, writer_pos->second.qos, DDS::OctetSeq(), {0,0}};
534  drr->add_association(readerid, wa, false);
535  }
536  }
537 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
LocalSubscriptionMap local_subscriptions_
RcHandle< DataReaderCallbacks > DataReaderCallbacks_rch
const EndpointRegistry & registry_
sequence< octet > OctetSeq
Definition: DdsDcpsCore.idl:64

Member Data Documentation

◆ discovered_publications_

DiscoveredPublicationMap OpenDDS::DCPS::StaticEndpointManager::discovered_publications_
private

Definition at line 447 of file StaticDiscovery.h.

Referenced by ignore(), match_continue(), and remove_assoc().

◆ discovered_subscriptions_

DiscoveredSubscriptionMap OpenDDS::DCPS::StaticEndpointManager::discovered_subscriptions_
private

Definition at line 448 of file StaticDiscovery.h.

Referenced by ignore(), match_continue(), and remove_assoc().

◆ ignored_guids_

RepoIdSet OpenDDS::DCPS::StaticEndpointManager::ignored_guids_
private

Definition at line 443 of file StaticDiscovery.h.

Referenced by ignore(), and ignoring().

◆ local_publications_

LocalPublicationMap OpenDDS::DCPS::StaticEndpointManager::local_publications_
private

◆ local_subscriptions_

LocalSubscriptionMap OpenDDS::DCPS::StaticEndpointManager::local_subscriptions_
private

◆ lock_

ACE_Thread_Mutex& OpenDDS::DCPS::StaticEndpointManager::lock_
private

◆ max_type_lookup_service_reply_period_

TimeDuration OpenDDS::DCPS::StaticEndpointManager::max_type_lookup_service_reply_period_
private

Definition at line 461 of file StaticDiscovery.h.

Referenced by remove_expired_endpoints().

◆ orig_seq_numbers_

OrigSeqNumberMap OpenDDS::DCPS::StaticEndpointManager::orig_seq_numbers_
private

Definition at line 476 of file StaticDiscovery.h.

Referenced by remove_expired_endpoints().

◆ participant_

StaticParticipant& OpenDDS::DCPS::StaticEndpointManager::participant_
private

Definition at line 455 of file StaticDiscovery.h.

Referenced by pub_bit(), and sub_bit().

◆ participant_id_

GUID_t OpenDDS::DCPS::StaticEndpointManager::participant_id_
private

◆ registry_

const EndpointRegistry& OpenDDS::DCPS::StaticEndpointManager::registry_
private

◆ topic_counter_

unsigned int OpenDDS::DCPS::StaticEndpointManager::topic_counter_
private

Definition at line 444 of file StaticDiscovery.h.

Referenced by make_topic_guid().

◆ topic_names_

TopicNameMap OpenDDS::DCPS::StaticEndpointManager::topic_names_
private

◆ topics_

TopicDetailsMap OpenDDS::DCPS::StaticEndpointManager::topics_
private

◆ type_lookup_reply_deadline_processor_

RcHandle<StaticEndpointManagerSporadic> OpenDDS::DCPS::StaticEndpointManager::type_lookup_reply_deadline_processor_
private

Definition at line 460 of file StaticDiscovery.h.

Referenced by type_lookup_fini(), and type_lookup_init().

◆ type_lookup_service_

XTypes::TypeLookupService_rch OpenDDS::DCPS::StaticEndpointManager::type_lookup_service_
private

Definition at line 458 of file StaticDiscovery.h.

Referenced by match_continue(), and type_lookup_service().

◆ type_lookup_service_sequence_number_

SequenceNumber OpenDDS::DCPS::StaticEndpointManager::type_lookup_service_sequence_number_
private

Definition at line 462 of file StaticDiscovery.h.


The documentation for this class was generated from the following files: