OpenDDS  Snapshot(2023/04/28-20:55)
BuiltInTopicUtils.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "BuiltInTopicUtils.h"
11 
13 #include "BitPubListenerImpl.h"
14 #include "Logging.h"
15 
17 
18 namespace OpenDDS {
19 namespace DCPS {
20 
// String constants for the built-in topics used throughout this file.
// Each topic has a pair: the topic name passed to lookup_datareader() below,
// and a companion *_TOPIC_TYPE string (presumably the type name the topic is
// registered under -- the *_TYPE constants are not used in this file).
// The first four names carry the "DCPS" prefix, the last three "OpenDDS".
21 const char* const BUILT_IN_PARTICIPANT_TOPIC = "DCPSParticipant";
22 const char* const BUILT_IN_PARTICIPANT_TOPIC_TYPE = "PARTICIPANT_BUILT_IN_TOPIC_TYPE";
23 
24 const char* const BUILT_IN_TOPIC_TOPIC = "DCPSTopic";
25 const char* const BUILT_IN_TOPIC_TOPIC_TYPE = "TOPIC_BUILT_IN_TOPIC_TYPE";
26 
27 const char* const BUILT_IN_SUBSCRIPTION_TOPIC = "DCPSSubscription";
28 const char* const BUILT_IN_SUBSCRIPTION_TOPIC_TYPE = "SUBSCRIPTION_BUILT_IN_TOPIC_TYPE";
29 
30 const char* const BUILT_IN_PUBLICATION_TOPIC = "DCPSPublication";
31 const char* const BUILT_IN_PUBLICATION_TOPIC_TYPE = "PUBLICATION_BUILT_IN_TOPIC_TYPE";
32 
33 const char* const BUILT_IN_PARTICIPANT_LOCATION_TOPIC = "OpenDDSParticipantLocation";
34 const char* const BUILT_IN_PARTICIPANT_LOCATION_TOPIC_TYPE = "PARTICIPANT_LOCATION_BUILT_IN_TOPIC_TYPE";
35 
36 const char* const BUILT_IN_CONNECTION_RECORD_TOPIC = "OpenDDSConnectionRecord";
37 const char* const BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE = "CONNECTION_RECORD_BUILT_IN_TOPIC_TYPE";
38 
39 const char* const BUILT_IN_INTERNAL_THREAD_TOPIC = "OpenDDSInternalThread";
40 const char* const BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE = "INTERNAL_THREAD_BUILT_IN_TOPIC_TYPE";
41 
43  DDS::ViewStateKind view_state)
44 {
  // Stores `part` in the DCPSParticipant built-in-topic reader by forwarding
  // to add_i with the participant reader implementation type. The result is
  // add_i's instance handle (DDS::HANDLE_NIL when add_i cannot store it).
45  return add_i<ParticipantBuiltinTopicDataDataReaderImpl>(BUILT_IN_PARTICIPANT_TOPIC, part, view_state);
46 }
47 
49  DDS::InstanceHandle_t loc_ih)
50 {
  // Removes a participant from two built-in topics at once: `part_ih` is
  // handed to remove_i for the participant topic and `loc_ih` for the
  // participant-location topic. When DDS_HAS_MINIMUM_BIT is defined the
  // function does nothing and merely marks both arguments as unused.
51 #ifndef DDS_HAS_MINIMUM_BIT
52  remove_i(BUILT_IN_PARTICIPANT_TOPIC, part_ih);
53  remove_i(BUILT_IN_PARTICIPANT_LOCATION_TOPIC, loc_ih);
54 #else
55  ACE_UNUSED_ARG(part_ih);
56  ACE_UNUSED_ARG(loc_ih);
57 #endif
58 }
59 
61  DDS::InstanceHandle_t participant_handle)
62 {
  // Reads the participant built-in-topic sample for `participant_handle`
  // into `participant_data`.
  //
  // Returns DDS::RETCODE_NO_DATA when there is no BIT subscriber, when the
  // participant reader cannot be obtained, when the sample read has no valid
  // data, or when DDS_HAS_MINIMUM_BIT is defined. Otherwise returns the code
  // produced by read_instance (RETCODE_OK means `participant_data` was set).
  // NOTE(review): this listing skips lines 64 and 80-82 (numbering jumps);
  // presumably a lock guard and the remaining read_instance arguments --
  // confirm against the original file.
63 #ifndef DDS_HAS_MINIMUM_BIT
65 
66  if (!bit_subscriber_) {
67  return DDS::RETCODE_NO_DATA;
68  }
69 
70  DDS::SampleInfoSeq info;
71  DDS::ParticipantBuiltinTopicDataSeq data;
72  DDS::DataReader_var dr = bit_subscriber_->lookup_datareader(BUILT_IN_PARTICIPANT_TOPIC);
73  DDS::ParticipantBuiltinTopicDataDataReader_var bit_part_dr =
74  DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr);
  // The lookup or the narrow may yield a nil reference; bail out instead of
  // dereferencing it, as the sibling functions in this file do.
  if (!bit_part_dr) {
  return DDS::RETCODE_NO_DATA;
  }
75 
76  const DDS::ReturnCode_t ret = bit_part_dr->read_instance(data,
77  info,
78  1,
79  participant_handle,
83 
84  if (ret == DDS::RETCODE_OK) {
  // At most one sample was requested, so only index 0 is inspected.
85  if (info[0].valid_data) {
86  participant_data = data[0];
87  } else {
88  return DDS::RETCODE_NO_DATA;
89  }
90  }
91 
92  return ret;
93 #else
94  ACE_UNUSED_ARG(participant_data);
95  ACE_UNUSED_ARG(participant_handle);
96  return DDS::RETCODE_NO_DATA;
97 #endif
98 }
99 
101  DDS::InstanceHandle_t topic_handle)
102 {
  // Reads the topic built-in-topic sample for `topic_handle` into
  // `topic_data`.
  //
  // Returns DDS::RETCODE_NO_DATA when there is no BIT subscriber, when the
  // topic reader cannot be obtained, when the sample read has no valid data,
  // or when DDS_HAS_MINIMUM_BIT is defined. Otherwise returns the code
  // produced by read_instance (RETCODE_OK means `topic_data` was set).
  // NOTE(review): this listing skips lines 104 and 120-122 (numbering jumps);
  // presumably a lock guard and the remaining read_instance arguments --
  // confirm against the original file.
103 #ifndef DDS_HAS_MINIMUM_BIT
105 
106  if (!bit_subscriber_) {
107  return DDS::RETCODE_NO_DATA;
108  }
109 
110  DDS::SampleInfoSeq info;
111  DDS::TopicBuiltinTopicDataSeq data;
112  DDS::DataReader_var dr = bit_subscriber_->lookup_datareader(BUILT_IN_TOPIC_TOPIC);
113  DDS::TopicBuiltinTopicDataDataReader_var bit_topic_dr =
114  DDS::TopicBuiltinTopicDataDataReader::_narrow(dr);
  // The lookup or the narrow may yield a nil reference; bail out instead of
  // dereferencing it, as the sibling functions in this file do.
  if (!bit_topic_dr) {
  return DDS::RETCODE_NO_DATA;
  }
115 
116  const DDS::ReturnCode_t ret = bit_topic_dr->read_instance(data,
117  info,
118  1,
119  topic_handle,
123 
124  if (ret == DDS::RETCODE_OK) {
  // At most one sample was requested, so only index 0 is inspected.
125  if (info[0].valid_data) {
126  topic_data = data[0];
127  } else {
128  return DDS::RETCODE_NO_DATA;
129  }
130  }
131 
132  return ret;
133 #else
134  ACE_UNUSED_ARG(topic_data);
135  ACE_UNUSED_ARG(topic_handle);
136  return DDS::RETCODE_NO_DATA;
137 #endif
138 }
139 
141  DDS::ViewStateKind view_state)
142 {
  // Stores `pub` in the DCPSPublication built-in-topic reader via add_i and
  // returns the instance handle add_i produces (DDS::HANDLE_NIL on failure).
143  return add_i<PublicationBuiltinTopicDataDataReaderImpl>(BUILT_IN_PUBLICATION_TOPIC, pub, view_state);
144 }
145 
147 {
  // Delegates removal of instance `pub_ih` on the publication built-in topic
  // to remove_i.
148  remove_i(BUILT_IN_PUBLICATION_TOPIC, pub_ih);
149 }
150 
152  DDS::ViewStateKind view_state)
153 {
  // Stores `sub` in the DCPSSubscription built-in-topic reader via add_i and
  // returns the instance handle add_i produces (DDS::HANDLE_NIL on failure).
154  return add_i<SubscriptionBuiltinTopicDataDataReaderImpl>(BUILT_IN_SUBSCRIPTION_TOPIC, sub, view_state);
155 }
156 
158 {
  // Delegates removal of instance `sub_ih` on the subscription built-in
  // topic to remove_i.
159  remove_i(BUILT_IN_SUBSCRIPTION_TOPIC, sub_ih);
160 }
161 
163  DDS::ViewStateKind view_state)
164 {
  // Stores `loc` in the OpenDDSParticipantLocation built-in-topic reader via
  // add_i and returns the instance handle add_i produces (DDS::HANDLE_NIL on
  // failure).
165  return add_i<ParticipantLocationBuiltinTopicDataDataReaderImpl>(BUILT_IN_PARTICIPANT_LOCATION_TOPIC, loc, view_state);
166 }
167 
169  DDS::ViewStateKind view_state)
170 {
  // Stores `cr` in the OpenDDSConnectionRecord built-in-topic reader via
  // add_i and returns the instance handle add_i produces (DDS::HANDLE_NIL on
  // failure).
171  return add_i<ConnectionRecordDataReaderImpl>(BUILT_IN_CONNECTION_RECORD_TOPIC, cr, view_state);
172 }
173 
175 {
  // Removal path for a connection record on the OpenDDSConnectionRecord
  // built-in topic. The visible code is a chain of guards: it returns without
  // doing anything when there is no BIT subscriber, when the topic has no
  // reader, or when `bit` is null. With DDS_HAS_MINIMUM_BIT defined, `cr` is
  // only marked unused.
  // NOTE(review): this listing skips lines 177, 189 and 194 (numbering
  // jumps). The declaration of `bit` and the statement that acts on it are
  // therefore not visible here -- confirm against the original file before
  // editing.
176 #ifndef DDS_HAS_MINIMUM_BIT
178 
179  if (!bit_subscriber_) {
180  return;
181  }
182 
183  DDS::DataReader_var d = bit_subscriber_->lookup_datareader(BUILT_IN_CONNECTION_RECORD_TOPIC);
184 
185  if (!d) {
186  return;
187  }
188 
190  if (!bit) {
191  return;
192  }
193 
195 #else
196  ACE_UNUSED_ARG(cr);
197 #endif
198 }
199 
201  DDS::ViewStateKind view_state,
202  const SystemTimePoint& timestamp)
203 {
  // Stores thread-status sample `ts` in the OpenDDSInternalThread
  // built-in-topic reader, passing `view_state` and the explicit `timestamp`
  // through to store_synthetic_data, and returns the resulting handle.
  // Returns DDS::HANDLE_NIL when there is no BIT subscriber, no reader for
  // the topic, `bit` is null, or DDS_HAS_MINIMUM_BIT is defined.
  // NOTE(review): this listing skips lines 205 and 216 (numbering jumps); the
  // declaration of `bit` is not visible here -- confirm against the original
  // file.
204 #ifndef DDS_HAS_MINIMUM_BIT
206 
207  if (!bit_subscriber_) {
208  return DDS::HANDLE_NIL;
209  }
210 
211  DDS::DataReader_var d = bit_subscriber_->lookup_datareader(BUILT_IN_INTERNAL_THREAD_TOPIC);
212  if (!d) {
213  return DDS::HANDLE_NIL;
214  }
215 
217  if (!bit) {
218  return DDS::HANDLE_NIL;
219  }
220 
221  return bit->store_synthetic_data(ts, view_state, timestamp);
222 #else
223  ACE_UNUSED_ARG(ts);
224  ACE_UNUSED_ARG(view_state);
225  ACE_UNUSED_ARG(timestamp);
226  return DDS::HANDLE_NIL;
227 #endif /* DDS_HAS_MINIMUM_BIT */
228 }
229 
231 {
  // Removal path for a thread-status sample on the OpenDDSInternalThread
  // built-in topic. The visible code is a chain of guards: it returns without
  // doing anything when there is no BIT subscriber, when the topic has no
  // reader, or when `bit` is null. With DDS_HAS_MINIMUM_BIT defined, `ts` is
  // only marked unused.
  // NOTE(review): this listing skips lines 233, 245 and 250 (numbering
  // jumps). The declaration of `bit` and the statement that acts on it are
  // therefore not visible here -- confirm against the original file before
  // editing.
232 #ifndef DDS_HAS_MINIMUM_BIT
234 
235  if (!bit_subscriber_) {
236  return;
237  }
238 
239  DDS::DataReader_var d = bit_subscriber_->lookup_datareader(BUILT_IN_INTERNAL_THREAD_TOPIC);
240 
241  if (!d) {
242  return;
243  }
244 
246  if (!bit) {
247  return;
248  }
249 
251 #else
252  ACE_UNUSED_ARG(ts);
253 #endif
254 }
255 
257 {
  // Ensures the DCPSPublication built-in-topic reader has a listener.
  // If the reader exists and has none, a BitPubListenerImpl built from
  // `participant` is attached for DATA_AVAILABLE_STATUS, and an
  // OnDataAvailable job for that listener/reader pair is queued on the
  // service participant's job queue.
  // Does nothing when there is no BIT subscriber, when the reader cannot be
  // narrowed, when a listener is already set, or when DDS_HAS_MINIMUM_BIT is
  // defined.
  // NOTE(review): this listing skips line 259 (numbering jump), presumably a
  // lock guard -- confirm against the original file.
258 #ifndef DDS_HAS_MINIMUM_BIT
260 
261  if (!bit_subscriber_) {
262  return;
263  }
264 
265  DDS::DataReader_var dr =
266  bit_subscriber_->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
267  DDS::PublicationBuiltinTopicDataDataReader_var bit_pub_dr =
268  DDS::PublicationBuiltinTopicDataDataReader::_narrow(dr);
269 
270  if (bit_pub_dr) {
271  DDS::DataReaderListener_var listener = bit_pub_dr->get_listener();
272  if (!listener) {
273  DDS::DataReaderListener_var bit_pub_listener =
274  new BitPubListenerImpl(participant);
275  bit_pub_dr->set_listener(bit_pub_listener, DDS::DATA_AVAILABLE_STATUS);
276  // Must call on_data_available when attaching a listener late - samples may be waiting
277  DataReaderImpl* reader = dynamic_cast<DataReaderImpl*>(bit_pub_dr.in());
  // If the cast fails the listener stays attached, but no initial
  // on_data_available job is queued.
278  if (!reader) {
279  return;
280  }
  // NOTE(review): the meaning of the three trailing bool arguments is not
  // visible in this file -- check DataReaderImpl::OnDataAvailable.
281  TheServiceParticipant->job_queue()->enqueue(make_rch<DataReaderImpl::OnDataAvailable>(bit_pub_listener, rchandle_from(reader), true, false, false));
282  }
283  }
284 #else
285  ACE_UNUSED_ARG(participant);
286 #endif
287 }
288 
// Stores `sample` in the built-in-topic data reader found under `topic_name`
// and returns the instance handle reported by store_synthetic_data.
//
// DataReaderImpl  concrete reader type the looked-up reader is cast to.
// Sample          type of the sample being stored.
// topic_name      built-in topic whose reader receives the sample.
// view_state      view state forwarded to store_synthetic_data.
//
// Returns DDS::HANDLE_NIL when there is no BIT subscriber, when the topic has
// no reader, when the reader is not of type DataReaderImpl, or when
// DDS_HAS_MINIMUM_BIT is defined. Every outcome is traced when `log_bits` is
// set.
// NOTE(review): this listing skips lines 290 and 295 (numbering jumps);
// presumably the first signature line and a lock guard -- confirm against the
// original file.
289 template <typename DataReaderImpl, typename Sample>
291  const Sample& sample,
292  DDS::ViewStateKind view_state)
293 {
294 #ifndef DDS_HAS_MINIMUM_BIT
296 
297  if (!bit_subscriber_) {
298  if (log_bits) {
299  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ bit_subscriber_ is null for topic %C, returning nil\n", this, topic_name));
300  }
301  return DDS::HANDLE_NIL;
302  }
303 
304  DDS::DataReader_var d = bit_subscriber_->lookup_datareader(topic_name);
305  if (!d) {
306  if (log_bits) {
307  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ DataReader is null for topic %C, returning nil\n", this, topic_name));
308  }
309  return DDS::HANDLE_NIL;
310  }
311 
312  DataReaderImpl* bit = dynamic_cast<DataReaderImpl*>(d.in());
313  if (!bit) {
314  if (log_bits) {
315  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ dynamic_cast failed for topic %C, returning nil\n", this, topic_name));
316  }
317  return DDS::HANDLE_NIL;
318  }
319 
320  const DDS::InstanceHandle_t ih = bit->store_synthetic_data(sample, view_state);
321  if (log_bits) {
322  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ returning instance handle %d for topic %C\n", this, ih, topic_name));
323  }
324  return ih;
325 #else
  // This branch is compiled only when DDS_HAS_MINIMUM_BIT IS defined, so the
  // message must say so; %C consumes the topic_name argument, which
  // previously had no matching specifier.
326  if (log_bits) {
327  ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ DDS_HAS_MINIMUM_BIT is defined for topic %C, returning nil\n", this, topic_name));
328  }
329  ACE_UNUSED_ARG(sample);
330  ACE_UNUSED_ARG(view_state);
331  return DDS::HANDLE_NIL;
332 #endif /* DDS_HAS_MINIMUM_BIT */
333 }
334 
// Removal helper shared by the remove_* functions: acts on instance `ih` of
// the built-in topic `topic_name`.
// The visible code only proceeds when `ih` is not DDS::HANDLE_NIL, and then
// returns early if there is no BIT subscriber, no reader for the topic, or
// the reader is not a DataReaderImpl. With DDS_HAS_MINIMUM_BIT defined, both
// arguments are only marked unused.
// NOTE(review): this listing skips lines 336, 340 and 355 (numbering jumps).
// The second signature line and the statement that finally acts on `bit` are
// therefore not visible here -- confirm against the original file before
// editing.
335 void BitSubscriber::remove_i(const char* topic_name,
337 {
338 #ifndef DDS_HAS_MINIMUM_BIT
339  if (ih != DDS::HANDLE_NIL) {
341 
342  if (!bit_subscriber_) {
343  return;
344  }
345 
346  DDS::DataReader_var d = bit_subscriber_->lookup_datareader(topic_name);
347  if (!d) {
348  return;
349  }
350 
351  DataReaderImpl* bit = dynamic_cast<DataReaderImpl*>(d.in());
352  if (!bit) {
353  return;
354  }
356  }
357 #else
358  ACE_UNUSED_ARG(topic_name);
359  ACE_UNUSED_ARG(ih);
360 #endif
361 }
362 
363 } // namespace DCPS
364 } // namespace OpenDDS
365 
virtual DDS::InstanceHandle_t lookup_instance(const MessageType &instance_data)
const char *const BUILT_IN_SUBSCRIPTION_TOPIC_TYPE
DDS::InstanceHandle_t add_participant(const DDS::ParticipantBuiltinTopicData &part, DDS::ViewStateKind view_state)
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void remove_subscription(DDS::InstanceHandle_t sub_ih)
const InstanceHandle_t HANDLE_NIL
const char *const BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE
const char *const BUILT_IN_CONNECTION_RECORD_TOPIC
sequence< SampleInfo > SampleInfoSeq
void set_instance_state(DDS::InstanceHandle_t instance, DDS::InstanceStateKind state, const SystemTimePoint &timestamp=SystemTimePoint::now(), const GUID_t &guid=GUID_UNKNOWN)
const SampleStateMask ANY_SAMPLE_STATE
void bit_pub_listener_hack(DomainParticipantImpl *participant)
DDS::Subscriber_var bit_subscriber_
void remove_connection_record(const ConnectionRecord &cr)
DDS::InstanceHandle_t add_thread_status(const InternalThreadBuiltinTopicData &ts, DDS::ViewStateKind view_state, const SystemTimePoint &timestamp)
DDS::InstanceHandle_t add_i(const char *topic_name, const Sample &sample, DDS::ViewStateKind view_state)
const char *const BUILT_IN_PUBLICATION_TOPIC
LM_DEBUG
void remove_i(const char *topic_name, DDS::InstanceHandle_t ih)
const char *const BUILT_IN_PARTICIPANT_LOCATION_TOPIC_TYPE
void remove_participant(DDS::InstanceHandle_t part_ih, DDS::InstanceHandle_t loc_ih)
DDS::ReturnCode_t get_discovered_participant_data(DDS::ParticipantBuiltinTopicData &participant_data, DDS::InstanceHandle_t participant_handle)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const StatusKind DATA_AVAILABLE_STATUS
const InstanceStateMask ANY_INSTANCE_STATE
Implements the DDS::DataReader interface.
const ViewStateMask ANY_VIEW_STATE
bool log_bits
Definition: Logging.cpp:18
DDS::InstanceHandle_t add_subscription(const DDS::SubscriptionBuiltinTopicData &sub, DDS::ViewStateKind view_state)
const char *const BUILT_IN_PARTICIPANT_TOPIC
const char *const BUILT_IN_PARTICIPANT_LOCATION_TOPIC
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
const char *const BUILT_IN_PARTICIPANT_TOPIC_TYPE
const char *const BUILT_IN_SUBSCRIPTION_TOPIC
const ReturnCode_t RETCODE_NO_DATA
const char *const BUILT_IN_INTERNAL_THREAD_TOPIC
const char *const BUILT_IN_TOPIC_TOPIC
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
DDS::InstanceHandle_t add_publication(const DDS::PublicationBuiltinTopicData &pub, DDS::ViewStateKind view_state)
const ReturnCode_t RETCODE_OK
DDS::InstanceHandle_t store_synthetic_data(const MessageType &sample, DDS::ViewStateKind view, const SystemTimePoint &timestamp=SystemTimePoint::now())
unsigned long ViewStateKind
void remove_thread_status(const InternalThreadBuiltinTopicData &ts)
DDS::InstanceHandle_t add_participant_location(const ParticipantLocationBuiltinTopicData &loc, DDS::ViewStateKind view_state)
const char *const BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE
#define TheServiceParticipant
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
DDS::InstanceHandle_t add_connection_record(const ConnectionRecord &cr, DDS::ViewStateKind view_state)
const char *const BUILT_IN_TOPIC_TOPIC_TYPE
const char *const BUILT_IN_PUBLICATION_TOPIC_TYPE
DDS::ReturnCode_t get_discovered_topic_data(DDS::TopicBuiltinTopicData &topic_data, DDS::InstanceHandle_t topic_handle)
void remove_publication(DDS::InstanceHandle_t pub_ih)