Line data Source code
1 : #ifndef OPENDDS_DCPS_DATAREADERIMPL_T_H
2 : #define OPENDDS_DCPS_DATAREADERIMPL_T_H
3 :
4 : #include <ace/config-lite.h>
5 :
6 : #ifdef ACE_HAS_CPP11
7 : # define OPENDDS_HAS_STD_SHARED_PTR
8 : #endif
9 :
10 : #include "MultiTopicImpl.h"
11 : #include "RakeResults_T.h"
12 : #include "SubscriberImpl.h"
13 : #include "BuiltInTopicUtils.h"
14 : #include "Util.h"
15 : #include "TypeSupportImpl.h"
16 : #include "dcps_export.h"
17 : #include "GuidConverter.h"
18 : #include "XTypes/DynamicDataAdapter.h"
19 :
20 : #ifndef OPENDDS_HAS_STD_SHARED_PTR
21 : # include <ace/Bound_Ptr.h>
22 : #endif
23 : #include <ace/Time_Value.h>
24 :
25 : #ifndef OPENDDS_HAS_STD_SHARED_PTR
26 : # include <memory>
27 : #endif
28 :
29 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
30 :
31 : namespace OpenDDS {
32 : namespace DCPS {
33 :
34 : /** Servant for DataReader interface of Traits::MessageType data type.
35 : *
36 : * See the DDS specification, OMG formal/2015-04-10, for a description of
37 : * this interface.
38 : *
39 : */
40 : template <typename MessageType>
41 : class
42 : #if ( __GNUC__ == 4 && __GNUC_MINOR__ == 1)
43 : OpenDDS_Dcps_Export
44 : #endif
45 : DataReaderImpl_T
46 : : public virtual LocalObject<typename DDSTraits<MessageType>::DataReaderType>
47 : , public virtual DataReaderImpl
48 : {
49 : public:
50 : typedef DDSTraits<MessageType> TraitsType;
51 : typedef MarshalTraits<MessageType> MarshalTraitsType;
52 : typedef typename TraitsType::MessageSequenceType MessageSequenceType;
53 :
54 : typedef OPENDDS_MAP_CMP_T(MessageType, DDS::InstanceHandle_t,
55 : typename TraitsType::LessThanType) InstanceMap;
56 : typedef OPENDDS_MAP(DDS::InstanceHandle_t, typename InstanceMap::iterator) ReverseInstanceMap;
57 :
58 : class SharedInstanceMap
59 : : public virtual RcObject
60 : , public InstanceMap
61 : {
62 : };
63 :
64 : typedef RcHandle<SharedInstanceMap> SharedInstanceMap_rch;
65 :
66 : typedef typename TraitsType::DataReaderType Interface;
67 :
68 0 : CORBA::Boolean _is_a(const char* type_id)
69 : {
70 0 : return Interface::_is_a(type_id);
71 : }
72 :
73 0 : const char* _interface_repository_id() const
74 : {
75 0 : return Interface::_interface_repository_id();
76 : }
77 :
78 0 : CORBA::Boolean marshal(TAO_OutputCDR&)
79 : {
80 0 : return false;
81 : }
82 :
83 : // work around "hides overloaded virtual" warnings when MessageType=DynamicSample
84 : using Interface::read_next_sample;
85 : using Interface::take_next_sample;
86 : using Interface::lookup_instance;
87 : using Interface::get_key_value;
88 :
89 : class MessageTypeWithAllocator
90 : : public MessageType
91 : , public EnableContainerSupportedUniquePtr<MessageTypeWithAllocator>
92 : {
93 : public:
94 : void* operator new(size_t size, ACE_New_Allocator& pool);
95 : void operator delete(void* memory, ACE_New_Allocator& pool);
96 : void operator delete(void* memory);
97 :
98 0 : MessageTypeWithAllocator(){}
99 0 : MessageTypeWithAllocator(const MessageType& other)
100 0 : : MessageType(other)
101 : {
102 0 : }
103 :
104 0 : const MessageType* message() const { return this; }
105 :
106 : #ifndef OPENDDS_HAS_STD_UNIQUE_PTR
107 : using EnableContainerSupportedUniquePtr<MessageTypeWithAllocator>::_remove_ref;
108 : using EnableContainerSupportedUniquePtr<MessageTypeWithAllocator>::_add_ref;
109 : using EnableContainerSupportedUniquePtr<MessageTypeWithAllocator>::ref_count;
110 : #endif
111 : };
112 :
113 : struct MessageTypeMemoryBlock {
114 : MessageTypeWithAllocator element_;
115 : ACE_New_Allocator* allocator_;
116 : };
117 :
118 : typedef OpenDDS::DCPS::Cached_Allocator_With_Overflow<MessageTypeMemoryBlock, ACE_Thread_Mutex> DataAllocator;
119 :
120 0 : DataReaderImpl_T()
121 0 : : filter_delayed_sample_task_(make_rch<DRISporadicTask>(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &DataReaderImpl_T::filter_delayed))
122 0 : , marshal_skip_serialize_(false)
123 : {
124 0 : initialize_lookup_maps();
125 0 : }
126 :
127 0 : virtual ~DataReaderImpl_T()
128 : {
129 0 : filter_delayed_sample_task_->cancel();
130 :
131 0 : for (typename InstanceMap::iterator it = instance_map_.begin();
132 0 : it != instance_map_.end(); ++it)
133 : {
134 0 : OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(it->second);
135 0 : if (!ptr) continue;
136 0 : purge_data(ptr);
137 : }
138 : //X SHH release the data samples in the instance_map_.
139 0 : }
140 :
141 : /**
142 : * Do parts of enable specific to the datatype.
143 : * Called by DataReaderImpl::enable().
144 : */
145 0 : virtual DDS::ReturnCode_t enable_specific ()
146 : {
147 0 : data_allocator().reset(new DataAllocator(get_n_chunks ()));
148 0 : if (OpenDDS::DCPS::DCPS_debug_level >= 2)
149 0 : ACE_DEBUG((LM_DEBUG,
150 : ACE_TEXT("(%P|%t) %CDataReaderImpl::")
151 : ACE_TEXT("enable_specific-data")
152 : ACE_TEXT(" Cached_Allocator_With_Overflow ")
153 : ACE_TEXT("%x with %d chunks\n"),
154 : TraitsType::type_name(),
155 : data_allocator().get(),
156 : get_n_chunks ()));
157 :
158 0 : return DDS::RETCODE_OK;
159 : }
160 :
161 0 : virtual DDS::ReturnCode_t read (
162 : MessageSequenceType & received_data,
163 : DDS::SampleInfoSeq & info_seq,
164 : ::CORBA::Long max_samples,
165 : DDS::SampleStateMask sample_states,
166 : DDS::ViewStateMask view_states,
167 : DDS::InstanceStateMask instance_states)
168 : {
169 : DDS::ReturnCode_t const precond =
170 0 : check_inputs("read", received_data, info_seq, max_samples);
171 0 : if (DDS::RETCODE_OK != precond)
172 : {
173 0 : return precond;
174 : }
175 :
176 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
177 : guard,
178 : sample_lock_,
179 : DDS::RETCODE_ERROR);
180 :
181 0 : return read_i(received_data, info_seq, max_samples, sample_states,
182 0 : view_states, instance_states, 0);
183 0 : }
184 :
185 0 : virtual DDS::ReturnCode_t take (
186 : MessageSequenceType & received_data,
187 : DDS::SampleInfoSeq & info_seq,
188 : ::CORBA::Long max_samples,
189 : DDS::SampleStateMask sample_states,
190 : DDS::ViewStateMask view_states,
191 : DDS::InstanceStateMask instance_states)
192 : {
193 : DDS::ReturnCode_t const precond =
194 0 : check_inputs("take", received_data, info_seq, max_samples);
195 0 : if (DDS::RETCODE_OK != precond)
196 : {
197 0 : return precond;
198 : }
199 :
200 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
201 : guard,
202 : sample_lock_,
203 : DDS::RETCODE_ERROR);
204 :
205 0 : return take_i(received_data, info_seq, max_samples, sample_states,
206 0 : view_states, instance_states, 0);
207 0 : }
208 :
209 0 : virtual DDS::ReturnCode_t read_w_condition (
210 : MessageSequenceType & received_data,
211 : DDS::SampleInfoSeq & sample_info,
212 : ::CORBA::Long max_samples,
213 : DDS::ReadCondition_ptr a_condition)
214 : {
215 : DDS::ReturnCode_t const precond =
216 0 : check_inputs("read_w_condition", received_data, sample_info, max_samples);
217 0 : if (DDS::RETCODE_OK != precond)
218 : {
219 0 : return precond;
220 : }
221 :
222 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
223 : DDS::RETCODE_ERROR);
224 :
225 0 : if (!has_readcondition(a_condition))
226 : {
227 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
228 : }
229 :
230 0 : return read_i(received_data, sample_info, max_samples,
231 0 : a_condition->get_sample_state_mask(),
232 0 : a_condition->get_view_state_mask(),
233 0 : a_condition->get_instance_state_mask(),
234 : #ifndef OPENDDS_NO_QUERY_CONDITION
235 0 : dynamic_cast< DDS::QueryCondition_ptr >(a_condition));
236 : #else
237 : 0);
238 : #endif
239 0 : }
240 :
241 0 : virtual DDS::ReturnCode_t take_w_condition (
242 : MessageSequenceType & received_data,
243 : DDS::SampleInfoSeq & sample_info,
244 : ::CORBA::Long max_samples,
245 : DDS::ReadCondition_ptr a_condition)
246 : {
247 : DDS::ReturnCode_t const precond =
248 0 : check_inputs("take_w_condition", received_data, sample_info, max_samples);
249 0 : if (DDS::RETCODE_OK != precond)
250 : {
251 0 : return precond;
252 : }
253 :
254 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
255 : DDS::RETCODE_ERROR);
256 :
257 0 : if (!has_readcondition(a_condition))
258 : {
259 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
260 : }
261 :
262 0 : return take_i(received_data, sample_info, max_samples,
263 0 : a_condition->get_sample_state_mask(),
264 0 : a_condition->get_view_state_mask(),
265 0 : a_condition->get_instance_state_mask(),
266 : #ifndef OPENDDS_NO_QUERY_CONDITION
267 0 : dynamic_cast< DDS::QueryCondition_ptr >(a_condition)
268 : #else
269 : 0
270 : #endif
271 0 : );
272 0 : }
273 :
274 0 : virtual DDS::ReturnCode_t read_next_sample(MessageType& received_data,
275 : DDS::SampleInfo& sample_info_ref)
276 : {
277 0 : bool found_data = false;
278 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, DDS::RETCODE_ERROR);
279 :
280 0 : const Observer_rch observer = get_observer(Observer::e_SAMPLE_READ);
281 :
282 0 : const CORBA::ULong sample_states = DDS::NOT_READ_SAMPLE_STATE;
283 0 : const HandleSet& matches = lookup_matching_instances(sample_states, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
284 0 : for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
285 0 : ++next; // pre-increment iterator, in case updates cause changes to match set
286 0 : const DDS::InstanceHandle_t handle = *it;
287 0 : const SubscriptionInstance_rch inst = get_handle_instance(handle);
288 0 : if (!inst) continue;
289 :
290 0 : bool most_recent_generation = false;
291 0 : for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
292 0 : !found_data && item; item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
293 0 : if (item->registered_data_) {
294 0 : received_data = *static_cast<MessageType*>(item->registered_data_);
295 : }
296 0 : inst->instance_state_->sample_info(sample_info_ref, item);
297 0 : inst->rcvd_samples_.mark_read(item);
298 :
299 0 : const ValueDispatcher* vd = get_value_dispatcher();
300 0 : if (observer && item->registered_data_ && vd) {
301 0 : Observer::Sample s(sample_info_ref.instance_handle, sample_info_ref.instance_state, *item, *vd);
302 0 : observer->on_sample_read(this, s);
303 : }
304 :
305 0 : if (!most_recent_generation) {
306 0 : most_recent_generation = inst->instance_state_->most_recent_generation(item);
307 : }
308 :
309 0 : found_data = true;
310 : }
311 :
312 0 : if (found_data) {
313 0 : if (most_recent_generation) {
314 0 : inst->instance_state_->accessed();
315 : }
316 : // Get the sample_ranks, generation_ranks, and
317 : // absolute_generation_ranks for this info_seq
318 0 : sample_info(sample_info_ref, inst->rcvd_samples_.peek_tail());
319 :
320 0 : break;
321 : }
322 : }
323 :
324 0 : post_read_or_take();
325 0 : return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
326 0 : }
327 :
328 0 : virtual DDS::ReturnCode_t take_next_sample(MessageType& received_data,
329 : DDS::SampleInfo& sample_info_ref)
330 : {
331 0 : bool found_data = false;
332 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, DDS::RETCODE_ERROR);
333 :
334 0 : const Observer_rch observer = get_observer(Observer::e_SAMPLE_TAKEN);
335 :
336 0 : const CORBA::ULong sample_states = DDS::NOT_READ_SAMPLE_STATE;
337 0 : const HandleSet& matches = lookup_matching_instances(sample_states, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
338 0 : for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
339 0 : ++next; // pre-increment iterator, in case updates cause changes to match set
340 0 : const DDS::InstanceHandle_t handle = *it;
341 0 : const SubscriptionInstance_rch inst = get_handle_instance(handle);
342 0 : if (!inst) continue;
343 :
344 0 : bool most_recent_generation = false;
345 0 : ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
346 0 : if (item) {
347 0 : if (item->registered_data_) {
348 0 : received_data = *static_cast<MessageType*>(item->registered_data_);
349 : }
350 0 : inst->instance_state_->sample_info(sample_info_ref, item);
351 0 : inst->rcvd_samples_.mark_read(item);
352 :
353 0 : const ValueDispatcher* vd = get_value_dispatcher();
354 0 : if (observer && item->registered_data_ && vd) {
355 0 : Observer::Sample s(sample_info_ref.instance_handle, sample_info_ref.instance_state, *item, *vd);
356 0 : observer->on_sample_taken(this, s);
357 : }
358 :
359 0 : if (!most_recent_generation) {
360 0 : most_recent_generation = inst->instance_state_->most_recent_generation(item);
361 : }
362 :
363 0 : if (most_recent_generation) {
364 0 : inst->instance_state_->accessed();
365 : }
366 :
367 : // Get the sample_ranks, generation_ranks, and
368 : // absolute_generation_ranks for this info_seq
369 0 : sample_info(sample_info_ref, inst->rcvd_samples_.peek_tail());
370 :
371 0 : inst->rcvd_samples_.remove(item);
372 0 : item->dec_ref();
373 0 : item = 0;
374 :
375 0 : found_data = true;
376 :
377 0 : break;
378 : }
379 : }
380 :
381 0 : post_read_or_take();
382 0 : return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
383 0 : }
384 :
385 0 : virtual DDS::ReturnCode_t read_instance (
386 : MessageSequenceType & received_data,
387 : DDS::SampleInfoSeq & info_seq,
388 : ::CORBA::Long max_samples,
389 : DDS::InstanceHandle_t a_handle,
390 : DDS::SampleStateMask sample_states,
391 : DDS::ViewStateMask view_states,
392 : DDS::InstanceStateMask instance_states)
393 : {
394 : DDS::ReturnCode_t const precond =
395 0 : check_inputs("read_instance", received_data, info_seq, max_samples);
396 0 : if (DDS::RETCODE_OK != precond)
397 : {
398 0 : return precond;
399 : }
400 :
401 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
402 : guard,
403 : sample_lock_,
404 : DDS::RETCODE_ERROR);
405 0 : return read_instance_i(received_data, info_seq, max_samples, a_handle,
406 0 : sample_states, view_states, instance_states, 0);
407 0 : }
408 :
409 0 : virtual DDS::ReturnCode_t take_instance (
410 : MessageSequenceType & received_data,
411 : DDS::SampleInfoSeq & info_seq,
412 : ::CORBA::Long max_samples,
413 : DDS::InstanceHandle_t a_handle,
414 : DDS::SampleStateMask sample_states,
415 : DDS::ViewStateMask view_states,
416 : DDS::InstanceStateMask instance_states)
417 : {
418 : DDS::ReturnCode_t const precond =
419 0 : check_inputs("take_instance", received_data, info_seq, max_samples);
420 0 : if (DDS::RETCODE_OK != precond)
421 : {
422 0 : return precond;
423 : }
424 :
425 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
426 : guard,
427 : sample_lock_,
428 : DDS::RETCODE_ERROR);
429 0 : return take_instance_i(received_data, info_seq, max_samples, a_handle,
430 0 : sample_states, view_states, instance_states, 0);
431 0 : }
432 :
433 0 : virtual DDS::ReturnCode_t read_instance_w_condition (
434 : MessageSequenceType & received_data,
435 : DDS::SampleInfoSeq & info_seq,
436 : ::CORBA::Long max_samples,
437 : DDS::InstanceHandle_t a_handle,
438 : DDS::ReadCondition_ptr a_condition)
439 : {
440 : DDS::ReturnCode_t const precond =
441 0 : check_inputs("read_instance_w_condition", received_data, info_seq,
442 : max_samples);
443 0 : if (DDS::RETCODE_OK != precond)
444 : {
445 0 : return precond;
446 : }
447 :
448 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
449 : DDS::RETCODE_ERROR);
450 :
451 0 : if (!has_readcondition(a_condition))
452 : {
453 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
454 : }
455 :
456 : #ifndef OPENDDS_NO_QUERY_CONDITION
457 0 : DDS::QueryCondition_ptr query_condition =
458 0 : dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
459 : #endif
460 :
461 0 : return read_instance_i(received_data, info_seq, max_samples, a_handle,
462 0 : a_condition->get_sample_state_mask(),
463 0 : a_condition->get_view_state_mask(),
464 0 : a_condition->get_instance_state_mask(),
465 : #ifndef OPENDDS_NO_QUERY_CONDITION
466 : query_condition
467 : #else
468 : 0
469 : #endif
470 0 : );
471 0 : }
472 :
473 0 : virtual DDS::ReturnCode_t take_instance_w_condition (
474 : MessageSequenceType & received_data,
475 : DDS::SampleInfoSeq & info_seq,
476 : ::CORBA::Long max_samples,
477 : DDS::InstanceHandle_t a_handle,
478 : DDS::ReadCondition_ptr a_condition)
479 : {
480 : DDS::ReturnCode_t const precond =
481 0 : check_inputs("take_instance_w_condition", received_data, info_seq,
482 : max_samples);
483 0 : if (DDS::RETCODE_OK != precond)
484 : {
485 0 : return precond;
486 : }
487 :
488 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
489 : DDS::RETCODE_ERROR);
490 :
491 0 : if (!has_readcondition(a_condition))
492 : {
493 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
494 : }
495 :
496 : #ifndef OPENDDS_NO_QUERY_CONDITION
497 0 : DDS::QueryCondition_ptr query_condition =
498 0 : dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
499 : #endif
500 :
501 0 : return take_instance_i(received_data, info_seq, max_samples, a_handle,
502 0 : a_condition->get_sample_state_mask(),
503 0 : a_condition->get_view_state_mask(),
504 0 : a_condition->get_instance_state_mask(),
505 : #ifndef OPENDDS_NO_QUERY_CONDITION
506 : query_condition
507 : #else
508 : 0
509 : #endif
510 0 : );
511 0 : }
512 :
513 0 : virtual DDS::ReturnCode_t read_next_instance (
514 : MessageSequenceType & received_data,
515 : DDS::SampleInfoSeq & info_seq,
516 : ::CORBA::Long max_samples,
517 : DDS::InstanceHandle_t a_handle,
518 : DDS::SampleStateMask sample_states,
519 : DDS::ViewStateMask view_states,
520 : DDS::InstanceStateMask instance_states)
521 : {
522 : DDS::ReturnCode_t const precond =
523 0 : check_inputs("read_next_instance", received_data, info_seq, max_samples);
524 0 : if (DDS::RETCODE_OK != precond)
525 : {
526 0 : return precond;
527 : }
528 :
529 0 : return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
530 0 : sample_states, view_states, instance_states, 0);
531 : }
532 :
533 0 : virtual DDS::ReturnCode_t take_next_instance (
534 : MessageSequenceType & received_data,
535 : DDS::SampleInfoSeq & info_seq,
536 : ::CORBA::Long max_samples,
537 : DDS::InstanceHandle_t a_handle,
538 : DDS::SampleStateMask sample_states,
539 : DDS::ViewStateMask view_states,
540 : DDS::InstanceStateMask instance_states)
541 : {
542 : DDS::ReturnCode_t const precond =
543 0 : check_inputs("take_next_instance", received_data, info_seq, max_samples);
544 0 : if (DDS::RETCODE_OK != precond)
545 : {
546 0 : return precond;
547 : }
548 :
549 0 : return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
550 0 : sample_states, view_states, instance_states, 0);
551 : }
552 :
553 0 : virtual DDS::ReturnCode_t read_next_instance_w_condition (
554 : MessageSequenceType & received_data,
555 : DDS::SampleInfoSeq & info_seq,
556 : ::CORBA::Long max_samples,
557 : DDS::InstanceHandle_t a_handle,
558 : DDS::ReadCondition_ptr a_condition)
559 : {
560 : DDS::ReturnCode_t const precond =
561 0 : check_inputs("read_next_instance_w_condition", received_data, info_seq,
562 : max_samples);
563 0 : if (DDS::RETCODE_OK != precond)
564 : {
565 0 : return precond;
566 : }
567 :
568 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
569 : DDS::RETCODE_ERROR);
570 :
571 0 : if (!has_readcondition(a_condition))
572 : {
573 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
574 : }
575 :
576 : #ifndef OPENDDS_NO_QUERY_CONDITION
577 0 : DDS::QueryCondition_ptr query_condition =
578 0 : dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
579 : #endif
580 :
581 0 : return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
582 0 : a_condition->get_sample_state_mask(),
583 0 : a_condition->get_view_state_mask(),
584 0 : a_condition->get_instance_state_mask(),
585 : #ifndef OPENDDS_NO_QUERY_CONDITION
586 : query_condition
587 : #else
588 : 0
589 : #endif
590 0 : );
591 0 : }
592 :
593 0 : virtual DDS::ReturnCode_t take_next_instance_w_condition (
594 : MessageSequenceType & received_data,
595 : DDS::SampleInfoSeq & info_seq,
596 : ::CORBA::Long max_samples,
597 : DDS::InstanceHandle_t a_handle,
598 : DDS::ReadCondition_ptr a_condition)
599 : {
600 : DDS::ReturnCode_t const precond =
601 0 : check_inputs("take_next_instance_w_condition", received_data, info_seq,
602 : max_samples);
603 0 : if (DDS::RETCODE_OK != precond)
604 : {
605 0 : return precond;
606 : }
607 :
608 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
609 : DDS::RETCODE_ERROR);
610 :
611 0 : if (!has_readcondition(a_condition))
612 : {
613 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
614 : }
615 :
616 : #ifndef OPENDDS_NO_QUERY_CONDITION
617 0 : DDS::QueryCondition_ptr query_condition =
618 0 : dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
619 : #endif
620 :
621 0 : return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
622 0 : a_condition->get_sample_state_mask(),
623 0 : a_condition->get_view_state_mask(),
624 0 : a_condition->get_instance_state_mask(),
625 : #ifndef OPENDDS_NO_QUERY_CONDITION
626 : query_condition
627 : #else
628 : 0
629 : #endif
630 0 : );
631 0 : }
632 :
633 0 : virtual DDS::ReturnCode_t return_loan (
634 : MessageSequenceType & received_data,
635 : DDS::SampleInfoSeq & info_seq)
636 : {
637 : // Some incomplete tests to see that the data and info are from the
638 : // same read.
639 0 : if (received_data.length() != info_seq.length())
640 : {
641 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
642 : }
643 :
644 0 : if (received_data.release())
645 : {
646 : // nothing to do because this is not zero-copy data
647 0 : return DDS::RETCODE_OK;
648 : }
649 : else
650 : {
651 0 : info_seq.length(0);
652 0 : received_data.length(0);
653 : }
654 0 : return DDS::RETCODE_OK;
655 : }
656 :
657 0 : virtual DDS::ReturnCode_t get_key_value(MessageType& key_holder,
658 : DDS::InstanceHandle_t handle)
659 : {
660 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(sample_lock_);
661 :
662 0 : const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(handle);
663 0 : if (pos != reverse_instance_map_.end()) {
664 0 : key_holder = pos->second->first;
665 0 : return DDS::RETCODE_OK;
666 : }
667 :
668 0 : return DDS::RETCODE_BAD_PARAMETER;
669 0 : }
670 :
671 0 : virtual DDS::InstanceHandle_t lookup_instance(const MessageType& instance_data)
672 : {
673 0 : ACE_Guard<ACE_Recursive_Thread_Mutex> guard(sample_lock_);
674 :
675 0 : const typename InstanceMap::const_iterator it = instance_map_.find(instance_data);
676 0 : if (it != instance_map_.end()) {
677 0 : return it->second;
678 : }
679 0 : return DDS::HANDLE_NIL;
680 0 : }
681 :
682 0 : virtual DDS::ReturnCode_t auto_return_loan(void* seq)
683 : {
684 0 : MessageSequenceType& received_data =
685 : *static_cast< MessageSequenceType*> (seq);
686 :
687 0 : if (!received_data.release())
688 : {
689 : // release_loan(received_data);
690 0 : received_data.length(0);
691 : }
692 0 : return DDS::RETCODE_OK;
693 : }
694 :
695 : void release_loan (MessageSequenceType & received_data)
696 : {
697 : received_data.length(0);
698 : }
699 :
700 : #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
701 0 : bool contains_sample_filtered(DDS::SampleStateMask sample_states,
702 : DDS::ViewStateMask view_states,
703 : DDS::InstanceStateMask instance_states,
704 : const FilterEvaluator& evaluator,
705 : const DDS::StringSeq& params)
706 : {
707 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
708 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_, false);
709 :
710 0 : TopicDescriptionPtr<TopicImpl> topic(topic_servant_);
711 0 : TypeSupport* const ts = topic->get_type_support();
712 0 : TypeSupportImpl* const type_support = dynamic_cast<TypeSupportImpl*>(ts);
713 0 : const bool filter_has_non_key_fields = type_support ? evaluator.has_non_key_fields(*type_support) : true;
714 :
715 0 : const HandleSet& matches = lookup_matching_instances(sample_states, view_states, instance_states);
716 0 : for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
717 0 : ++next; // pre-increment iterator, in case updates cause changes to match set
718 0 : const DDS::InstanceHandle_t handle = *it;
719 0 : const SubscriptionInstance_rch inst = get_handle_instance(handle);
720 0 : if (!inst) continue;
721 :
722 0 : for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
723 0 : item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
724 0 : if (!item->registered_data_ || (!item->valid_data_ && filter_has_non_key_fields)) {
725 0 : continue;
726 : }
727 0 : if (evaluator.eval(*static_cast<MessageType*>(item->registered_data_), params)) {
728 0 : return true;
729 : }
730 : }
731 : }
732 :
733 0 : return false;
734 0 : }
735 :
736 0 : DDS::ReturnCode_t read_generic(GenericBundle& gen,
737 : DDS::SampleStateMask sample_states,
738 : DDS::ViewStateMask view_states,
739 : DDS::InstanceStateMask instance_states,
740 : bool adjust_ref_count = false)
741 : {
742 0 : MessageSequenceType data;
743 : DDS::ReturnCode_t rc;
744 : {
745 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, DDS::RETCODE_ERROR);
746 0 : rc = read_i(data, gen.info_, DDS::LENGTH_UNLIMITED,
747 : sample_states, view_states, instance_states, 0);
748 0 : if (adjust_ref_count) {
749 0 : typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(data);
750 0 : received_data_p.increment_references();
751 : }
752 0 : }
753 0 : gen.samples_.reserve(data.length());
754 0 : for (CORBA::ULong i = 0; i < data.length(); ++i) {
755 0 : gen.samples_.push_back(&data[i]);
756 : }
757 0 : return rc;
758 0 : }
759 :
760 0 : DDS::InstanceHandle_t lookup_instance_generic(const void* data)
761 : {
762 0 : return lookup_instance(*static_cast<const MessageType*>(data));
763 : }
764 :
765 0 : virtual DDS::ReturnCode_t take(AbstractSamples& samples,
766 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
767 : DDS::InstanceStateMask instance_states)
768 : {
769 0 : ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex,
770 : guard,
771 : sample_lock_,
772 : DDS::RETCODE_ERROR);
773 :
774 0 : MessageSequenceType data;
775 0 : DDS::SampleInfoSeq infos;
776 0 : const DDS::ReturnCode_t rc = take_i(data, infos, DDS::LENGTH_UNLIMITED,
777 : sample_states, view_states, instance_states, 0);
778 :
779 0 : samples.reserve(data.length());
780 :
781 0 : for (CORBA::ULong i = 0; i < data.length(); ++i) {
782 0 : samples.push_back(infos[i], &data[i]);
783 : }
784 :
785 0 : return rc;
786 0 : }
787 :
788 0 : DDS::ReturnCode_t read_instance_generic(void*& data,
789 : DDS::SampleInfo& info, DDS::InstanceHandle_t instance,
790 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
791 : DDS::InstanceStateMask instance_states)
792 : {
793 0 : MessageSequenceType dataseq;
794 0 : DDS::SampleInfoSeq infoseq;
795 0 : const DDS::ReturnCode_t rc = read_instance_i(dataseq, infoseq,
796 : DDS::LENGTH_UNLIMITED, instance, sample_states, view_states,
797 : instance_states, 0);
798 0 : if (rc != DDS::RETCODE_NO_DATA)
799 : {
800 0 : const CORBA::ULong last = dataseq.length() - 1;
801 0 : data = new MessageType(dataseq[last]);
802 0 : info = infoseq[last];
803 : }
804 0 : return rc;
805 0 : }
806 :
807 0 : DDS::ReturnCode_t read_next_instance_generic(void*& data,
808 : DDS::SampleInfo& info, DDS::InstanceHandle_t previous_instance,
809 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
810 : DDS::InstanceStateMask instance_states)
811 : {
812 0 : MessageSequenceType dataseq;
813 0 : DDS::SampleInfoSeq infoseq;
814 0 : const DDS::ReturnCode_t rc = read_next_instance_i(dataseq, infoseq,
815 : DDS::LENGTH_UNLIMITED, previous_instance, sample_states, view_states,
816 : instance_states, 0);
817 0 : if (rc != DDS::RETCODE_NO_DATA)
818 : {
819 0 : const CORBA::ULong last = dataseq.length() - 1;
820 0 : data = new MessageType(dataseq[last]);
821 0 : info = infoseq[last];
822 : }
823 0 : return rc;
824 0 : }
825 :
826 : #endif
827 :
828 0 : DDS::InstanceHandle_t store_synthetic_data(const MessageType& sample,
829 : DDS::ViewStateKind view,
830 : const SystemTimePoint& timestamp = SystemTimePoint::now())
831 : {
832 : using namespace OpenDDS::DCPS;
833 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_,
834 : DDS::HANDLE_NIL);
835 : #ifndef OPENDDS_NO_MULTI_TOPIC
836 0 : DDS::TopicDescription_var descr = get_topicdescription();
837 0 : if (MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) {
838 0 : if (!mt->filter(sample)) {
839 0 : return DDS::HANDLE_NIL;
840 : }
841 : }
842 : #endif
843 :
844 0 : get_subscriber_servant()->data_received(this);
845 :
846 0 : DDS::InstanceHandle_t inst = lookup_instance(sample);
847 0 : bool filtered = false;
848 0 : SubscriptionInstance_rch instance;
849 :
850 0 : const DDS::Time_t now = timestamp.to_dds_time();
851 0 : DataSampleHeader header;
852 0 : header.source_timestamp_sec_ = now.sec;
853 0 : header.source_timestamp_nanosec_ = now.nanosec;
854 :
855 : // Call store_instance_data() once or twice, depending on if we need to
856 : // process the INSTANCE_REGISTRATION. In either case, store_instance_data()
857 : // owns the memory for the sample and it must come from the correct allocator.
858 0 : for (int i = 0; i < 2; ++i) {
859 0 : if (i == 0 && inst != DDS::HANDLE_NIL) continue;
860 :
861 0 : const int msg = i ? SAMPLE_DATA : INSTANCE_REGISTRATION;
862 0 : header.message_id_ = static_cast<char>(msg);
863 :
864 : bool just_registered;
865 0 : unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator(sample));
866 0 : store_instance_data(move(data), DDS::HANDLE_NIL, header, instance, just_registered, filtered);
867 0 : if (instance) inst = instance->instance_handle_;
868 : }
869 :
870 0 : if (!filtered) {
871 0 : if (view == DDS::NOT_NEW_VIEW_STATE) {
872 0 : if (instance) instance->instance_state_->accessed();
873 : }
874 0 : notify_read_conditions();
875 : }
876 :
877 0 : const ValueDispatcher* vd = get_value_dispatcher();
878 0 : const Observer_rch observer = get_observer(Observer::e_SAMPLE_RECEIVED);
879 0 : if (observer && vd) {
880 0 : Observer::Sample s(instance ? instance->instance_handle_ : DDS::HANDLE_NIL, header.instance_state(), now, header.sequence_, &sample, *vd);
881 0 : observer->on_sample_received(this, s);
882 : }
883 :
884 0 : return inst;
885 0 : }
886 :
887 0 : void set_instance_state_i(DDS::InstanceHandle_t instance,
888 : DDS::InstanceHandle_t publication_handle,
889 : DDS::InstanceStateKind state,
890 : const SystemTimePoint& timestamp,
891 : const GUID_t& publication_id)
892 : {
893 : // sample_lock_ must be held.
894 : using namespace OpenDDS::DCPS;
895 :
896 0 : SubscriptionInstance_rch si = get_handle_instance(instance);
897 0 : if (si && state != DDS::ALIVE_INSTANCE_STATE) {
898 0 : const DDS::Time_t now = timestamp.to_dds_time();
899 0 : DataSampleHeader header;
900 0 : header.publication_id_ = publication_id;
901 0 : header.source_timestamp_sec_ = now.sec;
902 0 : header.source_timestamp_nanosec_ = now.nanosec;
903 0 : const int msg = (state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE)
904 0 : ? DISPOSE_INSTANCE : UNREGISTER_INSTANCE;
905 0 : header.message_id_ = static_cast<char>(msg);
906 : bool just_registered, filtered;
907 0 : unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
908 0 : get_key_value(*data, instance);
909 0 : store_instance_data(move(data), publication_handle, header, si, just_registered, filtered);
910 0 : if (!filtered) {
911 0 : notify_read_conditions();
912 : }
913 0 : }
914 0 : }
915 :
916 0 : virtual void lookup_instance(const OpenDDS::DCPS::ReceivedDataSample& sample,
917 : OpenDDS::DCPS::SubscriptionInstance_rch& instance)
918 : {
919 : //!!! caller should already have the sample_lock_
920 0 : const bool encapsulated = sample.header_.cdr_encapsulation_;
921 0 : Message_Block_Ptr payload(sample.data(&mb_alloc_));
922 0 : OpenDDS::DCPS::Serializer ser(
923 : payload.get(),
924 : encapsulated ? Encoding::KIND_XCDR1 : Encoding::KIND_UNALIGNED_CDR,
925 0 : static_cast<Endianness>(sample.header_.byte_order_));
926 :
927 0 : if (encapsulated) {
928 0 : EncapsulationHeader encap;
929 0 : if (!(ser >> encap)) {
930 0 : if (DCPS_debug_level > 0) {
931 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
932 : ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
933 : ACE_TEXT("deserialization of encapsulation header failed.\n"),
934 : TraitsType::type_name()));
935 : }
936 0 : return;
937 : }
938 0 : Encoding encoding;
939 0 : if (!encap.to_encoding(encoding, type_support_->base_extensibility())) {
940 0 : return;
941 : }
942 :
943 0 : if (decoding_modes_.find(encoding.kind()) == decoding_modes_.end()) {
944 0 : if (DCPS_debug_level >= 1) {
945 0 : ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING ")
946 : ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
947 : ACE_TEXT("Encoding kind of the received sample (%C) does not ")
948 : ACE_TEXT("match the ones specified by DataReader.\n"),
949 : TraitsType::type_name(),
950 : Encoding::kind_to_string(encoding.kind()).c_str()));
951 : }
952 0 : return;
953 : }
954 0 : if (DCPS_debug_level >= 8) {
955 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ")
956 : ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
957 : ACE_TEXT("Deserializing with encoding kind %C.\n"),
958 : TraitsType::type_name(),
959 : Encoding::kind_to_string(encoding.kind()).c_str()));
960 : }
961 :
962 0 : ser.encoding(encoding);
963 : }
964 :
965 0 : bool ser_ret = true;
966 0 : MessageType data;
967 0 : if (sample.header_.key_fields_only_) {
968 0 : ser_ret = ser >> OpenDDS::DCPS::KeyOnly<MessageType>(data);
969 : } else {
970 0 : ser_ret = ser >> data;
971 : }
972 0 : if (!ser_ret) {
973 0 : if (ser.get_construction_status() != Serializer::ConstructionSuccessful) {
974 0 : if (DCPS_debug_level > 1) {
975 0 : ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
976 : ACE_TEXT("object construction failure, dropping sample.\n"),
977 : TraitsType::type_name()));
978 : }
979 : } else {
980 0 : if (DCPS_debug_level > 0) {
981 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
982 : ACE_TEXT("deserialization failed.\n"),
983 : TraitsType::type_name()));
984 : }
985 : }
986 0 : return;
987 : }
988 :
989 0 : DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
990 0 : typename InstanceMap::const_iterator const it = instance_map_.find(data);
991 0 : if (it != instance_map_.end()) {
992 0 : handle = it->second;
993 : }
994 :
995 0 : if (handle == DDS::HANDLE_NIL) {
996 0 : instance.reset();
997 : } else {
998 0 : instance = get_handle_instance(handle);
999 : }
1000 0 : }
1001 :
1002 0 : virtual void qos_change(const DDS::DataReaderQos& qos)
1003 : {
1004 : // reliability is not changeable, just time_based_filter
1005 0 : if (qos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
1006 0 : if (qos.time_based_filter.minimum_separation != qos_.time_based_filter.minimum_separation) {
1007 0 : const DDS::Duration_t zero = { DDS::DURATION_ZERO_SEC, DDS::DURATION_ZERO_NSEC };
1008 0 : if (qos_.time_based_filter.minimum_separation != zero) {
1009 0 : if (qos.time_based_filter.minimum_separation != zero) {
1010 0 : const MonotonicTimePoint now = MonotonicTimePoint::now();
1011 0 : const TimeDuration interval(qos_.time_based_filter.minimum_separation);
1012 0 : FilterDelayedSampleQueue queue;
1013 :
1014 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
1015 0 : for (typename FilterDelayedSampleMap::iterator pos = filter_delayed_sample_map_.begin(), limit = filter_delayed_sample_map_.end(); pos != limit; ++pos) {
1016 0 : FilterDelayedSample& sample = pos->second;
1017 0 : sample.expiration_time = now + (interval - (sample.expiration_time - now));
1018 0 : queue.insert(std::make_pair(sample.expiration_time, pos->first));
1019 : }
1020 0 : std::swap(queue, filter_delayed_sample_queue_);
1021 :
1022 0 : if (!filter_delayed_sample_queue_.empty()) {
1023 0 : filter_delayed_sample_task_->cancel();
1024 0 : filter_delayed_sample_task_->schedule(interval);
1025 : }
1026 :
1027 0 : } else {
1028 0 : filter_delayed_sample_task_->cancel();
1029 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
1030 0 : filter_delayed_sample_map_.clear();
1031 0 : filter_delayed_sample_queue_.clear();
1032 0 : }
1033 : }
1034 : // else no existing timers to change/cancel
1035 : }
1036 : // else no qos change so nothing to change
1037 : }
1038 :
1039 0 : DataReaderImpl::qos_change(qos);
1040 : }
1041 :
1042 : void set_marshal_skip_serialize(bool value)
1043 : {
1044 : marshal_skip_serialize_ = value;
1045 : }
1046 :
1047 : bool get_marshal_skip_serialize() const
1048 : {
1049 : return marshal_skip_serialize_;
1050 : }
1051 :
1052 0 : void release_all_instances()
1053 : {
1054 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
1055 :
1056 0 : const typename InstanceMap::iterator end = instance_map_.end();
1057 0 : typename InstanceMap::iterator it = instance_map_.begin();
1058 0 : while (it != end) {
1059 0 : const DDS::InstanceHandle_t handle = it->second;
1060 0 : ++it; // it will be invalid, so iterate now.
1061 0 : release_instance(handle);
1062 : }
1063 0 : }
1064 :
1065 : protected:
1066 :
1067 0 : virtual RcHandle<MessageHolder> dds_demarshal(const OpenDDS::DCPS::ReceivedDataSample& sample,
1068 : DDS::InstanceHandle_t publication_handle,
1069 : OpenDDS::DCPS::SubscriptionInstance_rch& instance,
1070 : bool& just_registered,
1071 : bool& filtered,
1072 : OpenDDS::DCPS::MarshalingType marshaling_type,
1073 : bool full_copy)
1074 : {
1075 0 : unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
1076 0 : dynamic_hook(*data);
1077 0 : RcHandle<MessageHolder> message_holder;
1078 :
1079 0 : Message_Block_Ptr payload(sample.data(&mb_alloc_));
1080 0 : if (marshal_skip_serialize_) {
1081 0 : if (!MarshalTraitsType::from_message_block(*data, *payload)) {
1082 0 : if (DCPS_debug_level > 0) {
1083 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::dds_demarshal: ")
1084 : ACE_TEXT("attempting to skip serialize but bad from_message_block. Returning from demarshal.\n")));
1085 : }
1086 0 : return message_holder;
1087 : }
1088 0 : store_instance_data(move(data), publication_handle, sample.header_, instance, just_registered, filtered);
1089 0 : return message_holder;
1090 : }
1091 0 : const bool encapsulated = sample.header_.cdr_encapsulation_;
1092 :
1093 0 : OpenDDS::DCPS::Serializer ser(
1094 : payload.get(),
1095 : encapsulated ? Encoding::KIND_XCDR1 : Encoding::KIND_UNALIGNED_CDR,
1096 0 : static_cast<Endianness>(sample.header_.byte_order_));
1097 :
1098 0 : if (encapsulated) {
1099 0 : EncapsulationHeader encap;
1100 0 : if (!(ser >> encap)) {
1101 0 : if (DCPS_debug_level > 0) {
1102 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
1103 : ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1104 : ACE_TEXT("deserialization of encapsulation header failed.\n"),
1105 : TraitsType::type_name()));
1106 : }
1107 0 : return message_holder;
1108 : }
1109 0 : Encoding encoding;
1110 0 : if (!encap.to_encoding(encoding, type_support_->base_extensibility())) {
1111 0 : return message_holder;
1112 : }
1113 :
1114 0 : if (decoding_modes_.find(encoding.kind()) == decoding_modes_.end()) {
1115 0 : if (DCPS_debug_level >= 1) {
1116 0 : ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING ")
1117 : ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1118 : ACE_TEXT("Encoding kind %C of the received sample does not ")
1119 : ACE_TEXT("match the ones specified by DataReader.\n"),
1120 : TraitsType::type_name(),
1121 : Encoding::kind_to_string(encoding.kind()).c_str()));
1122 : }
1123 0 : return message_holder;
1124 : }
1125 0 : if (DCPS_debug_level >= 8) {
1126 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ")
1127 : ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1128 : ACE_TEXT("Deserializing with encoding kind %C.\n"),
1129 : TraitsType::type_name(),
1130 : Encoding::kind_to_string(encoding.kind()).c_str()));
1131 : }
1132 :
1133 0 : ser.encoding(encoding);
1134 : }
1135 :
1136 0 : const bool key_only_marshaling =
1137 : marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING;
1138 :
1139 0 : bool ser_ret = true;
1140 0 : if (key_only_marshaling) {
1141 0 : ser_ret = ser >> OpenDDS::DCPS::KeyOnly<MessageType>(*data);
1142 : } else {
1143 0 : ser_ret = ser >> *data;
1144 0 : if (full_copy) {
1145 0 : message_holder = make_rch<MessageHolder_T<MessageType> >(*data);
1146 : }
1147 : }
1148 0 : if (!ser_ret) {
1149 0 : if (ser.get_construction_status() != Serializer::ConstructionSuccessful) {
1150 0 : if (DCPS_debug_level > 1) {
1151 0 : ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ")
1152 : ACE_TEXT("object construction failure, dropping sample.\n"),
1153 : TraitsType::type_name()));
1154 : }
1155 : } else {
1156 0 : if (DCPS_debug_level > 0) {
1157 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR %CDataReaderImpl::dds_demarshal ")
1158 : ACE_TEXT("deserialization failed, dropping sample.\n"),
1159 : TraitsType::type_name()));
1160 : }
1161 : }
1162 0 : return message_holder;
1163 : }
1164 :
1165 : #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1166 : /*
1167 : * If sample.header_.content_filter_ is true, the writer has already
1168 : * filtered.
1169 : */
1170 0 : if (!sample.header_.content_filter_) {
1171 0 : ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
1172 0 : if (content_filtered_topic_) {
1173 0 : const bool sample_only_has_key_fields = !sample.header_.valid_data();
1174 0 : if (key_only_marshaling != sample_only_has_key_fields) {
1175 0 : if (DCPS_debug_level > 0) {
1176 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
1177 : ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1178 : ACE_TEXT("Mismatch between the key only and valid data properties ")
1179 : ACE_TEXT("of a %C message of a content filtered topic!\n"),
1180 : TraitsType::type_name(),
1181 : to_string(static_cast<MessageId>(sample.header_.message_id_))));
1182 : }
1183 0 : filtered = true;
1184 0 : message_holder.reset();
1185 0 : return message_holder;
1186 : }
1187 0 : const MessageType& type = static_cast<MessageType&>(*data);
1188 0 : if (!content_filtered_topic_->filter(type, sample_only_has_key_fields)) {
1189 0 : filtered = true;
1190 0 : message_holder.reset();
1191 0 : return message_holder;
1192 : }
1193 : }
1194 0 : }
1195 : #endif
1196 :
1197 0 : store_instance_data(move(data), publication_handle, sample.header_, instance, just_registered, filtered);
1198 0 : return message_holder;
1199 0 : }
1200 :
1201 0 : virtual void dispose_unregister(const OpenDDS::DCPS::ReceivedDataSample& sample,
1202 : DDS::InstanceHandle_t publication_handle,
1203 : OpenDDS::DCPS::SubscriptionInstance_rch& instance)
1204 : {
1205 : //!!! caller should already have the sample_lock_
1206 :
1207 : // The data sample in this dispose message does not contain any valid data.
1208 : // What it needs here is the key value to identify the instance to dispose.
1209 : // The demarshal push this "sample" to received sample list so the user
1210 : // can be notified the dispose event.
1211 0 : bool just_registered = false;
1212 0 : bool filtered = false;
1213 0 : OpenDDS::DCPS::MarshalingType marshaling = OpenDDS::DCPS::FULL_MARSHALING;
1214 0 : if (sample.header_.key_fields_only_) {
1215 0 : marshaling = OpenDDS::DCPS::KEY_ONLY_MARSHALING;
1216 : }
1217 0 : dds_demarshal(sample, publication_handle, instance, just_registered, filtered, marshaling, false);
1218 0 : }
1219 :
1220 0 : virtual void purge_data(OpenDDS::DCPS::SubscriptionInstance_rch instance)
1221 : {
1222 0 : drop_sample(instance->instance_handle_);
1223 :
1224 :
1225 0 : instance->instance_state_->cancel_release();
1226 :
1227 0 : while (instance->rcvd_samples_.size() > 0)
1228 : {
1229 : OpenDDS::DCPS::ReceivedDataElement* head =
1230 0 : instance->rcvd_samples_.remove_head();
1231 0 : head->dec_ref();
1232 : }
1233 0 : }
1234 :
1235 0 : virtual void release_instance_i(DDS::InstanceHandle_t handle)
1236 : {
1237 0 : const typename ReverseInstanceMap::iterator pos = reverse_instance_map_.find(handle);
1238 0 : if (pos != reverse_instance_map_.end()) {
1239 0 : remove_from_lookup_maps(handle);
1240 0 : instance_map_.erase(pos->second);
1241 0 : reverse_instance_map_.erase(pos);
1242 : }
1243 0 : }
1244 :
1245 0 : virtual void state_updated_i(DDS::InstanceHandle_t handle)
1246 : {
1247 0 : const typename SubscriptionInstanceMapType::iterator pos = instances_.find(handle);
1248 0 : if (pos != instances_.end()) {
1249 0 : update_lookup_maps(pos);
1250 : }
1251 0 : }
1252 :
1253 : private:
1254 :
1255 : /// Available for specialization so that some types of MessageType can observe and
1256 : /// change the sample before dds_demarshal deserializes into it
1257 0 : void dynamic_hook(MessageType&) {}
1258 :
1259 0 : bool store_instance_data_check(unique_ptr<MessageTypeWithAllocator>& instance_data,
1260 : DDS::InstanceHandle_t publication_handle,
1261 : const OpenDDS::DCPS::DataSampleHeader& header,
1262 : OpenDDS::DCPS::SubscriptionInstance_rch& instance_ptr)
1263 : {
1264 : #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
1265 0 : const bool is_dispose_msg =
1266 0 : header.message_id_ == OpenDDS::DCPS::DISPOSE_INSTANCE ||
1267 0 : header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
1268 :
1269 0 : if (!is_bit() && security_config_) {
1270 0 : if (header.message_id_ == SAMPLE_DATA ||
1271 0 : header.message_id_ == INSTANCE_REGISTRATION) {
1272 :
1273 : // Pubulisher has already gone through the check.
1274 0 : if (instance_ptr &&
1275 0 : instance_ptr->instance_state_ &&
1276 0 : instance_ptr->instance_state_->writes_instance(header.publication_id_)) {
1277 0 : return true;
1278 : }
1279 :
1280 0 : DDS::Security::SecurityException ex;
1281 0 : const GUID_t local_participant = make_part_guid(get_guid());
1282 0 : const GUID_t remote_participant = make_part_guid(header.publication_id_);
1283 0 : const DDS::Security::ParticipantCryptoHandle remote_participant_permissions_handle = security_config_->get_handle_registry(local_participant)->get_remote_participant_permissions_handle(remote_participant);
1284 : // Construct a DynamicData around the deserialized sample.
1285 0 : DDS::DynamicData_var dda =
1286 0 : XTypes::get_dynamic_data_adapter(dynamic_type_, *instance_data->message());
1287 : // The remote participant might not be using security.
1288 0 : if (remote_participant_permissions_handle != DDS::HANDLE_NIL &&
1289 0 : !security_config_->get_access_control()->check_remote_datawriter_register_instance(remote_participant_permissions_handle, this, publication_handle, dda, ex)) {
1290 0 : if (log_level >= LogLevel::Warning) {
1291 0 : ACE_ERROR((LM_WARNING,
1292 : "(%P|%t) WARNING: DataReaderImpl_T::store_instance_data_check: unable to register instance SecurityException[%d.%d]: %C\n",
1293 : ex.code, ex.minor_code, ex.message.in()));
1294 : }
1295 0 : return false;
1296 : }
1297 0 : } else if (is_dispose_msg) {
1298 :
1299 0 : DDS::Security::SecurityException ex;
1300 0 : const GUID_t local_participant = make_part_guid(get_guid());
1301 0 : const GUID_t remote_participant = make_part_guid(header.publication_id_);
1302 0 : const DDS::Security::ParticipantCryptoHandle remote_participant_permissions_handle = security_config_->get_handle_registry(local_participant)->get_remote_participant_permissions_handle(remote_participant);
1303 : // Construct a DynamicData around the deserialized sample.
1304 0 : DDS::DynamicData_var dda =
1305 0 : XTypes::get_dynamic_data_adapter(dynamic_type_, *instance_data->message());
1306 : // The remote participant might not be using security.
1307 0 : if (remote_participant_permissions_handle != DDS::HANDLE_NIL &&
1308 0 : !security_config_->get_access_control()->check_remote_datawriter_dispose_instance(remote_participant_permissions_handle, this, publication_handle, dda, ex)) {
1309 0 : if (log_level >= LogLevel::Warning) {
1310 0 : ACE_ERROR((LM_WARNING,
1311 : "(%P|%t) WARNING: DataReaderImpl_T::store_instance_data_check: unable to dispose instance SecurityException[%d.%d]: %C\n",
1312 : ex.code, ex.minor_code, ex.message.in()));
1313 : }
1314 0 : return false;
1315 : }
1316 0 : }
1317 : }
1318 : #else
1319 : ACE_UNUSED_ARG(instance_data);
1320 : ACE_UNUSED_ARG(publication_handle);
1321 : ACE_UNUSED_ARG(header);
1322 : ACE_UNUSED_ARG(instance_ptr);
1323 : #endif
1324 :
1325 0 : return true;
1326 : }
1327 :
1328 0 : DDS::ReturnCode_t read_i(MessageSequenceType& received_data,
1329 : DDS::SampleInfoSeq& info_seq,
1330 : CORBA::Long max_samples,
1331 : DDS::SampleStateMask sample_states,
1332 : DDS::ViewStateMask view_states,
1333 : DDS::InstanceStateMask instance_states,
1334 : #ifndef OPENDDS_NO_QUERY_CONDITION
1335 : DDS::QueryCondition_ptr a_condition)
1336 : #else
1337 : int)
1338 : #endif
1339 : {
1340 :
1341 0 : typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1342 :
1343 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1344 0 : if (subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS && !coherent_) {
1345 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
1346 : }
1347 :
1348 0 : const bool group_coherent_ordered =
1349 0 : subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
1350 0 : && subqos_.presentation.coherent_access
1351 0 : && subqos_.presentation.ordered_access;
1352 :
1353 0 : if (group_coherent_ordered && coherent_) {
1354 0 : max_samples = 1;
1355 : }
1356 : #endif
1357 :
1358 0 : RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1359 : #ifndef OPENDDS_NO_QUERY_CONDITION
1360 : a_condition,
1361 : #endif
1362 : DDS_OPERATION_READ);
1363 :
1364 0 : const Observer_rch observer = get_observer(Observer::e_SAMPLE_READ);
1365 :
1366 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1367 0 : if (!group_coherent_ordered) {
1368 : #endif
1369 0 : const HandleSet& matches = lookup_matching_instances(sample_states, view_states, instance_states);
1370 0 : for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
1371 0 : ++next; // pre-increment iterator, in case updates cause changes to match set
1372 0 : const DDS::InstanceHandle_t handle = *it;
1373 0 : const SubscriptionInstance_rch inst = get_handle_instance(handle);
1374 0 : if (!inst) continue;
1375 :
1376 0 : size_t i(0);
1377 0 : for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1378 0 : item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1379 0 : results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1380 :
1381 0 : const ValueDispatcher* vd = get_value_dispatcher();
1382 0 : if (observer && item->registered_data_ && vd) {
1383 0 : Observer::Sample s(handle, inst->instance_state_->instance_state(), *item, *vd);
1384 0 : observer->on_sample_read(this, s);
1385 : }
1386 : }
1387 : }
1388 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1389 : } else {
1390 0 : const RakeData item = group_coherent_ordered_data_.get_data();
1391 0 : results.insert_sample(item.rde_, item.rdel_, item.si_, item.index_in_instance_);
1392 0 : const ValueDispatcher* vd = get_value_dispatcher();
1393 0 : if (observer && item.rde_->registered_data_ && vd) {
1394 0 : typename InstanceMap::iterator i = instance_map_.begin();
1395 0 : const DDS::InstanceHandle_t handle = (i != instance_map_.end()) ? i->second : DDS::HANDLE_NIL;
1396 0 : Observer::Sample s(handle, item.si_->instance_state_->instance_state(), *item.rde_, *vd);
1397 0 : observer->on_sample_read(this, s);
1398 : }
1399 0 : }
1400 : #endif
1401 :
1402 0 : results.copy_to_user();
1403 :
1404 0 : DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
1405 0 : if (received_data.length()) {
1406 0 : ret = DDS::RETCODE_OK;
1407 0 : if (received_data.maximum() == 0) { // using ZeroCopy
1408 0 : received_data_p.set_loaner(this);
1409 : }
1410 : }
1411 :
1412 0 : post_read_or_take();
1413 0 : return ret;
1414 0 : }
1415 :
1416 0 : DDS::ReturnCode_t take_i(MessageSequenceType& received_data,
1417 : DDS::SampleInfoSeq& info_seq,
1418 : CORBA::Long max_samples,
1419 : DDS::SampleStateMask sample_states,
1420 : DDS::ViewStateMask view_states,
1421 : DDS::InstanceStateMask instance_states,
1422 : #ifndef OPENDDS_NO_QUERY_CONDITION
1423 : DDS::QueryCondition_ptr a_condition)
1424 : #else
1425 : int)
1426 : #endif
1427 : {
1428 0 : typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1429 :
1430 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1431 0 : if (subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS && !coherent_) {
1432 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
1433 : }
1434 :
1435 0 : const bool group_coherent_ordered =
1436 0 : subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
1437 0 : && subqos_.presentation.coherent_access
1438 0 : && subqos_.presentation.ordered_access;
1439 :
1440 0 : if (group_coherent_ordered && coherent_) {
1441 0 : max_samples = 1;
1442 : }
1443 : #endif
1444 :
1445 0 : RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1446 : #ifndef OPENDDS_NO_QUERY_CONDITION
1447 : a_condition,
1448 : #endif
1449 : DDS_OPERATION_TAKE);
1450 :
1451 0 : const Observer_rch observer = get_observer(Observer::e_SAMPLE_TAKEN);
1452 :
1453 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1454 0 : if (!group_coherent_ordered) {
1455 : #endif
1456 0 : const HandleSet& matches = lookup_matching_instances(sample_states, view_states, instance_states);
1457 0 : for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
1458 0 : ++next; // pre-increment iterator, in case updates cause changes to match set
1459 0 : const DDS::InstanceHandle_t handle = *it;
1460 0 : const SubscriptionInstance_rch inst = get_handle_instance(handle);
1461 0 : if (!inst) continue;
1462 :
1463 0 : size_t i(0);
1464 0 : for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1465 0 : item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1466 0 : results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1467 :
1468 0 : const ValueDispatcher* vd = get_value_dispatcher();
1469 0 : if (observer && item->registered_data_ && vd) {
1470 0 : Observer::Sample s(handle, inst->instance_state_->instance_state(), *item, *vd);
1471 0 : observer->on_sample_taken(this, s);
1472 : }
1473 : }
1474 : }
1475 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1476 : } else {
1477 0 : const RakeData item = group_coherent_ordered_data_.get_data();
1478 0 : results.insert_sample(item.rde_, item.rdel_, item.si_, item.index_in_instance_);
1479 0 : }
1480 : #endif
1481 :
1482 0 : results.copy_to_user();
1483 :
1484 0 : DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
1485 0 : if (received_data.length()) {
1486 0 : ret = DDS::RETCODE_OK;
1487 0 : if (received_data.maximum() == 0) { // using ZeroCopy
1488 0 : received_data_p.set_loaner(this);
1489 : }
1490 : }
1491 :
1492 0 : post_read_or_take();
1493 0 : return ret;
1494 0 : }
1495 :
1496 0 : DDS::ReturnCode_t read_instance_i(MessageSequenceType& received_data,
1497 : DDS::SampleInfoSeq& info_seq,
1498 : CORBA::Long max_samples,
1499 : DDS::InstanceHandle_t a_handle,
1500 : DDS::SampleStateMask sample_states,
1501 : DDS::ViewStateMask view_states,
1502 : DDS::InstanceStateMask instance_states,
1503 : #ifndef OPENDDS_NO_QUERY_CONDITION
1504 : DDS::QueryCondition_ptr a_condition)
1505 : #else
1506 : int)
1507 : #endif
1508 : {
1509 0 : const SubscriptionInstance_rch inst = get_handle_instance(a_handle);
1510 0 : if (!inst) return DDS::RETCODE_BAD_PARAMETER;
1511 :
1512 0 : typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1513 :
1514 0 : RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1515 : #ifndef OPENDDS_NO_QUERY_CONDITION
1516 : a_condition,
1517 : #endif
1518 : DDS_OPERATION_READ);
1519 :
1520 0 : const InstanceState_rch state_obj = inst->instance_state_;
1521 0 : if (state_obj->match(view_states, instance_states)) {
1522 0 : const Observer_rch observer = get_observer(Observer::e_SAMPLE_READ);
1523 0 : size_t i(0);
1524 0 : for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1525 0 : item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1526 0 : results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1527 0 : const ValueDispatcher* vd = get_value_dispatcher();
1528 0 : if (observer && item->registered_data_ && vd) {
1529 0 : Observer::Sample s(a_handle, inst->instance_state_->instance_state(), *item, *vd);
1530 0 : observer->on_sample_read(this, s);
1531 : }
1532 : }
1533 0 : } else if (DCPS_debug_level >= 8) {
1534 0 : OPENDDS_STRING msg;
1535 0 : if ((state_obj->view_state() & view_states) == 0) {
1536 0 : msg = "view state is not valid";
1537 : }
1538 0 : if ((state_obj->instance_state() & instance_states) == 0) {
1539 0 : if (!msg.empty()) msg += " and ";
1540 0 : msg += "instance state is ";
1541 0 : msg += state_obj->instance_state_string();
1542 0 : msg += " while the validity mask is " + InstanceState::instance_state_mask_string(instance_states);
1543 : }
1544 0 : ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl_T::read_instance_i: ")
1545 : ACE_TEXT("will return no data reading sub %C because:\n %C\n"),
1546 : LogGuid(get_guid()).c_str(), msg.c_str()));
1547 0 : }
1548 :
1549 0 : results.copy_to_user();
1550 :
1551 0 : DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
1552 0 : if (received_data.length()) {
1553 0 : ret = DDS::RETCODE_OK;
1554 0 : if (received_data.maximum() == 0) { // using ZeroCopy
1555 0 : received_data_p.set_loaner(this);
1556 : }
1557 : }
1558 :
1559 0 : post_read_or_take();
1560 0 : return ret;
1561 0 : }
1562 :
1563 0 : DDS::ReturnCode_t take_instance_i(MessageSequenceType& received_data,
1564 : DDS::SampleInfoSeq& info_seq,
1565 : CORBA::Long max_samples,
1566 : DDS::InstanceHandle_t a_handle,
1567 : DDS::SampleStateMask sample_states,
1568 : DDS::ViewStateMask view_states,
1569 : DDS::InstanceStateMask instance_states,
1570 : #ifndef OPENDDS_NO_QUERY_CONDITION
1571 : DDS::QueryCondition_ptr a_condition)
1572 : #else
1573 : int)
1574 : #endif
1575 : {
1576 0 : const SubscriptionInstance_rch inst = get_handle_instance(a_handle);
1577 0 : if (!inst) return DDS::RETCODE_BAD_PARAMETER;
1578 :
1579 0 : typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1580 :
1581 0 : RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1582 : #ifndef OPENDDS_NO_QUERY_CONDITION
1583 : a_condition,
1584 : #endif
1585 : DDS_OPERATION_TAKE);
1586 :
1587 0 : const InstanceState_rch state_obj = inst->instance_state_;
1588 0 : if (state_obj->match(view_states, instance_states)) {
1589 0 : const Observer_rch observer = get_observer(Observer::e_SAMPLE_TAKEN);
1590 0 : size_t i(0);
1591 0 : for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1592 0 : item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1593 0 : results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1594 0 : const ValueDispatcher* vd = get_value_dispatcher();
1595 0 : if (observer && item->registered_data_ && vd) {
1596 0 : Observer::Sample s(a_handle, inst->instance_state_->instance_state(), *item, *vd);
1597 0 : observer->on_sample_taken(this, s);
1598 : }
1599 : }
1600 0 : }
1601 :
1602 0 : results.copy_to_user();
1603 :
1604 0 : DDS::ReturnCode_t ret = DDS::RETCODE_NO_DATA;
1605 0 : if (received_data.length()) {
1606 0 : ret = DDS::RETCODE_OK;
1607 0 : if (received_data.maximum() == 0) { // using ZeroCopy
1608 0 : received_data_p.set_loaner(this);
1609 : }
1610 : }
1611 :
1612 0 : post_read_or_take();
1613 0 : return ret;
1614 0 : }
1615 :
1616 0 : DDS::ReturnCode_t read_next_instance_i(MessageSequenceType& received_data,
1617 : DDS::SampleInfoSeq& info_seq,
1618 : CORBA::Long max_samples,
1619 : DDS::InstanceHandle_t a_handle,
1620 : DDS::SampleStateMask sample_states,
1621 : DDS::ViewStateMask view_states,
1622 : DDS::InstanceStateMask instance_states,
1623 : #ifndef OPENDDS_NO_QUERY_CONDITION
1624 : DDS::QueryCondition_ptr a_condition)
1625 : #else
1626 : int)
1627 : #endif
1628 : {
1629 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, DDS::RETCODE_ERROR);
1630 :
1631 0 : typename InstanceMap::iterator it = instance_map_.begin();
1632 0 : const typename InstanceMap::iterator the_end = instance_map_.end();
1633 0 : if (a_handle != DDS::HANDLE_NIL) {
1634 0 : const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(a_handle);
1635 0 : if (pos != reverse_instance_map_.end()) {
1636 0 : it = pos->second;
1637 0 : ++it;
1638 : } else {
1639 0 : it = the_end;
1640 : }
1641 : }
1642 :
1643 0 : DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
1644 0 : for (; it != the_end; ++it) {
1645 0 : handle = it->second;
1646 : const DDS::ReturnCode_t status =
1647 0 : read_instance_i(received_data, info_seq, max_samples, handle,
1648 : sample_states, view_states, instance_states,
1649 : #ifndef OPENDDS_NO_QUERY_CONDITION
1650 : a_condition);
1651 : #else
1652 : 0);
1653 : #endif
1654 0 : if (status != DDS::RETCODE_NO_DATA) {
1655 0 : post_read_or_take();
1656 0 : return status;
1657 : }
1658 : }
1659 :
1660 0 : post_read_or_take();
1661 0 : return DDS::RETCODE_NO_DATA;
1662 0 : }
1663 :
1664 0 : DDS::ReturnCode_t take_next_instance_i(MessageSequenceType& received_data,
1665 : DDS::SampleInfoSeq& info_seq,
1666 : CORBA::Long max_samples,
1667 : DDS::InstanceHandle_t a_handle,
1668 : DDS::SampleStateMask sample_states,
1669 : DDS::ViewStateMask view_states,
1670 : DDS::InstanceStateMask instance_states,
1671 : #ifndef OPENDDS_NO_QUERY_CONDITION
1672 : DDS::QueryCondition_ptr a_condition)
1673 : #else
1674 : int)
1675 : #endif
1676 : {
1677 0 : ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, DDS::RETCODE_ERROR);
1678 :
1679 0 : typename InstanceMap::iterator it = instance_map_.begin();
1680 0 : const typename InstanceMap::iterator the_end = instance_map_.end();
1681 0 : if (a_handle != DDS::HANDLE_NIL) {
1682 0 : const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(a_handle);
1683 0 : if (pos != reverse_instance_map_.end()) {
1684 0 : it = pos->second;
1685 0 : ++it;
1686 : } else {
1687 0 : it = the_end;
1688 : }
1689 : }
1690 :
1691 0 : DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
1692 0 : for (; it != the_end; ++it) {
1693 0 : handle = it->second;
1694 : const DDS::ReturnCode_t status =
1695 0 : take_instance_i(received_data, info_seq, max_samples, handle,
1696 : sample_states, view_states, instance_states,
1697 : #ifndef OPENDDS_NO_QUERY_CONDITION
1698 : a_condition);
1699 : #else
1700 : 0);
1701 : #endif
1702 0 : if (status != DDS::RETCODE_NO_DATA) {
1703 0 : total_samples(); // see if we are empty
1704 0 : post_read_or_take();
1705 0 : return status;
1706 : }
1707 : }
1708 :
1709 0 : post_read_or_take();
1710 0 : return DDS::RETCODE_NO_DATA;
1711 0 : }
1712 :
1713 0 : void store_instance_data(unique_ptr<MessageTypeWithAllocator> instance_data,
1714 : DDS::InstanceHandle_t publication_handle,
1715 : const OpenDDS::DCPS::DataSampleHeader& header,
1716 : OpenDDS::DCPS::SubscriptionInstance_rch& instance_ptr,
1717 : bool& just_registered,
1718 : bool& filtered)
1719 : {
1720 : ACE_UNUSED_ARG(publication_handle);
1721 :
1722 0 : const bool is_dispose_msg =
1723 0 : header.message_id_ == OpenDDS::DCPS::DISPOSE_INSTANCE ||
1724 0 : header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
1725 0 : const bool is_unregister_msg =
1726 0 : header.message_id_ == OpenDDS::DCPS::UNREGISTER_INSTANCE ||
1727 0 : header.message_id_ == OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE;
1728 :
1729 0 : if (!store_instance_data_check(instance_data, publication_handle, header, instance_ptr)) {
1730 0 : return;
1731 : }
1732 :
1733 : // not filtering any data, except what is specifically identified as filtered below
1734 0 : filtered = false;
1735 :
1736 0 : DDS::InstanceHandle_t handle(DDS::HANDLE_NIL);
1737 :
1738 : //!!! caller should already have the sample_lock_
1739 : //We will unlock it before calling into listeners
1740 :
1741 0 : typename InstanceMap::const_iterator const it = instance_map_.find(*instance_data);
1742 :
1743 0 : if (it == instance_map_.end()) {
1744 0 : if (is_dispose_msg || is_unregister_msg) {
1745 0 : return;
1746 : }
1747 :
1748 0 : std::size_t instances_size = 0;
1749 : {
1750 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
1751 0 : instances_size = instances_.size();
1752 0 : }
1753 0 : if ((qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED) &&
1754 0 : ((::CORBA::Long) instances_size >= qos_.resource_limits.max_instances))
1755 : {
1756 0 : DDS::DataReaderListener_var listener
1757 : = listener_for (DDS::SAMPLE_REJECTED_STATUS);
1758 :
1759 0 : set_status_changed_flag (DDS::SAMPLE_REJECTED_STATUS, true);
1760 :
1761 0 : sample_rejected_status_.last_reason = DDS::REJECTED_BY_INSTANCES_LIMIT;
1762 0 : ++sample_rejected_status_.total_count;
1763 0 : ++sample_rejected_status_.total_count_change;
1764 0 : sample_rejected_status_.last_instance_handle = handle;
1765 :
1766 0 : if (!CORBA::is_nil(listener.in()))
1767 : {
1768 0 : ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
1769 :
1770 0 : listener->on_sample_rejected(this, sample_rejected_status_);
1771 0 : sample_rejected_status_.total_count_change = 0;
1772 0 : } // do we want to do something if listener is nil???
1773 0 : notify_status_condition_no_sample_lock();
1774 :
1775 0 : return;
1776 0 : }
1777 :
1778 : {
1779 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
1780 :
1781 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1782 0 : SharedInstanceMap_rch inst;
1783 0 : OwnershipManagerScopedAccess ownership_scoped_access;
1784 0 : OwnershipManagerPtr owner_manager = ownership_manager();
1785 :
1786 0 : bool new_handle = true;
1787 0 : if (is_exclusive_ownership_) {
1788 0 : OwnershipManagerScopedAccess temp(owner_manager);
1789 0 : temp.swap(ownership_scoped_access);
1790 0 : if (!owner_manager || ownership_scoped_access.lock_result_ != 0) {
1791 0 : if (DCPS_debug_level > 0) {
1792 0 : ACE_ERROR ((LM_ERROR,
1793 : ACE_TEXT("(%P|%t) ")
1794 : ACE_TEXT("%CDataReaderImpl::")
1795 : ACE_TEXT("store_instance_data, ")
1796 : ACE_TEXT("acquire instance_lock failed.\n"), TraitsType::type_name()));
1797 : }
1798 0 : return;
1799 : }
1800 :
1801 0 : inst = dynamic_rchandle_cast<SharedInstanceMap>(
1802 0 : owner_manager->get_instance_map(topic_servant_->type_name(), this));
1803 0 : if (inst != 0) {
1804 0 : typename InstanceMap::const_iterator const iter = inst->find(*instance_data);
1805 0 : if (iter != inst->end ()) {
1806 0 : handle = iter->second;
1807 0 : new_handle = false;
1808 : }
1809 : }
1810 0 : }
1811 : #endif
1812 :
1813 0 : just_registered = true;
1814 0 : DDS::BuiltinTopicKey_t key = OpenDDS::DCPS::keyFromSample(static_cast<MessageType*>(instance_data.get()));
1815 0 : bool owns_handle = false;
1816 0 : if (handle == DDS::HANDLE_NIL) {
1817 0 : handle = get_next_handle(key);
1818 0 : owns_handle = true;
1819 : }
1820 0 : OpenDDS::DCPS::SubscriptionInstance_rch instance =
1821 : OpenDDS::DCPS::make_rch<OpenDDS::DCPS::SubscriptionInstance>(
1822 : rchandle_from(this),
1823 0 : qos_,
1824 0 : ref(instances_lock_),
1825 : handle, owns_handle);
1826 :
1827 : const std::pair<typename SubscriptionInstanceMapType::iterator, bool> bpair =
1828 0 : instances_.insert(typename SubscriptionInstanceMapType::value_type(handle, instance));
1829 :
1830 0 : if (bpair.second == false) {
1831 0 : if (DCPS_debug_level > 0) {
1832 0 : ACE_ERROR((LM_ERROR,
1833 : ACE_TEXT("(%P|%t) ")
1834 : ACE_TEXT("%CDataReaderImpl::")
1835 : ACE_TEXT("store_instance_data, ")
1836 : ACE_TEXT("insert handle failed.\n"), TraitsType::type_name()));
1837 : }
1838 0 : return;
1839 : }
1840 0 : update_lookup_maps(bpair.first);
1841 :
1842 : #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1843 0 : if (owner_manager) {
1844 0 : if (!inst) {
1845 0 : inst = make_rch<SharedInstanceMap>();
1846 0 : owner_manager->set_instance_map(
1847 0 : topic_servant_->type_name(),
1848 : inst,
1849 : this);
1850 : }
1851 :
1852 0 : if (new_handle) {
1853 : const std::pair<typename InstanceMap::iterator, bool> bpair =
1854 0 : inst->insert(typename InstanceMap::value_type(*instance_data, handle));
1855 0 : if (!bpair.second) {
1856 0 : if (DCPS_debug_level > 0) {
1857 0 : ACE_ERROR ((LM_ERROR,
1858 : ACE_TEXT("(%P|%t) ")
1859 : ACE_TEXT("%CDataReaderImpl::")
1860 : ACE_TEXT("store_instance_data, ")
1861 : ACE_TEXT("insert to participant scope %C failed.\n"), TraitsType::type_name(), TraitsType::type_name()));
1862 : }
1863 0 : return;
1864 : }
1865 : }
1866 :
1867 0 : OwnershipManagerScopedAccess temp;
1868 0 : temp.swap(ownership_scoped_access);
1869 0 : if (temp.release() != 0) {
1870 0 : if (DCPS_debug_level > 0) {
1871 0 : ACE_ERROR ((LM_ERROR,
1872 : ACE_TEXT("(%P|%t) ")
1873 : ACE_TEXT("%CDataReaderImpl::")
1874 : ACE_TEXT("store_instance_data, ")
1875 : ACE_TEXT("release instance_lock failed.\n"), TraitsType::type_name()));
1876 : }
1877 0 : return;
1878 : }
1879 0 : }
1880 : #endif
1881 0 : } // scope for instances_lock_
1882 :
1883 : std::pair<typename InstanceMap::iterator, bool> bpair =
1884 0 : instance_map_.insert(typename InstanceMap::value_type(*instance_data,
1885 : handle));
1886 0 : if (bpair.second == false)
1887 : {
1888 0 : if (DCPS_debug_level > 0) {
1889 0 : ACE_ERROR ((LM_ERROR,
1890 : ACE_TEXT("(%P|%t) ")
1891 : ACE_TEXT("%CDataReaderImpl::")
1892 : ACE_TEXT("store_instance_data, ")
1893 : ACE_TEXT("insert %C failed.\n"), TraitsType::type_name(), TraitsType::type_name()));
1894 : }
1895 0 : return;
1896 : }
1897 0 : reverse_instance_map_[handle] = bpair.first;
1898 : }
1899 : else
1900 : {
1901 0 : just_registered = false;
1902 0 : handle = it->second;
1903 : }
1904 :
1905 0 : if (header.message_id_ != OpenDDS::DCPS::INSTANCE_REGISTRATION)
1906 : {
1907 0 : instance_ptr = get_handle_instance(handle);
1908 0 : OPENDDS_ASSERT(instance_ptr);
1909 :
1910 0 : if (header.message_id_ == OpenDDS::DCPS::SAMPLE_DATA)
1911 : {
1912 : {
1913 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
1914 0 : filtered = ownership_filter_instance(instance_ptr, header.publication_id_);
1915 0 : }
1916 :
1917 0 : MonotonicTimePoint now;
1918 0 : MonotonicTimePoint deadline;
1919 0 : if (!filtered && time_based_filter_instance(instance_ptr, now, deadline)) {
1920 0 : filtered = true;
1921 0 : if (qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
1922 0 : delay_sample(handle, move(instance_data), header, just_registered, now, deadline);
1923 : }
1924 : } else {
1925 : // nothing time based filtered now
1926 0 : clear_sample(handle);
1927 :
1928 : }
1929 :
1930 0 : if (filtered) {
1931 0 : return;
1932 : }
1933 0 : }
1934 :
1935 0 : finish_store_instance_data(move(instance_data), header, instance_ptr, is_dispose_msg, is_unregister_msg);
1936 : }
1937 : else
1938 : {
1939 0 : instance_ptr = get_handle_instance(handle);
1940 0 : OPENDDS_ASSERT(instance_ptr);
1941 0 : instance_ptr->instance_state_->lively(header.publication_id_);
1942 : }
1943 : }
1944 :
1945 0 : void finish_store_instance_data(unique_ptr<MessageTypeWithAllocator> instance_data, const DataSampleHeader& header,
1946 : SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg )
1947 : {
1948 0 : if ((qos_.resource_limits.max_samples_per_instance !=
1949 0 : DDS::LENGTH_UNLIMITED) &&
1950 0 : (instance_ptr->rcvd_samples_.size() >=
1951 0 : static_cast<size_t>(qos_.resource_limits.max_samples_per_instance))) {
1952 :
1953 : // According to spec 1.2, Samples that contain no data do not
1954 : // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
1955 : // so do not remove the oldest sample when unregister/dispose
1956 : // message arrives.
1957 :
1958 0 : if (!is_dispose_msg && !is_unregister_msg
1959 0 : && !instance_ptr->rcvd_samples_.matches(DDS::READ_SAMPLE_STATE))
1960 : {
1961 0 : DDS::DataReaderListener_var listener
1962 : = listener_for(DDS::SAMPLE_REJECTED_STATUS);
1963 :
1964 0 : set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true);
1965 :
1966 0 : sample_rejected_status_.last_reason =
1967 : DDS::REJECTED_BY_SAMPLES_PER_INSTANCE_LIMIT;
1968 0 : ++sample_rejected_status_.total_count;
1969 0 : ++sample_rejected_status_.total_count_change;
1970 0 : sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_;
1971 :
1972 0 : if (!CORBA::is_nil(listener.in()))
1973 : {
1974 0 : ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
1975 :
1976 0 : listener->on_sample_rejected(this, sample_rejected_status_);
1977 0 : sample_rejected_status_.total_count_change = 0;
1978 0 : } // do we want to do something if listener is nil???
1979 0 : notify_status_condition_no_sample_lock();
1980 0 : return;
1981 0 : }
1982 0 : else if (!is_dispose_msg && !is_unregister_msg)
1983 : {
1984 : // Discard the oldest previously-read sample
1985 : OpenDDS::DCPS::ReceivedDataElement* item =
1986 0 : instance_ptr->rcvd_samples_.remove_head();
1987 0 : item->dec_ref();
1988 : }
1989 : }
1990 0 : else if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED)
1991 : {
1992 0 : CORBA::Long total_samples = 0;
1993 : {
1994 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
1995 0 : for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin();
1996 0 : iter != instances_.end();
1997 0 : ++iter) {
1998 0 : OpenDDS::DCPS::SubscriptionInstance_rch ptr = iter->second;
1999 :
2000 0 : total_samples += (CORBA::Long) ptr->rcvd_samples_.size();
2001 : }
2002 0 : }
2003 :
2004 0 : if (total_samples >= qos_.resource_limits.max_samples)
2005 : {
2006 : // According to spec 1.2, Samples that contain no data do not
2007 : // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
2008 : // so do not remove the oldest sample when unregister/dispose
2009 : // message arrives.
2010 :
2011 0 : if (!is_dispose_msg && !is_unregister_msg
2012 0 : && !instance_ptr->rcvd_samples_.matches(DDS::READ_SAMPLE_STATE))
2013 : {
2014 0 : DDS::DataReaderListener_var listener
2015 : = listener_for(DDS::SAMPLE_REJECTED_STATUS);
2016 :
2017 0 : set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true);
2018 :
2019 0 : sample_rejected_status_.last_reason =
2020 : DDS::REJECTED_BY_SAMPLES_LIMIT;
2021 0 : ++sample_rejected_status_.total_count;
2022 0 : ++sample_rejected_status_.total_count_change;
2023 0 : sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_;
2024 0 : if (!CORBA::is_nil(listener.in()))
2025 : {
2026 0 : ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
2027 :
2028 0 : listener->on_sample_rejected(this, sample_rejected_status_);
2029 0 : sample_rejected_status_.total_count_change = 0;
2030 0 : } // do we want to do something if listener is nil???
2031 0 : notify_status_condition_no_sample_lock();
2032 :
2033 0 : return;
2034 0 : }
2035 0 : else if (!is_dispose_msg && !is_unregister_msg)
2036 : {
2037 : // Discard the oldest previously-read sample
2038 : OpenDDS::DCPS::ReceivedDataElement *item =
2039 0 : instance_ptr->rcvd_samples_.remove_head();
2040 0 : item->dec_ref();
2041 : }
2042 : }
2043 : }
2044 :
2045 0 : bool event_notify = false;
2046 :
2047 0 : if (is_dispose_msg) {
2048 0 : event_notify = instance_ptr->instance_state_->dispose_was_received(header.publication_id_);
2049 : }
2050 :
2051 0 : if (is_unregister_msg) {
2052 0 : if (instance_ptr->instance_state_->unregister_was_received(header.publication_id_)) {
2053 0 : event_notify = true;
2054 : }
2055 : }
2056 :
2057 0 : if (!is_dispose_msg && !is_unregister_msg) {
2058 0 : event_notify = true;
2059 0 : instance_ptr->instance_state_->data_was_received(header.publication_id_);
2060 : }
2061 :
2062 0 : if (!event_notify) {
2063 0 : return;
2064 : }
2065 :
2066 0 : ReceivedDataElement* const ptr =
2067 0 : new (*rd_allocator_.get()) ReceivedDataElementWithType<MessageTypeWithAllocator>(
2068 0 : header, instance_data.release(), &sample_lock_);
2069 :
2070 0 : ptr->disposed_generation_count_ =
2071 0 : instance_ptr->instance_state_->disposed_generation_count();
2072 0 : ptr->no_writers_generation_count_ =
2073 0 : instance_ptr->instance_state_->no_writers_generation_count();
2074 :
2075 0 : instance_ptr->last_sequence_ = header.sequence_;
2076 :
2077 0 : instance_ptr->rcvd_strategy_->add(ptr);
2078 :
2079 0 : if (! is_dispose_msg && ! is_unregister_msg
2080 0 : && instance_ptr->rcvd_samples_.size() > get_depth())
2081 : {
2082 : OpenDDS::DCPS::ReceivedDataElement* head_ptr =
2083 0 : instance_ptr->rcvd_samples_.remove_head();
2084 :
2085 0 : if (head_ptr->sample_state_ == DDS::NOT_READ_SAMPLE_STATE)
2086 : {
2087 0 : DDS::DataReaderListener_var listener
2088 : = listener_for (DDS::SAMPLE_LOST_STATUS);
2089 :
2090 0 : ++sample_lost_status_.total_count;
2091 0 : ++sample_lost_status_.total_count_change;
2092 :
2093 0 : set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, true);
2094 :
2095 0 : if (!CORBA::is_nil(listener.in()))
2096 : {
2097 0 : ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
2098 :
2099 0 : listener->on_sample_lost(this, sample_lost_status_);
2100 :
2101 0 : sample_lost_status_.total_count_change = 0;
2102 0 : }
2103 :
2104 0 : notify_status_condition_no_sample_lock();
2105 0 : }
2106 :
2107 0 : head_ptr->dec_ref();
2108 : }
2109 :
2110 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2111 0 : if (! ptr->coherent_change_) {
2112 : #endif
2113 0 : RcHandle<OpenDDS::DCPS::SubscriberImpl> sub = get_subscriber_servant();
2114 0 : if (!sub || get_deleted())
2115 0 : return;
2116 :
2117 0 : sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, true);
2118 :
2119 0 : set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, true);
2120 :
2121 0 : DDS::SubscriberListener_var sub_listener =
2122 : sub->listener_for(DDS::DATA_ON_READERS_STATUS);
2123 0 : if (!CORBA::is_nil(sub_listener.in()) && !coherent_) {
2124 0 : if (!is_bit()) {
2125 0 : sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, false);
2126 0 : ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
2127 0 : sub_listener->on_data_on_readers(sub.in());
2128 0 : } else {
2129 0 : TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataOnReaders>(sub, sub_listener, rchandle_from(static_cast<DataReaderImpl*>(this)), true, false));
2130 : }
2131 : } else {
2132 0 : sub->notify_status_condition();
2133 :
2134 0 : DDS::DataReaderListener_var listener =
2135 : listener_for (DDS::DATA_AVAILABLE_STATUS);
2136 :
2137 0 : if (!CORBA::is_nil(listener.in())) {
2138 0 : if (!is_bit()) {
2139 0 : set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
2140 0 : sub->set_status_changed_flag(DDS::DATA_ON_READERS_STATUS, false);
2141 0 : sub.reset();
2142 0 : ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
2143 0 : listener->on_data_available(this);
2144 0 : } else {
2145 0 : TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataAvailable>(listener, rchandle_from(static_cast<DataReaderImpl*>(this)), true, true, true));
2146 : }
2147 : } else {
2148 0 : notify_status_condition_no_sample_lock();
2149 : }
2150 0 : }
2151 : #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2152 0 : }
2153 : #endif
2154 : }
2155 :
2156 : /// Release sample_lock_ during status notifications in store_instance_data()
2157 : /// as the lock is not needed and could cause deadlock condition.
2158 : /// See comments in member function implementation for details.
2159 0 : void notify_status_condition_no_sample_lock()
2160 : {
2161 : // This member function avoids a deadlock condition which otherwise
2162 : // could occur as follows:
2163 : // Thread 1: Call to WaitSet::wait() causes WaitSet::lock_ to lock and
2164 : // eventually DataReaderImpl::sample_lock_ to lock in call to
2165 : // DataReaderImpl::contains_samples().
2166 : // Thread2: Call to DataReaderImpl::data_received()
2167 : // causes DataReaderImpl::sample_lock_ to lock and eventually
2168 : // during notify of status condition a call to WaitSet::signal()
2169 : // causes WaitSet::lock_ to lock.
2170 : // Because the DataReaderImpl::sample_lock_ is not needed during
2171 : // status notification this member function is used in
2172 : // store_instance_data() to release sample_lock_ before making
2173 : // the notification.
2174 0 : ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
2175 0 : notify_status_condition();
2176 0 : }
2177 :
2178 :
2179 : /// Common input read* & take* input processing and precondition checks
2180 0 : DDS::ReturnCode_t check_inputs(const char* method_name,
2181 : MessageSequenceType& received_data,
2182 : DDS::SampleInfoSeq& info_seq,
2183 : ::CORBA::Long max_samples)
2184 : {
2185 0 : typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
2186 :
2187 : // ---- start of preconditions common to read and take -----
2188 : // SPEC ref v1.2 7.1.2.5.3.8 #1
2189 : // NOTE: We can't check maximum() or release() here since those are
2190 : // implementation details of the sequences. In general, the
2191 : // info_seq will have release() == true and maximum() == 0.
2192 : // If we're in zero-copy mode, the received_data will have
2193 : // release() == false and maximum() == 0. If it's not
2194 : // zero-copy then received_data will have release == true()
2195 : // and maximum() == anything.
2196 0 : if (received_data.length() != info_seq.length())
2197 : {
2198 0 : ACE_DEBUG((LM_DEBUG,
2199 : ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
2200 : ACE_TEXT("PRECONDITION_NOT_MET sample and info input ")
2201 : ACE_TEXT("sequences do not match.\n"),
2202 : TraitsType::type_name(),
2203 : method_name ));
2204 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
2205 : }
2206 :
2207 : //SPEC ref v1.2 7.1.2.5.3.8 #4
2208 0 : if ((received_data.maximum() > 0) && (received_data.release() == false))
2209 : {
2210 0 : ACE_DEBUG((LM_DEBUG,
2211 : ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
2212 : ACE_TEXT("PRECONDITION_NOT_MET mismatch of ")
2213 : ACE_TEXT("maximum %d and owns %d\n"),
2214 : TraitsType::type_name(),
2215 : method_name,
2216 : received_data.maximum(),
2217 : received_data.release() ));
2218 :
2219 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
2220 : }
2221 :
2222 0 : if (received_data.maximum() == 0)
2223 : {
2224 : // not in SPEC but needed.
2225 0 : if (max_samples == DDS::LENGTH_UNLIMITED)
2226 : {
2227 0 : max_samples =
2228 0 : static_cast< ::CORBA::Long> (received_data_p.max_slots());
2229 : }
2230 : }
2231 : else
2232 : {
2233 0 : if (max_samples == DDS::LENGTH_UNLIMITED)
2234 : {
2235 : //SPEC ref v1.2 7.1.2.5.3.8 #5a
2236 0 : max_samples = received_data.maximum();
2237 : }
2238 0 : else if (
2239 0 : max_samples > static_cast< ::CORBA::Long> (received_data.maximum()))
2240 : {
2241 : //SPEC ref v1.2 7.1.2.5.3.8 #5c
2242 0 : ACE_DEBUG((LM_DEBUG,
2243 : ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
2244 : ACE_TEXT("PRECONDITION_NOT_MET max_samples %d > maximum %d\n"),
2245 : TraitsType::type_name(),
2246 : method_name,
2247 : max_samples,
2248 : received_data.maximum()));
2249 0 : return DDS::RETCODE_PRECONDITION_NOT_MET;
2250 : }
2251 : //else
2252 : //SPEC ref v1.2 7.1.2.5.3.8 #5b - is true by impl below.
2253 : }
2254 :
2255 : // The spec does not say what to do in this case but it appears to be a good thing.
2256 : // Note: max_slots is the greater of the sequence's maximum and init_size.
2257 0 : if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples)
2258 : {
2259 0 : max_samples = static_cast< ::CORBA::Long> (received_data_p.max_slots());
2260 : }
2261 : //---- end of preconditions common to read and take -----
2262 :
2263 0 : return DDS::RETCODE_OK;
2264 : }
2265 :
2266 0 : void delay_sample(DDS::InstanceHandle_t handle,
2267 : unique_ptr<MessageTypeWithAllocator> data,
2268 : const OpenDDS::DCPS::DataSampleHeader& header,
2269 : const bool just_registered,
2270 : const MonotonicTimePoint& now,
2271 : const MonotonicTimePoint& deadline)
2272 : {
2273 : // sample_lock_ should already be held
2274 0 : DataSampleHeader_ptr hdr(new OpenDDS::DCPS::DataSampleHeader(header));
2275 :
2276 0 : typename FilterDelayedSampleMap::iterator i = filter_delayed_sample_map_.find(handle);
2277 0 : if (i == filter_delayed_sample_map_.end()) {
2278 :
2279 : // emplace()/insert() only if the sample is going to be
2280 : // new (otherwise we call move(data) twice).
2281 : std::pair<typename FilterDelayedSampleMap::iterator, bool> result =
2282 : #ifdef ACE_HAS_CPP11
2283 0 : filter_delayed_sample_map_.emplace(std::piecewise_construct,
2284 : std::forward_as_tuple(handle),
2285 0 : std::forward_as_tuple(move(data), hdr, just_registered));
2286 : #else
2287 : filter_delayed_sample_map_.insert(std::make_pair(handle, FilterDelayedSample(move(data), hdr, just_registered)));
2288 : #endif
2289 0 : FilterDelayedSample& sample = result.first->second;
2290 0 : sample.expiration_time = deadline;
2291 0 : const bool schedule = filter_delayed_sample_queue_.empty();
2292 0 : filter_delayed_sample_queue_.insert(std::make_pair(deadline, handle));
2293 0 : if (schedule) {
2294 0 : filter_delayed_sample_task_->schedule(now - deadline);
2295 0 : } else if (filter_delayed_sample_queue_.begin()->second == handle) {
2296 0 : filter_delayed_sample_task_->cancel();
2297 0 : filter_delayed_sample_task_->schedule(now - deadline);
2298 : }
2299 : } else {
2300 0 : FilterDelayedSample& sample = i->second;
2301 : // we only care about the most recently filtered sample, so clean up the last one
2302 :
2303 0 : sample.message = move(data);
2304 0 : sample.header = hdr;
2305 0 : sample.new_instance = just_registered;
2306 : // already scheduled for timeout at the desired time
2307 : }
2308 0 : }
2309 :
2310 0 : void clear_sample(DDS::InstanceHandle_t handle)
2311 : {
2312 : // sample_lock_ should already be held
2313 :
2314 0 : typename FilterDelayedSampleMap::iterator sample = filter_delayed_sample_map_.find(handle);
2315 0 : if (sample != filter_delayed_sample_map_.end()) {
2316 : // leave the entry in the container, so that the key remains valid if the reactor is waiting on this lock while this is occurring
2317 0 : sample->second.message.reset();
2318 : }
2319 0 : }
2320 :
2321 0 : void drop_sample(DDS::InstanceHandle_t handle)
2322 : {
2323 : // sample_lock_ should already be held
2324 :
2325 0 : typename FilterDelayedSampleMap::iterator sample = filter_delayed_sample_map_.find(handle);
2326 0 : if (sample != filter_delayed_sample_map_.end()) {
2327 0 : for (FilterDelayedSampleQueue::iterator pos = filter_delayed_sample_queue_.lower_bound(sample->second.expiration_time), limit = filter_delayed_sample_queue_.upper_bound(sample->second.expiration_time); pos != limit; ++pos) {
2328 0 : if (pos->second == handle) {
2329 0 : filter_delayed_sample_queue_.erase(pos);
2330 0 : break;
2331 : }
2332 : }
2333 :
2334 : // use the handle to erase, since the sample lock was released
2335 0 : filter_delayed_sample_map_.erase(handle);
2336 : }
2337 0 : }
2338 :
2339 0 : void filter_delayed(const MonotonicTimePoint& now)
2340 : {
2341 0 : ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2342 :
2343 : // Make a copy because finish_store_instance_data will release the sample lock.
2344 : typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) Handles;
2345 0 : Handles handles;
2346 :
2347 0 : ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2348 :
2349 0 : for (FilterDelayedSampleQueue::iterator pos = filter_delayed_sample_queue_.begin(), limit = filter_delayed_sample_queue_.end(); pos != limit && pos->first <= now;) {
2350 0 : handles.push_back(pos->second);
2351 0 : filter_delayed_sample_queue_.erase(pos++);
2352 : }
2353 :
2354 0 : const TimeDuration interval(qos_.time_based_filter.minimum_separation);
2355 :
2356 0 : for (Handles::const_iterator pos = handles.begin(), limit = handles.end(); pos != limit; ++pos) {
2357 0 : const DDS::InstanceHandle_t handle = *pos;
2358 :
2359 0 : SubscriptionInstance_rch instance = get_handle_instance(handle);
2360 0 : if (!instance) {
2361 0 : continue;
2362 : }
2363 :
2364 0 : typename FilterDelayedSampleMap::iterator data = filter_delayed_sample_map_.find(handle);
2365 0 : if (data == filter_delayed_sample_map_.end()) {
2366 0 : continue;
2367 : }
2368 :
2369 0 : if (data->second.message) {
2370 0 : const bool NOT_DISPOSE_MSG = false;
2371 0 : const bool NOT_UNREGISTER_MSG = false;
2372 : // clear the message, since ownership is being transferred to finish_store_instance_data.
2373 :
2374 0 : instance->last_accepted_.set_to_now();
2375 0 : const DataSampleHeader_ptr header = data->second.header;
2376 0 : const bool new_instance = data->second.new_instance;
2377 :
2378 : // should not use data iterator anymore, since finish_store_instance_data releases sample_lock_
2379 0 : finish_store_instance_data(move(data->second.message),
2380 0 : *header,
2381 : instance,
2382 : NOT_DISPOSE_MSG,
2383 : NOT_UNREGISTER_MSG);
2384 :
2385 0 : accept_sample_processing(instance, *header, new_instance);
2386 :
2387 : // Refresh the iterator.
2388 0 : data = filter_delayed_sample_map_.find(handle);
2389 0 : if (data == filter_delayed_sample_map_.end()) {
2390 0 : continue;
2391 : }
2392 :
2393 : // Reschedule.
2394 0 : data->second.expiration_time = now + interval;
2395 0 : filter_delayed_sample_queue_.insert(std::make_pair(data->second.expiration_time, handle));
2396 :
2397 0 : } else {
2398 : // this check is performed to handle the corner case where
2399 : // store_instance_data received and delivered a sample, while this
2400 : // method was waiting for the lock
2401 0 : if (MonotonicTimePoint::now() - instance->last_sample_tv_ >= interval) {
2402 : // no new data to process, so remove from container
2403 0 : filter_delayed_sample_map_.erase(data);
2404 : }
2405 : }
2406 : }
2407 :
2408 0 : if (!filter_delayed_sample_queue_.empty()) {
2409 0 : filter_delayed_sample_task_->schedule(filter_delayed_sample_queue_.begin()->first - now);
2410 : }
2411 0 : }
2412 :
2413 0 : unique_ptr<DataAllocator>& data_allocator() { return data_allocator_; }
2414 :
2415 : unique_ptr<DataAllocator> data_allocator_;
2416 :
2417 : InstanceMap instance_map_;
2418 : ReverseInstanceMap reverse_instance_map_;
2419 :
2420 : typedef DCPS::PmfSporadicTask<DataReaderImpl_T> DRISporadicTask;
2421 :
2422 : RcHandle<DRISporadicTask> filter_delayed_sample_task_;
2423 : #ifdef OPENDDS_HAS_STD_SHARED_PTR
2424 : typedef std::shared_ptr<const OpenDDS::DCPS::DataSampleHeader> DataSampleHeader_ptr;
2425 : #else
2426 : typedef ACE_Strong_Bound_Ptr<const OpenDDS::DCPS::DataSampleHeader, ACE_Null_Mutex> DataSampleHeader_ptr;
2427 : #endif
2428 : struct FilterDelayedSample {
2429 0 : FilterDelayedSample(unique_ptr<MessageTypeWithAllocator> msg, DataSampleHeader_ptr hdr, bool new_inst)
2430 0 : : message(move(msg))
2431 0 : , header(hdr)
2432 0 : , new_instance(new_inst)
2433 0 : {}
2434 : container_supported_unique_ptr<MessageTypeWithAllocator> message;
2435 : DataSampleHeader_ptr header;
2436 : bool new_instance;
2437 : MonotonicTimePoint expiration_time;
2438 : };
2439 : typedef OPENDDS_MAP(DDS::InstanceHandle_t, FilterDelayedSample) FilterDelayedSampleMap;
2440 : FilterDelayedSampleMap filter_delayed_sample_map_;
2441 : typedef OPENDDS_MULTIMAP(MonotonicTimePoint, DDS::InstanceHandle_t) FilterDelayedSampleQueue;
2442 : FilterDelayedSampleQueue filter_delayed_sample_queue_;
2443 :
2444 : bool marshal_skip_serialize_;
2445 :
2446 : };
2447 :
2448 : template <typename MessageType>
2449 0 : void* DataReaderImpl_T<MessageType>::MessageTypeWithAllocator::operator new(size_t , ACE_New_Allocator& pool)
2450 : {
2451 : typedef typename DataReaderImpl_T<MessageType>::MessageTypeMemoryBlock MessageTypeMemoryBlock;
2452 : MessageTypeMemoryBlock* block =
2453 0 : static_cast<MessageTypeMemoryBlock*>(pool.malloc(sizeof(MessageTypeMemoryBlock)));
2454 0 : block->allocator_ = &pool;
2455 0 : return block;
2456 : }
2457 :
2458 : template <typename MessageType>
2459 0 : void DataReaderImpl_T<MessageType>::MessageTypeWithAllocator::operator delete(void* memory)
2460 : {
2461 0 : if (memory) {
2462 0 : MessageTypeMemoryBlock* block = static_cast<MessageTypeMemoryBlock*>(memory);
2463 0 : block->allocator_->free(block);
2464 : }
2465 0 : }
2466 :
2467 : template <typename MessageType>
2468 0 : void DataReaderImpl_T<MessageType>::MessageTypeWithAllocator::operator delete(void* memory, ACE_New_Allocator&)
2469 : {
2470 0 : operator delete(memory);
2471 0 : }
2472 :
2473 : }
2474 : }
2475 :
2476 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
2477 :
2478 : #endif /* OPENDDS_DDS_DCPS_DATAREADERIMPL_T_H */
|