OpenDDS  Snapshot(2023/04/28-20:55)
SubscriberImpl.h
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #ifndef OPENDDS_DCPS_SUBSCRIBERIMPL_H
7 #define OPENDDS_DCPS_SUBSCRIBERIMPL_H
8 
9 #include "DataReaderCallbacks.h"
10 #include "EntityImpl.h"
11 #include "Definitions.h"
12 #include "DataCollector_T.h"
13 #include "DataReaderImpl.h"
14 #include "PoolAllocator.h"
15 
16 #include <dds/DdsDcpsInfoUtilsC.h>
17 
19 
20 #ifndef ACE_LACKS_PRAGMA_ONCE
21 # pragma once
22 #endif
23 
25 
26 namespace OpenDDS {
27 namespace DCPS {
28 
29 class DomainParticipantImpl;
30 class Monitor;
31 
32 #ifndef OPENDDS_NO_MULTI_TOPIC
33 class MultiTopicImpl;
34 #endif
35 
37  : public virtual OpenDDS::DCPS::LocalObject<DDS::Subscriber>
38  , public virtual EntityImpl {
39 public:
40 
42  const DDS::SubscriberQos& qos,
43  DDS::SubscriberListener_ptr a_listener,
44  const DDS::StatusMask& mask,
45  DomainParticipantImpl* participant);
46 
47  virtual ~SubscriberImpl();
48 
49  virtual DDS::InstanceHandle_t get_instance_handle();
50 
51  bool contains_reader(DDS::InstanceHandle_t a_handle);
52 
53  virtual DDS::DataReader_ptr create_datareader(
54  DDS::TopicDescription_ptr a_topic_desc,
55  const DDS::DataReaderQos& qos,
56  DDS::DataReaderListener_ptr a_listener,
57  DDS::StatusMask mask);
58 
59  virtual DDS::ReturnCode_t delete_datareader(
60  DDS::DataReader_ptr a_datareader);
61 
62  virtual DDS::ReturnCode_t delete_contained_entities();
63 
64  virtual DDS::DataReader_ptr lookup_datareader(
65  const char* topic_name);
66 
67  virtual DDS::ReturnCode_t get_datareaders(
68  DDS::DataReaderSeq& readers,
72 
73  virtual DDS::ReturnCode_t notify_datareaders();
74 
75  virtual DDS::ReturnCode_t set_qos(
76  const DDS::SubscriberQos& qos);
77 
78  virtual DDS::ReturnCode_t get_qos(
79  DDS::SubscriberQos& qos);
80 
81  virtual DDS::ReturnCode_t set_listener(
82  DDS::SubscriberListener_ptr a_listener,
83  DDS::StatusMask mask);
84 
85  virtual DDS::SubscriberListener_ptr get_listener();
86 
87 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
88 
89  virtual DDS::ReturnCode_t begin_access();
90 
91  virtual DDS::ReturnCode_t end_access();
92 
93 #endif
94 
95  virtual DDS::DomainParticipant_ptr get_participant();
96 
97  virtual DDS::ReturnCode_t set_default_datareader_qos(
98  const DDS::DataReaderQos& qos);
99 
100  virtual DDS::ReturnCode_t get_default_datareader_qos(
101  DDS::DataReaderQos& qos);
102 
103  virtual DDS::ReturnCode_t copy_from_topic_qos(
104  DDS::DataReaderQos& a_datareader_qos,
105  const DDS::TopicQos& a_topic_qos);
106 
107  virtual DDS::ReturnCode_t enable();
108 
109  /**
110  * This method is not defined in the IDL; it is intended for
111  * internal use.
112  * Checks whether any DataReader is still associated with this Subscriber.
113  */
114  bool is_clean(String* leftover_entities = 0) const;
115 
116  // called by DataReaderImpl::data_received
117  void data_received(DataReaderImpl* reader);
118 
119  DDS::ReturnCode_t reader_enabled(const char* topic_name,
120  DataReaderImpl* reader);
121 
122 #ifndef OPENDDS_NO_MULTI_TOPIC
123  DDS::ReturnCode_t multitopic_reader_enabled(DDS::DataReader_ptr reader);
124  void remove_from_datareader_set(DataReaderImpl* reader);
125 #endif
126 
127  DDS::SubscriberListener_ptr listener_for(DDS::StatusKind kind);
128 
129  /// @name Raw Latency Statistics Configuration Interfaces
130  /// @{
131 
132  /// Configure the size of the raw data collection buffer.
133  unsigned int& raw_latency_buffer_size();
134 
135  /// Configure the type of the raw data collection buffer.
136  DataCollector<double>::OnFull& raw_latency_buffer_type();
137 
138  /// @}
139 
140  typedef OPENDDS_VECTOR(GUID_t) SubscriptionIdVec;
141  /// Populates a std::vector with the SubscriptionIds (GUIDs)
142  /// of this Subscriber's Data Readers
143  void get_subscription_ids(SubscriptionIdVec& subs);
144 
145 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
146  void update_ownership_strength (const GUID_t& pub_id,
147  const CORBA::Long& ownership_strength);
148 #endif
149 
150 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
151  void coherent_change_received(const GUID_t& publisher_id,
152  DataReaderImpl* reader,
153  Coherent_State& group_state);
154 #endif
155 
156  virtual RcHandle<EntityImpl> parent() const;
157 
158  static bool validate_datareader_qos(const DDS::DataReaderQos & qos,
159  const DDS::DataReaderQos & default_qos,
160  DDS::Topic_ptr a_topic,
161  DDS::DataReaderQos & result_qos, bool mt);
162 
163 private:
164 
165  /// Keep track of all the DataReaders attached to this
166  /// Subscriber: key is the topic_name
167  typedef OPENDDS_MULTIMAP(OPENDDS_STRING, DataReaderImpl_rch) DataReaderMap;
168 
169  /// Keeps track of DataReaders with data.
170  /// This is a std::set for now; the intent is to encapsulate
171  /// it so that we can switch between a set and a
172  /// list depending on the Presentation QoS.
173  typedef OPENDDS_SET(DataReaderImpl_rch) DataReaderSet;
174 
175  /// DataReader id to qos map.
177 
179 
182 
185  DDS::SubscriberListener_var listener_;
186 
187  DataReaderSet readers_not_enabled_;
188  DataReaderMap datareader_map_;
189  DataReaderSet datareader_set_;
190 
191 #ifndef OPENDDS_NO_MULTI_TOPIC
192  typedef OPENDDS_MAP(String, DDS::DataReader_var) MultitopicReaderMap;
193  MultitopicReaderMap multitopic_reader_map_;
194 #endif
195 
197 
200 
201  /// Bound (or initial reservation) of raw latency buffers.
203 
204  /// Type of raw latency data buffers.
206 
207  /// This lock protects datareader_set_. Only this lock needs to
208  /// be acquired if only datareader_set_ is accessed.
210 
211  /// General lock protects the data structures in this class.
212  /// If datareader_set_ is accessed together with other data members,
213  /// acquire dr_set_lock_ in the scope of this lock.
215 
216  /// Monitor object for this entity
218 
220 };
221 
222 } // namespace DCPS
223 } // namespace OpenDDS
224 
226 
227 #endif /* OPENDDS_DCPS_SUBSCRIBERIMPL_H */
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
Implements the OpenDDS::DCPS::Entity interfaces.
Definition: EntityImpl.h:37
ACE_CDR::Long Long
std::string String
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffers.
DDS::DataReaderQos default_datareader_qos_
#define OPENDDS_MULTIMAP(K, T)
unsigned long InstanceStateMask
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define OPENDDS_STRING
DOMAINID_TYPE_NATIVE DomainId_t
Implements the DDS::DataReader interface.
typedef OPENDDS_MAP_CMP(GUID_t, WriterCoherentSample, GUID_tKeyLessThan) GroupCoherentSamples
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffers.
ACE_Recursive_Thread_Mutex si_lock_
DDS::SubscriberListener_var listener_
DDS::InstanceHandle_t handle_
unsigned long SampleStateMask
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
unsigned long StatusMask
WeakRcHandle< DomainParticipantImpl > participant_
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
unsigned long StatusKind
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
sequence< DataReader > DataReaderSeq
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
unsigned long ViewStateMask
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
ACE_Recursive_Thread_Mutex dr_set_lock_
unique_ptr< Monitor > monitor_
Monitor object for this entity.
ACE_Thread_Mutex listener_mutex_
typedef OPENDDS_SET(NetworkAddress) AddrSet
MultitopicReaderMap multitopic_reader_map_