Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 :
10 : #include "BuiltInTopicUtils.h"
11 :
12 : #include "BuiltInTopicDataReaderImpls.h"
13 : #include "BitPubListenerImpl.h"
14 : #include "Logging.h"
15 :
16 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
17 :
18 : namespace OpenDDS {
19 : namespace DCPS {
20 :
// String constants for the built-in topics. Each built-in topic contributes a
// pair: the topic name and the name of its data type.

// Participant built-in topic.
const char* const BUILT_IN_PARTICIPANT_TOPIC = "DCPSParticipant";
const char* const BUILT_IN_PARTICIPANT_TOPIC_TYPE = "PARTICIPANT_BUILT_IN_TOPIC_TYPE";

// Topic built-in topic.
const char* const BUILT_IN_TOPIC_TOPIC = "DCPSTopic";
const char* const BUILT_IN_TOPIC_TOPIC_TYPE = "TOPIC_BUILT_IN_TOPIC_TYPE";

// Subscription built-in topic.
const char* const BUILT_IN_SUBSCRIPTION_TOPIC = "DCPSSubscription";
const char* const BUILT_IN_SUBSCRIPTION_TOPIC_TYPE = "SUBSCRIPTION_BUILT_IN_TOPIC_TYPE";

// Publication built-in topic.
const char* const BUILT_IN_PUBLICATION_TOPIC = "DCPSPublication";
const char* const BUILT_IN_PUBLICATION_TOPIC_TYPE = "PUBLICATION_BUILT_IN_TOPIC_TYPE";

// Participant location built-in topic (OpenDDS-prefixed name).
const char* const BUILT_IN_PARTICIPANT_LOCATION_TOPIC = "OpenDDSParticipantLocation";
const char* const BUILT_IN_PARTICIPANT_LOCATION_TOPIC_TYPE = "PARTICIPANT_LOCATION_BUILT_IN_TOPIC_TYPE";

// Connection record built-in topic (OpenDDS-prefixed name).
const char* const BUILT_IN_CONNECTION_RECORD_TOPIC = "OpenDDSConnectionRecord";
const char* const BUILT_IN_CONNECTION_RECORD_TOPIC_TYPE = "CONNECTION_RECORD_BUILT_IN_TOPIC_TYPE";

// Internal thread status built-in topic (OpenDDS-prefixed name).
const char* const BUILT_IN_INTERNAL_THREAD_TOPIC = "OpenDDSInternalThread";
const char* const BUILT_IN_INTERNAL_THREAD_TOPIC_TYPE = "INTERNAL_THREAD_BUILT_IN_TOPIC_TYPE";
41 :
42 0 : DDS::InstanceHandle_t BitSubscriber::add_participant(const DDS::ParticipantBuiltinTopicData& part,
43 : DDS::ViewStateKind view_state)
44 : {
45 0 : return add_i<ParticipantBuiltinTopicDataDataReaderImpl>(BUILT_IN_PARTICIPANT_TOPIC, part, view_state);
46 : }
47 :
48 0 : void BitSubscriber::remove_participant(DDS::InstanceHandle_t part_ih,
49 : DDS::InstanceHandle_t loc_ih)
50 : {
51 : #ifndef DDS_HAS_MINIMUM_BIT
52 0 : remove_i(BUILT_IN_PARTICIPANT_TOPIC, part_ih);
53 0 : remove_i(BUILT_IN_PARTICIPANT_LOCATION_TOPIC, loc_ih);
54 : #else
55 : ACE_UNUSED_ARG(part_ih);
56 : ACE_UNUSED_ARG(loc_ih);
57 : #endif
58 0 : }
59 :
60 0 : DDS::ReturnCode_t BitSubscriber::get_discovered_participant_data(DDS::ParticipantBuiltinTopicData& participant_data,
61 : DDS::InstanceHandle_t participant_handle)
62 : {
63 : #ifndef DDS_HAS_MINIMUM_BIT
64 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, DDS::RETCODE_NO_DATA);
65 :
66 0 : if (!bit_subscriber_) {
67 0 : return DDS::RETCODE_NO_DATA;
68 : }
69 :
70 0 : DDS::SampleInfoSeq info;
71 0 : DDS::ParticipantBuiltinTopicDataSeq data;
72 0 : DDS::DataReader_var dr = bit_subscriber_->lookup_datareader(BUILT_IN_PARTICIPANT_TOPIC);
73 : DDS::ParticipantBuiltinTopicDataDataReader_var bit_part_dr =
74 0 : DDS::ParticipantBuiltinTopicDataDataReader::_narrow(dr);
75 :
76 0 : const DDS::ReturnCode_t ret = bit_part_dr->read_instance(data,
77 : info,
78 : 1,
79 : participant_handle,
80 : DDS::ANY_SAMPLE_STATE,
81 : DDS::ANY_VIEW_STATE,
82 : DDS::ANY_INSTANCE_STATE);
83 :
84 0 : if (ret == DDS::RETCODE_OK) {
85 0 : if (info[0].valid_data) {
86 0 : participant_data = data[0];
87 : } else {
88 0 : return DDS::RETCODE_NO_DATA;
89 : }
90 : }
91 :
92 0 : return ret;
93 : #else
94 : ACE_UNUSED_ARG(participant_data);
95 : ACE_UNUSED_ARG(participant_handle);
96 : return DDS::RETCODE_NO_DATA;
97 : #endif
98 0 : }
99 :
100 0 : DDS::ReturnCode_t BitSubscriber::get_discovered_topic_data(DDS::TopicBuiltinTopicData& topic_data,
101 : DDS::InstanceHandle_t topic_handle)
102 : {
103 : #ifndef DDS_HAS_MINIMUM_BIT
104 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, DDS::RETCODE_NO_DATA);
105 :
106 0 : if (!bit_subscriber_) {
107 0 : return DDS::RETCODE_NO_DATA;
108 : }
109 :
110 0 : DDS::SampleInfoSeq info;
111 0 : DDS::TopicBuiltinTopicDataSeq data;
112 0 : DDS::DataReader_var dr = bit_subscriber_->lookup_datareader(BUILT_IN_TOPIC_TOPIC);
113 : DDS::TopicBuiltinTopicDataDataReader_var bit_topic_dr =
114 0 : DDS::TopicBuiltinTopicDataDataReader::_narrow(dr);
115 :
116 0 : const DDS::ReturnCode_t ret = bit_topic_dr->read_instance(data,
117 : info,
118 : 1,
119 : topic_handle,
120 : DDS::ANY_SAMPLE_STATE,
121 : DDS::ANY_VIEW_STATE,
122 : DDS::ANY_INSTANCE_STATE);
123 :
124 0 : if (ret == DDS::RETCODE_OK) {
125 0 : if (info[0].valid_data) {
126 0 : topic_data = data[0];
127 : } else {
128 0 : return DDS::RETCODE_NO_DATA;
129 : }
130 : }
131 :
132 0 : return ret;
133 : #else
134 : ACE_UNUSED_ARG(topic_data);
135 : ACE_UNUSED_ARG(topic_handle);
136 : return DDS::RETCODE_NO_DATA;
137 : #endif
138 0 : }
139 :
140 0 : DDS::InstanceHandle_t BitSubscriber::add_publication(const DDS::PublicationBuiltinTopicData& pub,
141 : DDS::ViewStateKind view_state)
142 : {
143 0 : return add_i<PublicationBuiltinTopicDataDataReaderImpl>(BUILT_IN_PUBLICATION_TOPIC, pub, view_state);
144 : }
145 :
146 0 : void BitSubscriber::remove_publication(DDS::InstanceHandle_t pub_ih)
147 : {
148 0 : remove_i(BUILT_IN_PUBLICATION_TOPIC, pub_ih);
149 0 : }
150 :
151 0 : DDS::InstanceHandle_t BitSubscriber::add_subscription(const DDS::SubscriptionBuiltinTopicData& sub,
152 : DDS::ViewStateKind view_state)
153 : {
154 0 : return add_i<SubscriptionBuiltinTopicDataDataReaderImpl>(BUILT_IN_SUBSCRIPTION_TOPIC, sub, view_state);
155 : }
156 :
157 0 : void BitSubscriber::remove_subscription(DDS::InstanceHandle_t sub_ih)
158 : {
159 0 : remove_i(BUILT_IN_SUBSCRIPTION_TOPIC, sub_ih);
160 0 : }
161 :
162 0 : DDS::InstanceHandle_t BitSubscriber::add_participant_location(const ParticipantLocationBuiltinTopicData& loc,
163 : DDS::ViewStateKind view_state)
164 : {
165 0 : return add_i<ParticipantLocationBuiltinTopicDataDataReaderImpl>(BUILT_IN_PARTICIPANT_LOCATION_TOPIC, loc, view_state);
166 : }
167 :
168 0 : DDS::InstanceHandle_t BitSubscriber::add_connection_record(const ConnectionRecord& cr,
169 : DDS::ViewStateKind view_state)
170 : {
171 0 : return add_i<ConnectionRecordDataReaderImpl>(BUILT_IN_CONNECTION_RECORD_TOPIC, cr, view_state);
172 : }
173 :
174 0 : void BitSubscriber::remove_connection_record(const ConnectionRecord& cr)
175 : {
176 : #ifndef DDS_HAS_MINIMUM_BIT
177 0 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
178 :
179 0 : if (!bit_subscriber_) {
180 0 : return;
181 : }
182 :
183 0 : DDS::DataReader_var d = bit_subscriber_->lookup_datareader(BUILT_IN_CONNECTION_RECORD_TOPIC);
184 :
185 0 : if (!d) {
186 0 : return;
187 : }
188 :
189 0 : ConnectionRecordDataReaderImpl* bit = dynamic_cast<ConnectionRecordDataReaderImpl*>(d.in());
190 0 : if (!bit) {
191 0 : return;
192 : }
193 :
194 0 : bit->set_instance_state(bit->lookup_instance(cr), DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
195 : #else
196 : ACE_UNUSED_ARG(cr);
197 : #endif
198 0 : }
199 :
200 0 : DDS::InstanceHandle_t BitSubscriber::add_thread_status(const InternalThreadBuiltinTopicData& ts,
201 : DDS::ViewStateKind view_state,
202 : const SystemTimePoint& timestamp)
203 : {
204 : #ifndef DDS_HAS_MINIMUM_BIT
205 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, DDS::HANDLE_NIL);
206 :
207 0 : if (!bit_subscriber_) {
208 0 : return DDS::HANDLE_NIL;
209 : }
210 :
211 0 : DDS::DataReader_var d = bit_subscriber_->lookup_datareader(BUILT_IN_INTERNAL_THREAD_TOPIC);
212 0 : if (!d) {
213 0 : return DDS::HANDLE_NIL;
214 : }
215 :
216 0 : InternalThreadBuiltinTopicDataDataReaderImpl* bit = dynamic_cast<InternalThreadBuiltinTopicDataDataReaderImpl*>(d.in());
217 0 : if (!bit) {
218 0 : return DDS::HANDLE_NIL;
219 : }
220 :
221 0 : return bit->store_synthetic_data(ts, view_state, timestamp);
222 : #else
223 : ACE_UNUSED_ARG(ts);
224 : ACE_UNUSED_ARG(view_state);
225 : ACE_UNUSED_ARG(timestamp);
226 : return DDS::HANDLE_NIL;
227 : #endif /* DDS_HAS_MINIMUM_BIT */
228 0 : }
229 :
230 0 : void BitSubscriber::remove_thread_status(const InternalThreadBuiltinTopicData& ts)
231 : {
232 : #ifndef DDS_HAS_MINIMUM_BIT
233 0 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
234 :
235 0 : if (!bit_subscriber_) {
236 0 : return;
237 : }
238 :
239 0 : DDS::DataReader_var d = bit_subscriber_->lookup_datareader(BUILT_IN_INTERNAL_THREAD_TOPIC);
240 :
241 0 : if (!d) {
242 0 : return;
243 : }
244 :
245 0 : InternalThreadBuiltinTopicDataDataReaderImpl* bit = dynamic_cast<InternalThreadBuiltinTopicDataDataReaderImpl*>(d.in());
246 0 : if (!bit) {
247 0 : return;
248 : }
249 :
250 0 : bit->set_instance_state(bit->lookup_instance(ts), DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
251 : #else
252 : ACE_UNUSED_ARG(ts);
253 : #endif
254 0 : }
255 :
256 0 : void BitSubscriber::bit_pub_listener_hack(DomainParticipantImpl* participant)
257 : {
258 : #ifndef DDS_HAS_MINIMUM_BIT
259 0 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
260 :
261 0 : if (!bit_subscriber_) {
262 0 : return;
263 : }
264 :
265 : DDS::DataReader_var dr =
266 0 : bit_subscriber_->lookup_datareader(BUILT_IN_PUBLICATION_TOPIC);
267 : DDS::PublicationBuiltinTopicDataDataReader_var bit_pub_dr =
268 0 : DDS::PublicationBuiltinTopicDataDataReader::_narrow(dr);
269 :
270 0 : if (bit_pub_dr) {
271 0 : DDS::DataReaderListener_var listener = bit_pub_dr->get_listener();
272 0 : if (!listener) {
273 : DDS::DataReaderListener_var bit_pub_listener =
274 0 : new BitPubListenerImpl(participant);
275 0 : bit_pub_dr->set_listener(bit_pub_listener, DDS::DATA_AVAILABLE_STATUS);
276 : // Must call on_data_available when attaching a listener late - samples may be waiting
277 0 : DataReaderImpl* reader = dynamic_cast<DataReaderImpl*>(bit_pub_dr.in());
278 0 : if (!reader) {
279 0 : return;
280 : }
281 0 : TheServiceParticipant->job_queue()->enqueue(make_rch<DataReaderImpl::OnDataAvailable>(bit_pub_listener, rchandle_from(reader), true, false, false));
282 0 : }
283 0 : }
284 : #else
285 : ACE_UNUSED_ARG(participant);
286 : #endif
287 0 : }
288 :
289 : template <typename DataReaderImpl, typename Sample>
290 0 : DDS::InstanceHandle_t BitSubscriber::add_i(const char* topic_name,
291 : const Sample& sample,
292 : DDS::ViewStateKind view_state)
293 : {
294 : #ifndef DDS_HAS_MINIMUM_BIT
295 0 : ACE_GUARD_RETURN(ACE_Thread_Mutex, g, mutex_, DDS::HANDLE_NIL);
296 :
297 0 : if (!bit_subscriber_) {
298 0 : if (log_bits) {
299 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ bit_subscriber_ is null for topic %C, returning nil\n", this, topic_name));
300 : }
301 0 : return DDS::HANDLE_NIL;
302 : }
303 :
304 0 : DDS::DataReader_var d = bit_subscriber_->lookup_datareader(topic_name);
305 0 : if (!d) {
306 0 : if (log_bits) {
307 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ DataReader is null for topic %C, returning nil\n", this, topic_name));
308 : }
309 0 : return DDS::HANDLE_NIL;
310 : }
311 :
312 0 : DataReaderImpl* bit = dynamic_cast<DataReaderImpl*>(d.in());
313 0 : if (!bit) {
314 0 : if (log_bits) {
315 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ dynamic_cast failed for topic %C, returning nil\n", this, topic_name));
316 : }
317 0 : return DDS::HANDLE_NIL;
318 : }
319 :
320 0 : const DDS::InstanceHandle_t ih = bit->store_synthetic_data(sample, view_state);
321 0 : if (log_bits) {
322 0 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ returning instance handle %d for topic %C\n", this, ih, topic_name));
323 : }
324 0 : return ih;
325 : #else
326 : if (log_bits) {
327 : ACE_DEBUG((LM_DEBUG, "(%P|%t) DEBUG: BitSubscriber::add_i: %@ DDS_HAS_MINIMUM_BIT is not defined, returning nil\n", this, topic_name));
328 : }
329 : ACE_UNUSED_ARG(sample);
330 : ACE_UNUSED_ARG(view_state);
331 : return DDS::HANDLE_NIL;
332 : #endif /* DDS_HAS_MINIMUM_BIT */
333 0 : }
334 :
335 0 : void BitSubscriber::remove_i(const char* topic_name,
336 : DDS::InstanceHandle_t ih)
337 : {
338 : #ifndef DDS_HAS_MINIMUM_BIT
339 0 : if (ih != DDS::HANDLE_NIL) {
340 0 : ACE_GUARD(ACE_Thread_Mutex, g, mutex_);
341 :
342 0 : if (!bit_subscriber_) {
343 0 : return;
344 : }
345 :
346 0 : DDS::DataReader_var d = bit_subscriber_->lookup_datareader(topic_name);
347 0 : if (!d) {
348 0 : return;
349 : }
350 :
351 0 : DataReaderImpl* bit = dynamic_cast<DataReaderImpl*>(d.in());
352 0 : if (!bit) {
353 0 : return;
354 : }
355 0 : bit->set_instance_state(ih, DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE);
356 0 : }
357 : #else
358 : ACE_UNUSED_ARG(topic_name);
359 : ACE_UNUSED_ARG(ih);
360 : #endif
361 : }
362 :
363 : } // namespace DCPS
364 : } // namespace OpenDDS
365 :
366 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|