OpenDDS  Snapshot(2023/04/28-20:55)
DataReaderImpl_T.h
Go to the documentation of this file.
1 #ifndef OPENDDS_DCPS_DATAREADERIMPL_T_H
2 #define OPENDDS_DCPS_DATAREADERIMPL_T_H
3 
4 #include <ace/config-lite.h>
5 
6 #ifdef ACE_HAS_CPP11
7 # define OPENDDS_HAS_STD_SHARED_PTR
8 #endif
9 
10 #include "MultiTopicImpl.h"
11 #include "RakeResults_T.h"
12 #include "SubscriberImpl.h"
13 #include "BuiltInTopicUtils.h"
14 #include "Util.h"
15 #include "TypeSupportImpl.h"
16 #include "dcps_export.h"
17 #include "GuidConverter.h"
19 
20 #ifndef OPENDDS_HAS_STD_SHARED_PTR
21 # include <ace/Bound_Ptr.h>
22 #endif
23 #include <ace/Time_Value.h>
24 
25 #ifndef OPENDDS_HAS_STD_SHARED_PTR
26 # include <memory>
27 #endif
28 
30 
31 namespace OpenDDS {
32  namespace DCPS {
33 
34  /** Servant for DataReader interface of Traits::MessageType data type.
35  *
36  * See the DDS specification, OMG formal/2015-04-10, for a description of
37  * this interface.
38  *
39  */
40  template <typename MessageType>
41  class
42 #if ( __GNUC__ == 4 && __GNUC_MINOR__ == 1)
44 #endif
47  , public virtual DataReaderImpl
48  {
49  public:
52  typedef typename TraitsType::MessageSequenceType MessageSequenceType;
53 
54  typedef OPENDDS_MAP_CMP_T(MessageType, DDS::InstanceHandle_t,
55  typename TraitsType::LessThanType) InstanceMap;
56  typedef OPENDDS_MAP(DDS::InstanceHandle_t, typename InstanceMap::iterator) ReverseInstanceMap;
57 
59  : public virtual RcObject
60  , public InstanceMap
61  {
62  };
63 
65 
66  typedef typename TraitsType::DataReaderType Interface;
67 
68  CORBA::Boolean _is_a(const char* type_id)
69  {
70  return Interface::_is_a(type_id);
71  }
72 
73  const char* _interface_repository_id() const
74  {
75  return Interface::_interface_repository_id();
76  }
77 
79  {
80  return false;
81  }
82 
83  // work around "hides overloaded virtual" warnings when MessageType=DynamicSample
88 
90  : public MessageType
91  , public EnableContainerSupportedUniquePtr<MessageTypeWithAllocator>
92  {
93  public:
94  void* operator new(size_t size, ACE_New_Allocator& pool);
95  void operator delete(void* memory, ACE_New_Allocator& pool);
96  void operator delete(void* memory);
97 
99  MessageTypeWithAllocator(const MessageType& other)
100  : MessageType(other)
101  {
102  }
103 
104  const MessageType* message() const { return this; }
105 
106 #ifndef OPENDDS_HAS_STD_UNIQUE_PTR
110 #endif
111  };
112 
116  };
117 
119 
121  : filter_delayed_sample_task_(make_rch<DRISporadicTask>(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &DataReaderImpl_T::filter_delayed))
122  , marshal_skip_serialize_(false)
123  {
124  initialize_lookup_maps();
125  }
126 
128  {
129  filter_delayed_sample_task_->cancel();
130 
131  for (typename InstanceMap::iterator it = instance_map_.begin();
132  it != instance_map_.end(); ++it)
133  {
134  OpenDDS::DCPS::SubscriptionInstance_rch ptr = get_handle_instance(it->second);
135  if (!ptr) continue;
136  purge_data(ptr);
137  }
138  //X SHH release the data samples in the instance_map_.
139  }
140 
141  /**
142  * Do parts of enable specific to the datatype.
143  * Called by DataReaderImpl::enable().
144  */
146  {
147  data_allocator().reset(new DataAllocator(get_n_chunks ()));
150  ACE_TEXT("(%P|%t) %CDataReaderImpl::")
151  ACE_TEXT("enable_specific-data")
152  ACE_TEXT(" Cached_Allocator_With_Overflow ")
153  ACE_TEXT("%x with %d chunks\n"),
154  TraitsType::type_name(),
155  data_allocator().get(),
156  get_n_chunks ()));
157 
158  return DDS::RETCODE_OK;
159  }
160 
162  MessageSequenceType & received_data,
168  {
169  DDS::ReturnCode_t const precond =
170  check_inputs("read", received_data, info_seq, max_samples);
171  if (DDS::RETCODE_OK != precond)
172  {
173  return precond;
174  }
175 
177  guard,
178  sample_lock_,
180 
181  return read_i(received_data, info_seq, max_samples, sample_states,
182  view_states, instance_states, 0);
183  }
184 
186  MessageSequenceType & received_data,
192  {
193  DDS::ReturnCode_t const precond =
194  check_inputs("take", received_data, info_seq, max_samples);
195  if (DDS::RETCODE_OK != precond)
196  {
197  return precond;
198  }
199 
201  guard,
202  sample_lock_,
204 
205  return take_i(received_data, info_seq, max_samples, sample_states,
206  view_states, instance_states, 0);
207  }
208 
210  MessageSequenceType & received_data,
211  DDS::SampleInfoSeq & sample_info,
213  DDS::ReadCondition_ptr a_condition)
214  {
215  DDS::ReturnCode_t const precond =
216  check_inputs("read_w_condition", received_data, sample_info, max_samples);
217  if (DDS::RETCODE_OK != precond)
218  {
219  return precond;
220  }
221 
222  ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
224 
225  if (!has_readcondition(a_condition))
226  {
228  }
229 
230  return read_i(received_data, sample_info, max_samples,
231  a_condition->get_sample_state_mask(),
232  a_condition->get_view_state_mask(),
233  a_condition->get_instance_state_mask(),
234 #ifndef OPENDDS_NO_QUERY_CONDITION
235  dynamic_cast< DDS::QueryCondition_ptr >(a_condition));
236 #else
237  0);
238 #endif
239  }
240 
242  MessageSequenceType & received_data,
243  DDS::SampleInfoSeq & sample_info,
245  DDS::ReadCondition_ptr a_condition)
246  {
247  DDS::ReturnCode_t const precond =
248  check_inputs("take_w_condition", received_data, sample_info, max_samples);
249  if (DDS::RETCODE_OK != precond)
250  {
251  return precond;
252  }
253 
254  ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
256 
257  if (!has_readcondition(a_condition))
258  {
260  }
261 
262  return take_i(received_data, sample_info, max_samples,
263  a_condition->get_sample_state_mask(),
264  a_condition->get_view_state_mask(),
265  a_condition->get_instance_state_mask(),
266 #ifndef OPENDDS_NO_QUERY_CONDITION
267  dynamic_cast< DDS::QueryCondition_ptr >(a_condition)
268 #else
269  0
270 #endif
271  );
272  }
273 
274  virtual DDS::ReturnCode_t read_next_sample(MessageType& received_data,
275  DDS::SampleInfo& sample_info_ref)
276  {
277  bool found_data = false;
279 
280  const Observer_rch observer = get_observer(Observer::e_SAMPLE_READ);
281 
283  const HandleSet& matches = lookup_matching_instances(sample_states, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
284  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
285  ++next; // pre-increment iterator, in case updates cause changes to match set
286  const DDS::InstanceHandle_t handle = *it;
287  const SubscriptionInstance_rch inst = get_handle_instance(handle);
288  if (!inst) continue;
289 
290  bool most_recent_generation = false;
291  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
292  !found_data && item; item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
293  if (item->registered_data_) {
294  received_data = *static_cast<MessageType*>(item->registered_data_);
295  }
296  inst->instance_state_->sample_info(sample_info_ref, item);
297  inst->rcvd_samples_.mark_read(item);
298 
299  const ValueDispatcher* vd = get_value_dispatcher();
300  if (observer && item->registered_data_ && vd) {
301  Observer::Sample s(sample_info_ref.instance_handle, sample_info_ref.instance_state, *item, *vd);
302  observer->on_sample_read(this, s);
303  }
304 
305  if (!most_recent_generation) {
306  most_recent_generation = inst->instance_state_->most_recent_generation(item);
307  }
308 
309  found_data = true;
310  }
311 
312  if (found_data) {
313  if (most_recent_generation) {
314  inst->instance_state_->accessed();
315  }
316  // Get the sample_ranks, generation_ranks, and
317  // absolute_generation_ranks for this info_seq
318  sample_info(sample_info_ref, inst->rcvd_samples_.peek_tail());
319 
320  break;
321  }
322  }
323 
324  post_read_or_take();
325  return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
326  }
327 
328  virtual DDS::ReturnCode_t take_next_sample(MessageType& received_data,
329  DDS::SampleInfo& sample_info_ref)
330  {
331  bool found_data = false;
333 
334  const Observer_rch observer = get_observer(Observer::e_SAMPLE_TAKEN);
335 
337  const HandleSet& matches = lookup_matching_instances(sample_states, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
338  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
339  ++next; // pre-increment iterator, in case updates cause changes to match set
340  const DDS::InstanceHandle_t handle = *it;
341  const SubscriptionInstance_rch inst = get_handle_instance(handle);
342  if (!inst) continue;
343 
344  bool most_recent_generation = false;
345  ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
346  if (item) {
347  if (item->registered_data_) {
348  received_data = *static_cast<MessageType*>(item->registered_data_);
349  }
350  inst->instance_state_->sample_info(sample_info_ref, item);
351  inst->rcvd_samples_.mark_read(item);
352 
353  const ValueDispatcher* vd = get_value_dispatcher();
354  if (observer && item->registered_data_ && vd) {
355  Observer::Sample s(sample_info_ref.instance_handle, sample_info_ref.instance_state, *item, *vd);
356  observer->on_sample_taken(this, s);
357  }
358 
359  if (!most_recent_generation) {
360  most_recent_generation = inst->instance_state_->most_recent_generation(item);
361  }
362 
363  if (most_recent_generation) {
364  inst->instance_state_->accessed();
365  }
366 
367  // Get the sample_ranks, generation_ranks, and
368  // absolute_generation_ranks for this info_seq
369  sample_info(sample_info_ref, inst->rcvd_samples_.peek_tail());
370 
371  inst->rcvd_samples_.remove(item);
372  item->dec_ref();
373  item = 0;
374 
375  found_data = true;
376 
377  break;
378  }
379  }
380 
381  post_read_or_take();
382  return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
383  }
384 
386  MessageSequenceType & received_data,
389  DDS::InstanceHandle_t a_handle,
393  {
394  DDS::ReturnCode_t const precond =
395  check_inputs("read_instance", received_data, info_seq, max_samples);
396  if (DDS::RETCODE_OK != precond)
397  {
398  return precond;
399  }
400 
402  guard,
403  sample_lock_,
405  return read_instance_i(received_data, info_seq, max_samples, a_handle,
406  sample_states, view_states, instance_states, 0);
407  }
408 
410  MessageSequenceType & received_data,
413  DDS::InstanceHandle_t a_handle,
417  {
418  DDS::ReturnCode_t const precond =
419  check_inputs("take_instance", received_data, info_seq, max_samples);
420  if (DDS::RETCODE_OK != precond)
421  {
422  return precond;
423  }
424 
426  guard,
427  sample_lock_,
429  return take_instance_i(received_data, info_seq, max_samples, a_handle,
430  sample_states, view_states, instance_states, 0);
431  }
432 
434  MessageSequenceType & received_data,
437  DDS::InstanceHandle_t a_handle,
438  DDS::ReadCondition_ptr a_condition)
439  {
440  DDS::ReturnCode_t const precond =
441  check_inputs("read_instance_w_condition", received_data, info_seq,
442  max_samples);
443  if (DDS::RETCODE_OK != precond)
444  {
445  return precond;
446  }
447 
448  ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
450 
451  if (!has_readcondition(a_condition))
452  {
454  }
455 
456 #ifndef OPENDDS_NO_QUERY_CONDITION
457  DDS::QueryCondition_ptr query_condition =
458  dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
459 #endif
460 
461  return read_instance_i(received_data, info_seq, max_samples, a_handle,
462  a_condition->get_sample_state_mask(),
463  a_condition->get_view_state_mask(),
464  a_condition->get_instance_state_mask(),
465 #ifndef OPENDDS_NO_QUERY_CONDITION
466  query_condition
467 #else
468  0
469 #endif
470  );
471  }
472 
474  MessageSequenceType & received_data,
477  DDS::InstanceHandle_t a_handle,
478  DDS::ReadCondition_ptr a_condition)
479  {
480  DDS::ReturnCode_t const precond =
481  check_inputs("take_instance_w_condition", received_data, info_seq,
482  max_samples);
483  if (DDS::RETCODE_OK != precond)
484  {
485  return precond;
486  }
487 
488  ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
490 
491  if (!has_readcondition(a_condition))
492  {
494  }
495 
496 #ifndef OPENDDS_NO_QUERY_CONDITION
497  DDS::QueryCondition_ptr query_condition =
498  dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
499 #endif
500 
501  return take_instance_i(received_data, info_seq, max_samples, a_handle,
502  a_condition->get_sample_state_mask(),
503  a_condition->get_view_state_mask(),
504  a_condition->get_instance_state_mask(),
505 #ifndef OPENDDS_NO_QUERY_CONDITION
506  query_condition
507 #else
508  0
509 #endif
510  );
511  }
512 
514  MessageSequenceType & received_data,
517  DDS::InstanceHandle_t a_handle,
521  {
522  DDS::ReturnCode_t const precond =
523  check_inputs("read_next_instance", received_data, info_seq, max_samples);
524  if (DDS::RETCODE_OK != precond)
525  {
526  return precond;
527  }
528 
529  return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
530  sample_states, view_states, instance_states, 0);
531  }
532 
534  MessageSequenceType & received_data,
537  DDS::InstanceHandle_t a_handle,
541  {
542  DDS::ReturnCode_t const precond =
543  check_inputs("take_next_instance", received_data, info_seq, max_samples);
544  if (DDS::RETCODE_OK != precond)
545  {
546  return precond;
547  }
548 
549  return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
550  sample_states, view_states, instance_states, 0);
551  }
552 
554  MessageSequenceType & received_data,
557  DDS::InstanceHandle_t a_handle,
558  DDS::ReadCondition_ptr a_condition)
559  {
560  DDS::ReturnCode_t const precond =
561  check_inputs("read_next_instance_w_condition", received_data, info_seq,
562  max_samples);
563  if (DDS::RETCODE_OK != precond)
564  {
565  return precond;
566  }
567 
568  ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
570 
571  if (!has_readcondition(a_condition))
572  {
574  }
575 
576 #ifndef OPENDDS_NO_QUERY_CONDITION
577  DDS::QueryCondition_ptr query_condition =
578  dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
579 #endif
580 
581  return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
582  a_condition->get_sample_state_mask(),
583  a_condition->get_view_state_mask(),
584  a_condition->get_instance_state_mask(),
585 #ifndef OPENDDS_NO_QUERY_CONDITION
586  query_condition
587 #else
588  0
589 #endif
590  );
591  }
592 
594  MessageSequenceType & received_data,
597  DDS::InstanceHandle_t a_handle,
598  DDS::ReadCondition_ptr a_condition)
599  {
600  DDS::ReturnCode_t const precond =
601  check_inputs("take_next_instance_w_condition", received_data, info_seq,
602  max_samples);
603  if (DDS::RETCODE_OK != precond)
604  {
605  return precond;
606  }
607 
608  ACE_GUARD_RETURN (ACE_Recursive_Thread_Mutex, guard, sample_lock_,
610 
611  if (!has_readcondition(a_condition))
612  {
614  }
615 
616 #ifndef OPENDDS_NO_QUERY_CONDITION
617  DDS::QueryCondition_ptr query_condition =
618  dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
619 #endif
620 
621  return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
622  a_condition->get_sample_state_mask(),
623  a_condition->get_view_state_mask(),
624  a_condition->get_instance_state_mask(),
625 #ifndef OPENDDS_NO_QUERY_CONDITION
626  query_condition
627 #else
628  0
629 #endif
630  );
631  }
632 
634  MessageSequenceType & received_data,
636  {
637  // Some incomplete tests to see that the data and info are from the
638  // same read.
639  if (received_data.length() != info_seq.length())
640  {
642  }
643 
644  if (received_data.release())
645  {
646  // nothing to do because this is not zero-copy data
647  return DDS::RETCODE_OK;
648  }
649  else
650  {
651  info_seq.length(0);
652  received_data.length(0);
653  }
654  return DDS::RETCODE_OK;
655  }
656 
657  virtual DDS::ReturnCode_t get_key_value(MessageType& key_holder,
658  DDS::InstanceHandle_t handle)
659  {
660  ACE_Guard<ACE_Recursive_Thread_Mutex> guard(sample_lock_);
661 
662  const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(handle);
663  if (pos != reverse_instance_map_.end()) {
664  key_holder = pos->second->first;
665  return DDS::RETCODE_OK;
666  }
667 
669  }
670 
671  virtual DDS::InstanceHandle_t lookup_instance(const MessageType& instance_data)
672  {
673  ACE_Guard<ACE_Recursive_Thread_Mutex> guard(sample_lock_);
674 
675  const typename InstanceMap::const_iterator it = instance_map_.find(instance_data);
676  if (it != instance_map_.end()) {
677  return it->second;
678  }
679  return DDS::HANDLE_NIL;
680  }
681 
683  {
684  MessageSequenceType& received_data =
685  *static_cast< MessageSequenceType*> (seq);
686 
687  if (!received_data.release())
688  {
689  // release_loan(received_data);
690  received_data.length(0);
691  }
692  return DDS::RETCODE_OK;
693  }
694 
695  void release_loan (MessageSequenceType & received_data)
696  {
697  received_data.length(0);
698  }
699 
700 #ifndef OPENDDS_NO_CONTENT_SUBSCRIPTION_PROFILE
704  const FilterEvaluator& evaluator,
705  const DDS::StringSeq& params)
706  {
707  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_, false);
708  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_, false);
709 
710  TopicDescriptionPtr<TopicImpl> topic(topic_servant_);
711  TypeSupport* const ts = topic->get_type_support();
712  TypeSupportImpl* const type_support = dynamic_cast<TypeSupportImpl*>(ts);
713  const bool filter_has_non_key_fields = type_support ? evaluator.has_non_key_fields(*type_support) : true;
714 
715  const HandleSet& matches = lookup_matching_instances(sample_states, view_states, instance_states);
716  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
717  ++next; // pre-increment iterator, in case updates cause changes to match set
718  const DDS::InstanceHandle_t handle = *it;
719  const SubscriptionInstance_rch inst = get_handle_instance(handle);
720  if (!inst) continue;
721 
722  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
723  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
724  if (!item->registered_data_ || (!item->valid_data_ && filter_has_non_key_fields)) {
725  continue;
726  }
727  if (evaluator.eval(*static_cast<MessageType*>(item->registered_data_), params)) {
728  return true;
729  }
730  }
731  }
732 
733  return false;
734  }
735 
740  bool adjust_ref_count = false)
741  {
742  MessageSequenceType data;
744  {
746  rc = read_i(data, gen.info_, DDS::LENGTH_UNLIMITED,
747  sample_states, view_states, instance_states, 0);
748  if (adjust_ref_count) {
749  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(data);
750  received_data_p.increment_references();
751  }
752  }
753  gen.samples_.reserve(data.length());
754  for (CORBA::ULong i = 0; i < data.length(); ++i) {
755  gen.samples_.push_back(&data[i]);
756  }
757  return rc;
758  }
759 
761  {
762  return lookup_instance(*static_cast<const MessageType*>(data));
763  }
764 
768  {
770  guard,
771  sample_lock_,
773 
774  MessageSequenceType data;
775  DDS::SampleInfoSeq infos;
776  const DDS::ReturnCode_t rc = take_i(data, infos, DDS::LENGTH_UNLIMITED,
777  sample_states, view_states, instance_states, 0);
778 
779  samples.reserve(data.length());
780 
781  for (CORBA::ULong i = 0; i < data.length(); ++i) {
782  samples.push_back(infos[i], &data[i]);
783  }
784 
785  return rc;
786  }
787 
789  DDS::SampleInfo& info, DDS::InstanceHandle_t instance,
792  {
793  MessageSequenceType dataseq;
794  DDS::SampleInfoSeq infoseq;
795  const DDS::ReturnCode_t rc = read_instance_i(dataseq, infoseq,
796  DDS::LENGTH_UNLIMITED, instance, sample_states, view_states,
797  instance_states, 0);
798  if (rc != DDS::RETCODE_NO_DATA)
799  {
800  const CORBA::ULong last = dataseq.length() - 1;
801  data = new MessageType(dataseq[last]);
802  info = infoseq[last];
803  }
804  return rc;
805  }
806 
808  DDS::SampleInfo& info, DDS::InstanceHandle_t previous_instance,
811  {
812  MessageSequenceType dataseq;
813  DDS::SampleInfoSeq infoseq;
814  const DDS::ReturnCode_t rc = read_next_instance_i(dataseq, infoseq,
815  DDS::LENGTH_UNLIMITED, previous_instance, sample_states, view_states,
816  instance_states, 0);
817  if (rc != DDS::RETCODE_NO_DATA)
818  {
819  const CORBA::ULong last = dataseq.length() - 1;
820  data = new MessageType(dataseq[last]);
821  info = infoseq[last];
822  }
823  return rc;
824  }
825 
826 #endif
827 
828  DDS::InstanceHandle_t store_synthetic_data(const MessageType& sample,
829  DDS::ViewStateKind view,
830  const SystemTimePoint& timestamp = SystemTimePoint::now())
831  {
832  using namespace OpenDDS::DCPS;
833  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, guard, sample_lock_,
835 #ifndef OPENDDS_NO_MULTI_TOPIC
836  DDS::TopicDescription_var descr = get_topicdescription();
837  if (MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) {
838  if (!mt->filter(sample)) {
839  return DDS::HANDLE_NIL;
840  }
841  }
842 #endif
843 
844  get_subscriber_servant()->data_received(this);
845 
846  DDS::InstanceHandle_t inst = lookup_instance(sample);
847  bool filtered = false;
848  SubscriptionInstance_rch instance;
849 
850  const DDS::Time_t now = timestamp.to_dds_time();
852  header.source_timestamp_sec_ = now.sec;
853  header.source_timestamp_nanosec_ = now.nanosec;
854 
855  // Call store_instance_data() once or twice, depending on if we need to
856  // process the INSTANCE_REGISTRATION. In either case, store_instance_data()
857  // owns the memory for the sample and it must come from the correct allocator.
858  for (int i = 0; i < 2; ++i) {
859  if (i == 0 && inst != DDS::HANDLE_NIL) continue;
860 
861  const int msg = i ? SAMPLE_DATA : INSTANCE_REGISTRATION;
862  header.message_id_ = static_cast<char>(msg);
863 
864  bool just_registered;
865  unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator(sample));
866  store_instance_data(move(data), DDS::HANDLE_NIL, header, instance, just_registered, filtered);
867  if (instance) inst = instance->instance_handle_;
868  }
869 
870  if (!filtered) {
871  if (view == DDS::NOT_NEW_VIEW_STATE) {
872  if (instance) instance->instance_state_->accessed();
873  }
874  notify_read_conditions();
875  }
876 
877  const ValueDispatcher* vd = get_value_dispatcher();
878  const Observer_rch observer = get_observer(Observer::e_SAMPLE_RECEIVED);
879  if (observer && vd) {
880  Observer::Sample s(instance ? instance->instance_handle_ : DDS::HANDLE_NIL, header.instance_state(), now, header.sequence_, &sample, *vd);
881  observer->on_sample_received(this, s);
882  }
883 
884  return inst;
885  }
886 
888  DDS::InstanceHandle_t publication_handle,
890  const SystemTimePoint& timestamp,
891  const GUID_t& publication_id)
892  {
893  // sample_lock_ must be held.
894  using namespace OpenDDS::DCPS;
895 
896  SubscriptionInstance_rch si = get_handle_instance(instance);
897  if (si && state != DDS::ALIVE_INSTANCE_STATE) {
898  const DDS::Time_t now = timestamp.to_dds_time();
900  header.publication_id_ = publication_id;
901  header.source_timestamp_sec_ = now.sec;
902  header.source_timestamp_nanosec_ = now.nanosec;
903  const int msg = (state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE)
905  header.message_id_ = static_cast<char>(msg);
906  bool just_registered, filtered;
907  unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
908  get_key_value(*data, instance);
909  store_instance_data(move(data), publication_handle, header, si, just_registered, filtered);
910  if (!filtered) {
911  notify_read_conditions();
912  }
913  }
914  }
915 
918  {
919  //!!! caller should already have the sample_lock_
920  const bool encapsulated = sample.header_.cdr_encapsulation_;
921  Message_Block_Ptr payload(sample.data(&mb_alloc_));
923  payload.get(),
925  static_cast<Endianness>(sample.header_.byte_order_));
926 
927  if (encapsulated) {
928  EncapsulationHeader encap;
929  if (!(ser >> encap)) {
930  if (DCPS_debug_level > 0) {
931  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
932  ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
933  ACE_TEXT("deserialization of encapsulation header failed.\n"),
934  TraitsType::type_name()));
935  }
936  return;
937  }
939  if (!encap.to_encoding(encoding, type_support_->base_extensibility())) {
940  return;
941  }
942 
943  if (decoding_modes_.find(encoding.kind()) == decoding_modes_.end()) {
944  if (DCPS_debug_level >= 1) {
945  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING ")
946  ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
947  ACE_TEXT("Encoding kind of the received sample (%C) does not ")
948  ACE_TEXT("match the ones specified by DataReader.\n"),
949  TraitsType::type_name(),
950  Encoding::kind_to_string(encoding.kind()).c_str()));
951  }
952  return;
953  }
954  if (DCPS_debug_level >= 8) {
955  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ")
956  ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
957  ACE_TEXT("Deserializing with encoding kind %C.\n"),
958  TraitsType::type_name(),
959  Encoding::kind_to_string(encoding.kind()).c_str()));
960  }
961 
962  ser.encoding(encoding);
963  }
964 
965  bool ser_ret = true;
966  MessageType data;
967  if (sample.header_.key_fields_only_) {
968  ser_ret = ser >> OpenDDS::DCPS::KeyOnly<MessageType>(data);
969  } else {
970  ser_ret = ser >> data;
971  }
972  if (!ser_ret) {
973  if (ser.get_construction_status() != Serializer::ConstructionSuccessful) {
974  if (DCPS_debug_level > 1) {
975  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
976  ACE_TEXT("object construction failure, dropping sample.\n"),
977  TraitsType::type_name()));
978  }
979  } else {
980  if (DCPS_debug_level > 0) {
981  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
982  ACE_TEXT("deserialization failed.\n"),
983  TraitsType::type_name()));
984  }
985  }
986  return;
987  }
988 
990  typename InstanceMap::const_iterator const it = instance_map_.find(data);
991  if (it != instance_map_.end()) {
992  handle = it->second;
993  }
994 
995  if (handle == DDS::HANDLE_NIL) {
996  instance.reset();
997  } else {
998  instance = get_handle_instance(handle);
999  }
1000  }
1001 
1002  virtual void qos_change(const DDS::DataReaderQos& qos)
1003  {
1004  // reliability is not changeable, just time_based_filter
1006  if (qos.time_based_filter.minimum_separation != qos_.time_based_filter.minimum_separation) {
1008  if (qos_.time_based_filter.minimum_separation != zero) {
1009  if (qos.time_based_filter.minimum_separation != zero) {
1011  const TimeDuration interval(qos_.time_based_filter.minimum_separation);
1012  FilterDelayedSampleQueue queue;
1013 
1014  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
1015  for (typename FilterDelayedSampleMap::iterator pos = filter_delayed_sample_map_.begin(), limit = filter_delayed_sample_map_.end(); pos != limit; ++pos) {
1016  FilterDelayedSample& sample = pos->second;
1017  sample.expiration_time = now + (interval - (sample.expiration_time - now));
1018  queue.insert(std::make_pair(sample.expiration_time, pos->first));
1019  }
1020  std::swap(queue, filter_delayed_sample_queue_);
1021 
1022  if (!filter_delayed_sample_queue_.empty()) {
1023  filter_delayed_sample_task_->cancel();
1024  filter_delayed_sample_task_->schedule(interval);
1025  }
1026 
1027  } else {
1028  filter_delayed_sample_task_->cancel();
1029  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
1030  filter_delayed_sample_map_.clear();
1031  filter_delayed_sample_queue_.clear();
1032  }
1033  }
1034  // else no existing timers to change/cancel
1035  }
1036  // else no qos change so nothing to change
1037  }
1038 
1040  }
1041 
1043  {
1044  marshal_skip_serialize_ = value;
1045  }
1046 
1048  {
1049  return marshal_skip_serialize_;
1050  }
1051 
1053  {
1054  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
1055 
1056  const typename InstanceMap::iterator end = instance_map_.end();
1057  typename InstanceMap::iterator it = instance_map_.begin();
1058  while (it != end) {
1059  const DDS::InstanceHandle_t handle = it->second;
1060  ++it; // it will be invalid, so iterate now.
1061  release_instance(handle);
1062  }
1063  }
1064 
1065 protected:
1066 
1068  DDS::InstanceHandle_t publication_handle,
1070  bool& just_registered,
1071  bool& filtered,
1072  OpenDDS::DCPS::MarshalingType marshaling_type,
1073  bool full_copy)
1074  {
1075  unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
1076  dynamic_hook(*data);
1077  RcHandle<MessageHolder> message_holder;
1078 
1079  Message_Block_Ptr payload(sample.data(&mb_alloc_));
1080  if (marshal_skip_serialize_) {
1081  if (!MarshalTraitsType::from_message_block(*data, *payload)) {
1082  if (DCPS_debug_level > 0) {
1083  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::dds_demarshal: ")
1084  ACE_TEXT("attempting to skip serialize but bad from_message_block. Returning from demarshal.\n")));
1085  }
1086  return message_holder;
1087  }
1088  store_instance_data(move(data), publication_handle, sample.header_, instance, just_registered, filtered);
1089  return message_holder;
1090  }
1091  const bool encapsulated = sample.header_.cdr_encapsulation_;
1092 
1094  payload.get(),
1096  static_cast<Endianness>(sample.header_.byte_order_));
1097 
1098  if (encapsulated) {
1099  EncapsulationHeader encap;
1100  if (!(ser >> encap)) {
1101  if (DCPS_debug_level > 0) {
1102  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
1103  ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1104  ACE_TEXT("deserialization of encapsulation header failed.\n"),
1105  TraitsType::type_name()));
1106  }
1107  return message_holder;
1108  }
1110  if (!encap.to_encoding(encoding, type_support_->base_extensibility())) {
1111  return message_holder;
1112  }
1113 
1114  if (decoding_modes_.find(encoding.kind()) == decoding_modes_.end()) {
1115  if (DCPS_debug_level >= 1) {
1116  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING ")
1117  ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1118  ACE_TEXT("Encoding kind %C of the received sample does not ")
1119  ACE_TEXT("match the ones specified by DataReader.\n"),
1120  TraitsType::type_name(),
1121  Encoding::kind_to_string(encoding.kind()).c_str()));
1122  }
1123  return message_holder;
1124  }
1125  if (DCPS_debug_level >= 8) {
1126  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ")
1127  ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1128  ACE_TEXT("Deserializing with encoding kind %C.\n"),
1129  TraitsType::type_name(),
1130  Encoding::kind_to_string(encoding.kind()).c_str()));
1131  }
1132 
1133  ser.encoding(encoding);
1134  }
1135 
1136  const bool key_only_marshaling =
1137  marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING;
1138 
1139  bool ser_ret = true;
1140  if (key_only_marshaling) {
1141  ser_ret = ser >> OpenDDS::DCPS::KeyOnly<MessageType>(*data);
1142  } else {
1143  ser_ret = ser >> *data;
1144  if (full_copy) {
1145  message_holder = make_rch<MessageHolder_T<MessageType> >(*data);
1146  }
1147  }
1148  if (!ser_ret) {
1150  if (DCPS_debug_level > 1) {
1151  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ")
1152  ACE_TEXT("object construction failure, dropping sample.\n"),
1153  TraitsType::type_name()));
1154  }
1155  } else {
1156  if (DCPS_debug_level > 0) {
1157  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR %CDataReaderImpl::dds_demarshal ")
1158  ACE_TEXT("deserialization failed, dropping sample.\n"),
1159  TraitsType::type_name()));
1160  }
1161  }
1162  return message_holder;
1163  }
1164 
1165 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1166  /*
1167  * If sample.header_.content_filter_ is true, the writer has already
1168  * filtered.
1169  */
1170  if (!sample.header_.content_filter_) {
1171  ACE_Guard<ACE_Thread_Mutex> guard(content_filtered_topic_mutex_);
1172  if (content_filtered_topic_) {
1173  const bool sample_only_has_key_fields = !sample.header_.valid_data();
1174  if (key_only_marshaling != sample_only_has_key_fields) {
1175  if (DCPS_debug_level > 0) {
1176  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
1177  ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1178  ACE_TEXT("Mismatch between the key only and valid data properties ")
1179  ACE_TEXT("of a %C message of a content filtered topic!\n"),
1180  TraitsType::type_name(),
1181  to_string(static_cast<MessageId>(sample.header_.message_id_))));
1182  }
1183  filtered = true;
1184  message_holder.reset();
1185  return message_holder;
1186  }
1187  const MessageType& type = static_cast<MessageType&>(*data);
1188  if (!content_filtered_topic_->filter(type, sample_only_has_key_fields)) {
1189  filtered = true;
1190  message_holder.reset();
1191  return message_holder;
1192  }
1193  }
1194  }
1195 #endif
1196 
1197  store_instance_data(move(data), publication_handle, sample.header_, instance, just_registered, filtered);
1198  return message_holder;
1199  }
1200 
1202  DDS::InstanceHandle_t publication_handle,
1204  {
1205  //!!! caller should already have the sample_lock_
1206 
1207  // The data sample in this dispose message does not contain any valid data.
1208  // What it needs here is the key value to identify the instance to dispose.
1209  // The demarshal push this "sample" to received sample list so the user
1210  // can be notified the dispose event.
1211  bool just_registered = false;
1212  bool filtered = false;
1214  if (sample.header_.key_fields_only_) {
1216  }
1217  dds_demarshal(sample, publication_handle, instance, just_registered, filtered, marshaling, false);
1218  }
1219 
1221  {
1222  drop_sample(instance->instance_handle_);
1223 
1224 
1225  instance->instance_state_->cancel_release();
1226 
1227  while (instance->rcvd_samples_.size() > 0)
1228  {
1230  instance->rcvd_samples_.remove_head();
1231  head->dec_ref();
1232  }
1233  }
1234 
1236  {
1237  const typename ReverseInstanceMap::iterator pos = reverse_instance_map_.find(handle);
1238  if (pos != reverse_instance_map_.end()) {
1239  remove_from_lookup_maps(handle);
1240  instance_map_.erase(pos->second);
1241  reverse_instance_map_.erase(pos);
1242  }
1243  }
1244 
1246  {
1247  const typename SubscriptionInstanceMapType::iterator pos = instances_.find(handle);
1248  if (pos != instances_.end()) {
1249  update_lookup_maps(pos);
1250  }
1251  }
1252 
1253 private:
1254 
1255  /// Available for specialization so that some types of MessageType can observe and
1256  /// change the sample before dds_demarshal deserializes into it
1257  void dynamic_hook(MessageType&) {}
1258 
1260  DDS::InstanceHandle_t publication_handle,
1263  {
1264 #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
1265  const bool is_dispose_msg =
1268 
1269  if (!is_bit() && security_config_) {
1270  if (header.message_id_ == SAMPLE_DATA ||
1271  header.message_id_ == INSTANCE_REGISTRATION) {
1272 
1273  // Pubulisher has already gone through the check.
1274  if (instance_ptr &&
1275  instance_ptr->instance_state_ &&
1276  instance_ptr->instance_state_->writes_instance(header.publication_id_)) {
1277  return true;
1278  }
1279 
1281  const GUID_t local_participant = make_part_guid(get_guid());
1282  const GUID_t remote_participant = make_part_guid(header.publication_id_);
1283  const DDS::Security::ParticipantCryptoHandle remote_participant_permissions_handle = security_config_->get_handle_registry(local_participant)->get_remote_participant_permissions_handle(remote_participant);
1284  // Construct a DynamicData around the deserialized sample.
1285  DDS::DynamicData_var dda =
1286  XTypes::get_dynamic_data_adapter(dynamic_type_, *instance_data->message());
1287  // The remote participant might not be using security.
1288  if (remote_participant_permissions_handle != DDS::HANDLE_NIL &&
1289  !security_config_->get_access_control()->check_remote_datawriter_register_instance(remote_participant_permissions_handle, this, publication_handle, dda, ex)) {
1290  if (log_level >= LogLevel::Warning) {
1292  "(%P|%t) WARNING: DataReaderImpl_T::store_instance_data_check: unable to register instance SecurityException[%d.%d]: %C\n",
1293  ex.code, ex.minor_code, ex.message.in()));
1294  }
1295  return false;
1296  }
1297  } else if (is_dispose_msg) {
1298 
1300  const GUID_t local_participant = make_part_guid(get_guid());
1301  const GUID_t remote_participant = make_part_guid(header.publication_id_);
1302  const DDS::Security::ParticipantCryptoHandle remote_participant_permissions_handle = security_config_->get_handle_registry(local_participant)->get_remote_participant_permissions_handle(remote_participant);
1303  // Construct a DynamicData around the deserialized sample.
1304  DDS::DynamicData_var dda =
1305  XTypes::get_dynamic_data_adapter(dynamic_type_, *instance_data->message());
1306  // The remote participant might not be using security.
1307  if (remote_participant_permissions_handle != DDS::HANDLE_NIL &&
1308  !security_config_->get_access_control()->check_remote_datawriter_dispose_instance(remote_participant_permissions_handle, this, publication_handle, dda, ex)) {
1309  if (log_level >= LogLevel::Warning) {
1311  "(%P|%t) WARNING: DataReaderImpl_T::store_instance_data_check: unable to dispose instance SecurityException[%d.%d]: %C\n",
1312  ex.code, ex.minor_code, ex.message.in()));
1313  }
1314  return false;
1315  }
1316  }
1317  }
1318 #else
1319  ACE_UNUSED_ARG(instance_data);
1320  ACE_UNUSED_ARG(publication_handle);
1321  ACE_UNUSED_ARG(header);
1322  ACE_UNUSED_ARG(instance_ptr);
1323 #endif
1324 
1325  return true;
1326  }
1327 
1328  DDS::ReturnCode_t read_i(MessageSequenceType& received_data,
1334 #ifndef OPENDDS_NO_QUERY_CONDITION
1335  DDS::QueryCondition_ptr a_condition)
1336 #else
1337  int)
1338 #endif
1339 {
1340 
1341  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1342 
1343 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1344  if (subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS && !coherent_) {
1346  }
1347 
1348  const bool group_coherent_ordered =
1349  subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
1350  && subqos_.presentation.coherent_access
1351  && subqos_.presentation.ordered_access;
1352 
1353  if (group_coherent_ordered && coherent_) {
1354  max_samples = 1;
1355  }
1356 #endif
1357 
1358  RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1359 #ifndef OPENDDS_NO_QUERY_CONDITION
1360  a_condition,
1361 #endif
1363 
1364  const Observer_rch observer = get_observer(Observer::e_SAMPLE_READ);
1365 
1366 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1367  if (!group_coherent_ordered) {
1368 #endif
1369  const HandleSet& matches = lookup_matching_instances(sample_states, view_states, instance_states);
1370  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
1371  ++next; // pre-increment iterator, in case updates cause changes to match set
1372  const DDS::InstanceHandle_t handle = *it;
1373  const SubscriptionInstance_rch inst = get_handle_instance(handle);
1374  if (!inst) continue;
1375 
1376  size_t i(0);
1377  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1378  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1379  results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1380 
1381  const ValueDispatcher* vd = get_value_dispatcher();
1382  if (observer && item->registered_data_ && vd) {
1383  Observer::Sample s(handle, inst->instance_state_->instance_state(), *item, *vd);
1384  observer->on_sample_read(this, s);
1385  }
1386  }
1387  }
1388 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1389  } else {
1390  const RakeData item = group_coherent_ordered_data_.get_data();
1391  results.insert_sample(item.rde_, item.rdel_, item.si_, item.index_in_instance_);
1392  const ValueDispatcher* vd = get_value_dispatcher();
1393  if (observer && item.rde_->registered_data_ && vd) {
1394  typename InstanceMap::iterator i = instance_map_.begin();
1395  const DDS::InstanceHandle_t handle = (i != instance_map_.end()) ? i->second : DDS::HANDLE_NIL;
1396  Observer::Sample s(handle, item.si_->instance_state_->instance_state(), *item.rde_, *vd);
1397  observer->on_sample_read(this, s);
1398  }
1399  }
1400 #endif
1401 
1402  results.copy_to_user();
1403 
1405  if (received_data.length()) {
1406  ret = DDS::RETCODE_OK;
1407  if (received_data.maximum() == 0) { // using ZeroCopy
1408  received_data_p.set_loaner(this);
1409  }
1410  }
1411 
1412  post_read_or_take();
1413  return ret;
1414 }
1415 
1416 DDS::ReturnCode_t take_i(MessageSequenceType& received_data,
1417  DDS::SampleInfoSeq& info_seq,
1418  CORBA::Long max_samples,
1419  DDS::SampleStateMask sample_states,
1420  DDS::ViewStateMask view_states,
1421  DDS::InstanceStateMask instance_states,
1422 #ifndef OPENDDS_NO_QUERY_CONDITION
1423  DDS::QueryCondition_ptr a_condition)
1424 #else
1425  int)
1426 #endif
1427 {
1428  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1429 
1430 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1431  if (subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS && !coherent_) {
1433  }
1434 
1435  const bool group_coherent_ordered =
1436  subqos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS
1437  && subqos_.presentation.coherent_access
1438  && subqos_.presentation.ordered_access;
1439 
1440  if (group_coherent_ordered && coherent_) {
1441  max_samples = 1;
1442  }
1443 #endif
1444 
1445  RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1446 #ifndef OPENDDS_NO_QUERY_CONDITION
1447  a_condition,
1448 #endif
1450 
1451  const Observer_rch observer = get_observer(Observer::e_SAMPLE_TAKEN);
1452 
1453 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1454  if (!group_coherent_ordered) {
1455 #endif
1456  const HandleSet& matches = lookup_matching_instances(sample_states, view_states, instance_states);
1457  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
1458  ++next; // pre-increment iterator, in case updates cause changes to match set
1459  const DDS::InstanceHandle_t handle = *it;
1460  const SubscriptionInstance_rch inst = get_handle_instance(handle);
1461  if (!inst) continue;
1462 
1463  size_t i(0);
1464  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1465  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1466  results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1467 
1468  const ValueDispatcher* vd = get_value_dispatcher();
1469  if (observer && item->registered_data_ && vd) {
1470  Observer::Sample s(handle, inst->instance_state_->instance_state(), *item, *vd);
1471  observer->on_sample_taken(this, s);
1472  }
1473  }
1474  }
1475 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1476  } else {
1477  const RakeData item = group_coherent_ordered_data_.get_data();
1478  results.insert_sample(item.rde_, item.rdel_, item.si_, item.index_in_instance_);
1479  }
1480 #endif
1481 
1482  results.copy_to_user();
1483 
1485  if (received_data.length()) {
1486  ret = DDS::RETCODE_OK;
1487  if (received_data.maximum() == 0) { // using ZeroCopy
1488  received_data_p.set_loaner(this);
1489  }
1490  }
1491 
1492  post_read_or_take();
1493  return ret;
1494 }
1495 
1496 DDS::ReturnCode_t read_instance_i(MessageSequenceType& received_data,
1497  DDS::SampleInfoSeq& info_seq,
1498  CORBA::Long max_samples,
1499  DDS::InstanceHandle_t a_handle,
1500  DDS::SampleStateMask sample_states,
1501  DDS::ViewStateMask view_states,
1502  DDS::InstanceStateMask instance_states,
1503 #ifndef OPENDDS_NO_QUERY_CONDITION
1504  DDS::QueryCondition_ptr a_condition)
1505 #else
1506  int)
1507 #endif
1508 {
1509  const SubscriptionInstance_rch inst = get_handle_instance(a_handle);
1510  if (!inst) return DDS::RETCODE_BAD_PARAMETER;
1511 
1512  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1513 
1514  RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1515 #ifndef OPENDDS_NO_QUERY_CONDITION
1516  a_condition,
1517 #endif
1519 
1520  const InstanceState_rch state_obj = inst->instance_state_;
1521  if (state_obj->match(view_states, instance_states)) {
1522  const Observer_rch observer = get_observer(Observer::e_SAMPLE_READ);
1523  size_t i(0);
1524  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1525  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1526  results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1527  const ValueDispatcher* vd = get_value_dispatcher();
1528  if (observer && item->registered_data_ && vd) {
1529  Observer::Sample s(a_handle, inst->instance_state_->instance_state(), *item, *vd);
1530  observer->on_sample_read(this, s);
1531  }
1532  }
1533  } else if (DCPS_debug_level >= 8) {
1534  OPENDDS_STRING msg;
1535  if ((state_obj->view_state() & view_states) == 0) {
1536  msg = "view state is not valid";
1537  }
1538  if ((state_obj->instance_state() & instance_states) == 0) {
1539  if (!msg.empty()) msg += " and ";
1540  msg += "instance state is ";
1541  msg += state_obj->instance_state_string();
1542  msg += " while the validity mask is " + InstanceState::instance_state_mask_string(instance_states);
1543  }
1544  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl_T::read_instance_i: ")
1545  ACE_TEXT("will return no data reading sub %C because:\n %C\n"),
1546  LogGuid(get_guid()).c_str(), msg.c_str()));
1547  }
1548 
1549  results.copy_to_user();
1550 
1552  if (received_data.length()) {
1553  ret = DDS::RETCODE_OK;
1554  if (received_data.maximum() == 0) { // using ZeroCopy
1555  received_data_p.set_loaner(this);
1556  }
1557  }
1558 
1559  post_read_or_take();
1560  return ret;
1561 }
1562 
1563 DDS::ReturnCode_t take_instance_i(MessageSequenceType& received_data,
1564  DDS::SampleInfoSeq& info_seq,
1565  CORBA::Long max_samples,
1566  DDS::InstanceHandle_t a_handle,
1567  DDS::SampleStateMask sample_states,
1568  DDS::ViewStateMask view_states,
1569  DDS::InstanceStateMask instance_states,
1570 #ifndef OPENDDS_NO_QUERY_CONDITION
1571  DDS::QueryCondition_ptr a_condition)
1572 #else
1573  int)
1574 #endif
1575 {
1576  const SubscriptionInstance_rch inst = get_handle_instance(a_handle);
1577  if (!inst) return DDS::RETCODE_BAD_PARAMETER;
1578 
1579  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1580 
1581  RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1582 #ifndef OPENDDS_NO_QUERY_CONDITION
1583  a_condition,
1584 #endif
1586 
1587  const InstanceState_rch state_obj = inst->instance_state_;
1588  if (state_obj->match(view_states, instance_states)) {
1589  const Observer_rch observer = get_observer(Observer::e_SAMPLE_TAKEN);
1590  size_t i(0);
1591  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1592  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1593  results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1594  const ValueDispatcher* vd = get_value_dispatcher();
1595  if (observer && item->registered_data_ && vd) {
1596  Observer::Sample s(a_handle, inst->instance_state_->instance_state(), *item, *vd);
1597  observer->on_sample_taken(this, s);
1598  }
1599  }
1600  }
1601 
1602  results.copy_to_user();
1603 
1605  if (received_data.length()) {
1606  ret = DDS::RETCODE_OK;
1607  if (received_data.maximum() == 0) { // using ZeroCopy
1608  received_data_p.set_loaner(this);
1609  }
1610  }
1611 
1612  post_read_or_take();
1613  return ret;
1614 }
1615 
1616 DDS::ReturnCode_t read_next_instance_i(MessageSequenceType& received_data,
1617  DDS::SampleInfoSeq& info_seq,
1618  CORBA::Long max_samples,
1619  DDS::InstanceHandle_t a_handle,
1620  DDS::SampleStateMask sample_states,
1621  DDS::ViewStateMask view_states,
1622  DDS::InstanceStateMask instance_states,
1623 #ifndef OPENDDS_NO_QUERY_CONDITION
1624  DDS::QueryCondition_ptr a_condition)
1625 #else
1626  int)
1627 #endif
1628 {
1630 
1631  typename InstanceMap::iterator it = instance_map_.begin();
1632  const typename InstanceMap::iterator the_end = instance_map_.end();
1633  if (a_handle != DDS::HANDLE_NIL) {
1634  const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(a_handle);
1635  if (pos != reverse_instance_map_.end()) {
1636  it = pos->second;
1637  ++it;
1638  } else {
1639  it = the_end;
1640  }
1641  }
1642 
1644  for (; it != the_end; ++it) {
1645  handle = it->second;
1646  const DDS::ReturnCode_t status =
1647  read_instance_i(received_data, info_seq, max_samples, handle,
1648  sample_states, view_states, instance_states,
1649 #ifndef OPENDDS_NO_QUERY_CONDITION
1650  a_condition);
1651 #else
1652  0);
1653 #endif
1654  if (status != DDS::RETCODE_NO_DATA) {
1655  post_read_or_take();
1656  return status;
1657  }
1658  }
1659 
1660  post_read_or_take();
1661  return DDS::RETCODE_NO_DATA;
1662 }
1663 
1664 DDS::ReturnCode_t take_next_instance_i(MessageSequenceType& received_data,
1665  DDS::SampleInfoSeq& info_seq,
1666  CORBA::Long max_samples,
1667  DDS::InstanceHandle_t a_handle,
1668  DDS::SampleStateMask sample_states,
1669  DDS::ViewStateMask view_states,
1670  DDS::InstanceStateMask instance_states,
1671 #ifndef OPENDDS_NO_QUERY_CONDITION
1672  DDS::QueryCondition_ptr a_condition)
1673 #else
1674  int)
1675 #endif
1676 {
1678 
1679  typename InstanceMap::iterator it = instance_map_.begin();
1680  const typename InstanceMap::iterator the_end = instance_map_.end();
1681  if (a_handle != DDS::HANDLE_NIL) {
1682  const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(a_handle);
1683  if (pos != reverse_instance_map_.end()) {
1684  it = pos->second;
1685  ++it;
1686  } else {
1687  it = the_end;
1688  }
1689  }
1690 
1692  for (; it != the_end; ++it) {
1693  handle = it->second;
1694  const DDS::ReturnCode_t status =
1695  take_instance_i(received_data, info_seq, max_samples, handle,
1696  sample_states, view_states, instance_states,
1697 #ifndef OPENDDS_NO_QUERY_CONDITION
1698  a_condition);
1699 #else
1700  0);
1701 #endif
1702  if (status != DDS::RETCODE_NO_DATA) {
1703  total_samples(); // see if we are empty
1704  post_read_or_take();
1705  return status;
1706  }
1707  }
1708 
1709  post_read_or_take();
1710  return DDS::RETCODE_NO_DATA;
1711 }
1712 
1714  DDS::InstanceHandle_t publication_handle,
1717  bool& just_registered,
1718  bool& filtered)
1719 {
1720  ACE_UNUSED_ARG(publication_handle);
1721 
1722  const bool is_dispose_msg =
1725  const bool is_unregister_msg =
1728 
1729  if (!store_instance_data_check(instance_data, publication_handle, header, instance_ptr)) {
1730  return;
1731  }
1732 
1733  // not filtering any data, except what is specifically identified as filtered below
1734  filtered = false;
1735 
1737 
1738  //!!! caller should already have the sample_lock_
1739  //We will unlock it before calling into listeners
1740 
1741  typename InstanceMap::const_iterator const it = instance_map_.find(*instance_data);
1742 
1743  if (it == instance_map_.end()) {
1744  if (is_dispose_msg || is_unregister_msg) {
1745  return;
1746  }
1747 
1748  std::size_t instances_size = 0;
1749  {
1750  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
1751  instances_size = instances_.size();
1752  }
1753  if ((qos_.resource_limits.max_instances != DDS::LENGTH_UNLIMITED) &&
1754  ((::CORBA::Long) instances_size >= qos_.resource_limits.max_instances))
1755  {
1756  DDS::DataReaderListener_var listener
1757  = listener_for (DDS::SAMPLE_REJECTED_STATUS);
1758 
1759  set_status_changed_flag (DDS::SAMPLE_REJECTED_STATUS, true);
1760 
1761  sample_rejected_status_.last_reason = DDS::REJECTED_BY_INSTANCES_LIMIT;
1762  ++sample_rejected_status_.total_count;
1763  ++sample_rejected_status_.total_count_change;
1764  sample_rejected_status_.last_instance_handle = handle;
1765 
1766  if (!CORBA::is_nil(listener.in()))
1767  {
1768  ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
1769 
1770  listener->on_sample_rejected(this, sample_rejected_status_);
1771  sample_rejected_status_.total_count_change = 0;
1772  } // do we want to do something if listener is nil???
1773  notify_status_condition_no_sample_lock();
1774 
1775  return;
1776  }
1777 
1778  {
1779  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
1780 
1781 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1782  SharedInstanceMap_rch inst;
1783  OwnershipManagerScopedAccess ownership_scoped_access;
1784  OwnershipManagerPtr owner_manager = ownership_manager();
1785 
1786  bool new_handle = true;
1787  if (is_exclusive_ownership_) {
1788  OwnershipManagerScopedAccess temp(owner_manager);
1789  temp.swap(ownership_scoped_access);
1790  if (!owner_manager || ownership_scoped_access.lock_result_ != 0) {
1791  if (DCPS_debug_level > 0) {
1792  ACE_ERROR ((LM_ERROR,
1793  ACE_TEXT("(%P|%t) ")
1794  ACE_TEXT("%CDataReaderImpl::")
1795  ACE_TEXT("store_instance_data, ")
1796  ACE_TEXT("acquire instance_lock failed.\n"), TraitsType::type_name()));
1797  }
1798  return;
1799  }
1800 
1801  inst = dynamic_rchandle_cast<SharedInstanceMap>(
1802  owner_manager->get_instance_map(topic_servant_->type_name(), this));
1803  if (inst != 0) {
1804  typename InstanceMap::const_iterator const iter = inst->find(*instance_data);
1805  if (iter != inst->end ()) {
1806  handle = iter->second;
1807  new_handle = false;
1808  }
1809  }
1810  }
1811 #endif
1812 
1813  just_registered = true;
1814  DDS::BuiltinTopicKey_t key = OpenDDS::DCPS::keyFromSample(static_cast<MessageType*>(instance_data.get()));
1815  bool owns_handle = false;
1816  if (handle == DDS::HANDLE_NIL) {
1817  handle = get_next_handle(key);
1818  owns_handle = true;
1819  }
1821  OpenDDS::DCPS::make_rch<OpenDDS::DCPS::SubscriptionInstance>(
1822  rchandle_from(this),
1823  qos_,
1824  ref(instances_lock_),
1825  handle, owns_handle);
1826 
1827  const std::pair<typename SubscriptionInstanceMapType::iterator, bool> bpair =
1828  instances_.insert(typename SubscriptionInstanceMapType::value_type(handle, instance));
1829 
1830  if (bpair.second == false) {
1831  if (DCPS_debug_level > 0) {
1833  ACE_TEXT("(%P|%t) ")
1834  ACE_TEXT("%CDataReaderImpl::")
1835  ACE_TEXT("store_instance_data, ")
1836  ACE_TEXT("insert handle failed.\n"), TraitsType::type_name()));
1837  }
1838  return;
1839  }
1840  update_lookup_maps(bpair.first);
1841 
1842 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1843  if (owner_manager) {
1844  if (!inst) {
1845  inst = make_rch<SharedInstanceMap>();
1846  owner_manager->set_instance_map(
1847  topic_servant_->type_name(),
1848  inst,
1849  this);
1850  }
1851 
1852  if (new_handle) {
1853  const std::pair<typename InstanceMap::iterator, bool> bpair =
1854  inst->insert(typename InstanceMap::value_type(*instance_data, handle));
1855  if (!bpair.second) {
1856  if (DCPS_debug_level > 0) {
1857  ACE_ERROR ((LM_ERROR,
1858  ACE_TEXT("(%P|%t) ")
1859  ACE_TEXT("%CDataReaderImpl::")
1860  ACE_TEXT("store_instance_data, ")
1861  ACE_TEXT("insert to participant scope %C failed.\n"), TraitsType::type_name(), TraitsType::type_name()));
1862  }
1863  return;
1864  }
1865  }
1866 
1868  temp.swap(ownership_scoped_access);
1869  if (temp.release() != 0) {
1870  if (DCPS_debug_level > 0) {
1871  ACE_ERROR ((LM_ERROR,
1872  ACE_TEXT("(%P|%t) ")
1873  ACE_TEXT("%CDataReaderImpl::")
1874  ACE_TEXT("store_instance_data, ")
1875  ACE_TEXT("release instance_lock failed.\n"), TraitsType::type_name()));
1876  }
1877  return;
1878  }
1879  }
1880 #endif
1881  } // scope for instances_lock_
1882 
1883  std::pair<typename InstanceMap::iterator, bool> bpair =
1884  instance_map_.insert(typename InstanceMap::value_type(*instance_data,
1885  handle));
1886  if (bpair.second == false)
1887  {
1888  if (DCPS_debug_level > 0) {
1889  ACE_ERROR ((LM_ERROR,
1890  ACE_TEXT("(%P|%t) ")
1891  ACE_TEXT("%CDataReaderImpl::")
1892  ACE_TEXT("store_instance_data, ")
1893  ACE_TEXT("insert %C failed.\n"), TraitsType::type_name(), TraitsType::type_name()));
1894  }
1895  return;
1896  }
1897  reverse_instance_map_[handle] = bpair.first;
1898  }
1899  else
1900  {
1901  just_registered = false;
1902  handle = it->second;
1903  }
1904 
1906  {
1907  instance_ptr = get_handle_instance(handle);
1908  OPENDDS_ASSERT(instance_ptr);
1909 
1911  {
1912  {
1913  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
1914  filtered = ownership_filter_instance(instance_ptr, header.publication_id_);
1915  }
1916 
1917  MonotonicTimePoint now;
1918  MonotonicTimePoint deadline;
1919  if (!filtered && time_based_filter_instance(instance_ptr, now, deadline)) {
1920  filtered = true;
1921  if (qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
1922  delay_sample(handle, move(instance_data), header, just_registered, now, deadline);
1923  }
1924  } else {
1925  // nothing time based filtered now
1926  clear_sample(handle);
1927 
1928  }
1929 
1930  if (filtered) {
1931  return;
1932  }
1933  }
1934 
1935  finish_store_instance_data(move(instance_data), header, instance_ptr, is_dispose_msg, is_unregister_msg);
1936  }
1937  else
1938  {
1939  instance_ptr = get_handle_instance(handle);
1940  OPENDDS_ASSERT(instance_ptr);
1941  instance_ptr->instance_state_->lively(header.publication_id_);
1942  }
1943 }
1944 
1946  SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg )
1947 {
1948  if ((qos_.resource_limits.max_samples_per_instance !=
1950  (instance_ptr->rcvd_samples_.size() >=
1951  static_cast<size_t>(qos_.resource_limits.max_samples_per_instance))) {
1952 
1953  // According to spec 1.2, Samples that contain no data do not
1954  // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
1955  // so do not remove the oldest sample when unregister/dispose
1956  // message arrives.
1957 
1958  if (!is_dispose_msg && !is_unregister_msg
1959  && !instance_ptr->rcvd_samples_.matches(DDS::READ_SAMPLE_STATE))
1960  {
1961  DDS::DataReaderListener_var listener
1962  = listener_for(DDS::SAMPLE_REJECTED_STATUS);
1963 
1964  set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true);
1965 
1966  sample_rejected_status_.last_reason =
1968  ++sample_rejected_status_.total_count;
1969  ++sample_rejected_status_.total_count_change;
1970  sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_;
1971 
1972  if (!CORBA::is_nil(listener.in()))
1973  {
1974  ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
1975 
1976  listener->on_sample_rejected(this, sample_rejected_status_);
1977  sample_rejected_status_.total_count_change = 0;
1978  } // do we want to do something if listener is nil???
1979  notify_status_condition_no_sample_lock();
1980  return;
1981  }
1982  else if (!is_dispose_msg && !is_unregister_msg)
1983  {
1984  // Discard the oldest previously-read sample
1986  instance_ptr->rcvd_samples_.remove_head();
1987  item->dec_ref();
1988  }
1989  }
1990  else if (qos_.resource_limits.max_samples != DDS::LENGTH_UNLIMITED)
1991  {
1992  CORBA::Long total_samples = 0;
1993  {
1994  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, instances_lock_);
1995  for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin();
1996  iter != instances_.end();
1997  ++iter) {
1998  OpenDDS::DCPS::SubscriptionInstance_rch ptr = iter->second;
1999 
2000  total_samples += (CORBA::Long) ptr->rcvd_samples_.size();
2001  }
2002  }
2003 
2004  if (total_samples >= qos_.resource_limits.max_samples)
2005  {
2006  // According to spec 1.2, Samples that contain no data do not
2007  // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
2008  // so do not remove the oldest sample when unregister/dispose
2009  // message arrives.
2010 
2011  if (!is_dispose_msg && !is_unregister_msg
2012  && !instance_ptr->rcvd_samples_.matches(DDS::READ_SAMPLE_STATE))
2013  {
2014  DDS::DataReaderListener_var listener
2015  = listener_for(DDS::SAMPLE_REJECTED_STATUS);
2016 
2017  set_status_changed_flag(DDS::SAMPLE_REJECTED_STATUS, true);
2018 
2019  sample_rejected_status_.last_reason =
2021  ++sample_rejected_status_.total_count;
2022  ++sample_rejected_status_.total_count_change;
2023  sample_rejected_status_.last_instance_handle = instance_ptr->instance_handle_;
2024  if (!CORBA::is_nil(listener.in()))
2025  {
2026  ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
2027 
2028  listener->on_sample_rejected(this, sample_rejected_status_);
2029  sample_rejected_status_.total_count_change = 0;
2030  } // do we want to do something if listener is nil???
2031  notify_status_condition_no_sample_lock();
2032 
2033  return;
2034  }
2035  else if (!is_dispose_msg && !is_unregister_msg)
2036  {
2037  // Discard the oldest previously-read sample
2039  instance_ptr->rcvd_samples_.remove_head();
2040  item->dec_ref();
2041  }
2042  }
2043  }
2044 
2045  bool event_notify = false;
2046 
2047  if (is_dispose_msg) {
2048  event_notify = instance_ptr->instance_state_->dispose_was_received(header.publication_id_);
2049  }
2050 
2051  if (is_unregister_msg) {
2052  if (instance_ptr->instance_state_->unregister_was_received(header.publication_id_)) {
2053  event_notify = true;
2054  }
2055  }
2056 
2057  if (!is_dispose_msg && !is_unregister_msg) {
2058  event_notify = true;
2059  instance_ptr->instance_state_->data_was_received(header.publication_id_);
2060  }
2061 
2062  if (!event_notify) {
2063  return;
2064  }
2065 
2066  ReceivedDataElement* const ptr =
2067  new (*rd_allocator_.get()) ReceivedDataElementWithType<MessageTypeWithAllocator>(
2068  header, instance_data.release(), &sample_lock_);
2069 
2071  instance_ptr->instance_state_->disposed_generation_count();
2074 
2075  instance_ptr->last_sequence_ = header.sequence_;
2076 
2077  instance_ptr->rcvd_strategy_->add(ptr);
2078 
2079  if (! is_dispose_msg && ! is_unregister_msg
2080  && instance_ptr->rcvd_samples_.size() > get_depth())
2081  {
2083  instance_ptr->rcvd_samples_.remove_head();
2084 
2085  if (head_ptr->sample_state_ == DDS::NOT_READ_SAMPLE_STATE)
2086  {
2087  DDS::DataReaderListener_var listener
2088  = listener_for (DDS::SAMPLE_LOST_STATUS);
2089 
2090  ++sample_lost_status_.total_count;
2091  ++sample_lost_status_.total_count_change;
2092 
2093  set_status_changed_flag(DDS::SAMPLE_LOST_STATUS, true);
2094 
2095  if (!CORBA::is_nil(listener.in()))
2096  {
2097  ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
2098 
2099  listener->on_sample_lost(this, sample_lost_status_);
2100 
2101  sample_lost_status_.total_count_change = 0;
2102  }
2103 
2104  notify_status_condition_no_sample_lock();
2105  }
2106 
2107  head_ptr->dec_ref();
2108  }
2109 
2110 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2111  if (! ptr->coherent_change_) {
2112 #endif
2113  RcHandle<OpenDDS::DCPS::SubscriberImpl> sub = get_subscriber_servant();
2114  if (!sub || get_deleted())
2115  return;
2116 
2118 
2119  set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, true);
2120 
2121  DDS::SubscriberListener_var sub_listener =
2123  if (!CORBA::is_nil(sub_listener.in()) && !coherent_) {
2124  if (!is_bit()) {
2126  ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
2127  sub_listener->on_data_on_readers(sub.in());
2128  } else {
2129  TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataOnReaders>(sub, sub_listener, rchandle_from(static_cast<DataReaderImpl*>(this)), true, false));
2130  }
2131  } else {
2132  sub->notify_status_condition();
2133 
2134  DDS::DataReaderListener_var listener =
2135  listener_for (DDS::DATA_AVAILABLE_STATUS);
2136 
2137  if (!CORBA::is_nil(listener.in())) {
2138  if (!is_bit()) {
2139  set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
2141  sub.reset();
2142  ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
2143  listener->on_data_available(this);
2144  } else {
2145  TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataAvailable>(listener, rchandle_from(static_cast<DataReaderImpl*>(this)), true, true, true));
2146  }
2147  } else {
2148  notify_status_condition_no_sample_lock();
2149  }
2150  }
2151 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2152  }
2153 #endif
2154 }
2155 
2156 /// Release sample_lock_ during status notifications in store_instance_data()
2157 /// as the lock is not needed and could cause deadlock condition.
2158 /// See comments in member function implementation for details.
2160 {
2161  // This member function avoids a deadlock condition which otherwise
2162  // could occur as follows:
2163  // Thread 1: Call to WaitSet::wait() causes WaitSet::lock_ to lock and
2164  // eventually DataReaderImpl::sample_lock_ to lock in call to
2165  // DataReaderImpl::contains_samples().
2166  // Thread2: Call to DataReaderImpl::data_received()
2167  // causes DataReaderImpl::sample_lock_ to lock and eventually
2168  // during notify of status condition a call to WaitSet::signal()
2169  // causes WaitSet::lock_ to lock.
2170  // Because the DataReaderImpl::sample_lock_ is not needed during
2171  // status notification this member function is used in
2172  // store_instance_data() to release sample_lock_ before making
2173  // the notification.
2174  ACE_GUARD(typename DataReaderImpl::Reverse_Lock_t, unlock_guard, reverse_sample_lock_);
2175  notify_status_condition();
2176 }
2177 
2178 
2179 /// Common input read* & take* input processing and precondition checks
2180 DDS::ReturnCode_t check_inputs(const char* method_name,
2181  MessageSequenceType& received_data,
2182  DDS::SampleInfoSeq& info_seq,
2183  ::CORBA::Long max_samples)
2184 {
2185  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
2186 
2187  // ---- start of preconditions common to read and take -----
2188  // SPEC ref v1.2 7.1.2.5.3.8 #1
2189  // NOTE: We can't check maximum() or release() here since those are
2190  // implementation details of the sequences. In general, the
2191  // info_seq will have release() == true and maximum() == 0.
2192  // If we're in zero-copy mode, the received_data will have
2193  // release() == false and maximum() == 0. If it's not
2194  // zero-copy then received_data will have release == true()
2195  // and maximum() == anything.
2196  if (received_data.length() != info_seq.length())
2197  {
2199  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
2200  ACE_TEXT("PRECONDITION_NOT_MET sample and info input ")
2201  ACE_TEXT("sequences do not match.\n"),
2202  TraitsType::type_name(),
2203  method_name ));
2205  }
2206 
2207  //SPEC ref v1.2 7.1.2.5.3.8 #4
2208  if ((received_data.maximum() > 0) && (received_data.release() == false))
2209  {
2211  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
2212  ACE_TEXT("PRECONDITION_NOT_MET mismatch of ")
2213  ACE_TEXT("maximum %d and owns %d\n"),
2214  TraitsType::type_name(),
2215  method_name,
2216  received_data.maximum(),
2217  received_data.release() ));
2218 
2220  }
2221 
2222  if (received_data.maximum() == 0)
2223  {
2224  // not in SPEC but needed.
2225  if (max_samples == DDS::LENGTH_UNLIMITED)
2226  {
2227  max_samples =
2228  static_cast< ::CORBA::Long> (received_data_p.max_slots());
2229  }
2230  }
2231  else
2232  {
2233  if (max_samples == DDS::LENGTH_UNLIMITED)
2234  {
2235  //SPEC ref v1.2 7.1.2.5.3.8 #5a
2236  max_samples = received_data.maximum();
2237  }
2238  else if (
2239  max_samples > static_cast< ::CORBA::Long> (received_data.maximum()))
2240  {
2241  //SPEC ref v1.2 7.1.2.5.3.8 #5c
2243  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
2244  ACE_TEXT("PRECONDITION_NOT_MET max_samples %d > maximum %d\n"),
2245  TraitsType::type_name(),
2246  method_name,
2247  max_samples,
2248  received_data.maximum()));
2250  }
2251  //else
2252  //SPEC ref v1.2 7.1.2.5.3.8 #5b - is true by impl below.
2253  }
2254 
2255  // The spec does not say what to do in this case but it appears to be a good thing.
2256  // Note: max_slots is the greater of the sequence's maximum and init_size.
2257  if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples)
2258  {
2259  max_samples = static_cast< ::CORBA::Long> (received_data_p.max_slots());
2260  }
2261  //---- end of preconditions common to read and take -----
2262 
2263  return DDS::RETCODE_OK;
2264 }
2265 
2269  const bool just_registered,
2270  const MonotonicTimePoint& now,
2271  const MonotonicTimePoint& deadline)
2272 {
2273  // sample_lock_ should already be held
2275 
2276  typename FilterDelayedSampleMap::iterator i = filter_delayed_sample_map_.find(handle);
2277  if (i == filter_delayed_sample_map_.end()) {
2278 
2279  // emplace()/insert() only if the sample is going to be
2280  // new (otherwise we call move(data) twice).
2281  std::pair<typename FilterDelayedSampleMap::iterator, bool> result =
2282 #ifdef ACE_HAS_CPP11
2283  filter_delayed_sample_map_.emplace(std::piecewise_construct,
2284  std::forward_as_tuple(handle),
2285  std::forward_as_tuple(move(data), hdr, just_registered));
2286 #else
2287  filter_delayed_sample_map_.insert(std::make_pair(handle, FilterDelayedSample(move(data), hdr, just_registered)));
2288 #endif
2289  FilterDelayedSample& sample = result.first->second;
2290  sample.expiration_time = deadline;
2291  const bool schedule = filter_delayed_sample_queue_.empty();
2292  filter_delayed_sample_queue_.insert(std::make_pair(deadline, handle));
2293  if (schedule) {
2294  filter_delayed_sample_task_->schedule(now - deadline);
2295  } else if (filter_delayed_sample_queue_.begin()->second == handle) {
2296  filter_delayed_sample_task_->cancel();
2297  filter_delayed_sample_task_->schedule(now - deadline);
2298  }
2299  } else {
2300  FilterDelayedSample& sample = i->second;
2301  // we only care about the most recently filtered sample, so clean up the last one
2302 
2303  sample.message = move(data);
2304  sample.header = hdr;
2305  sample.new_instance = just_registered;
2306  // already scheduled for timeout at the desired time
2307  }
2308 }
2309 
2311 {
2312  // sample_lock_ should already be held
2313 
2314  typename FilterDelayedSampleMap::iterator sample = filter_delayed_sample_map_.find(handle);
2315  if (sample != filter_delayed_sample_map_.end()) {
2316  // leave the entry in the container, so that the key remains valid if the reactor is waiting on this lock while this is occurring
2317  sample->second.message.reset();
2318  }
2319 }
2320 
2322 {
2323  // sample_lock_ should already be held
2324 
2325  typename FilterDelayedSampleMap::iterator sample = filter_delayed_sample_map_.find(handle);
2326  if (sample != filter_delayed_sample_map_.end()) {
2327  for (FilterDelayedSampleQueue::iterator pos = filter_delayed_sample_queue_.lower_bound(sample->second.expiration_time), limit = filter_delayed_sample_queue_.upper_bound(sample->second.expiration_time); pos != limit; ++pos) {
2328  if (pos->second == handle) {
2329  filter_delayed_sample_queue_.erase(pos);
2330  break;
2331  }
2332  }
2333 
2334  // use the handle to erase, since the sample lock was released
2335  filter_delayed_sample_map_.erase(handle);
2336  }
2337 }
2338 
2340 {
2341  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2342 
2343  // Make a copy because finish_store_instance_data will release the sample lock.
2344  typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) Handles;
2345  Handles handles;
2346 
2347  ACE_GUARD(ACE_Recursive_Thread_Mutex, guard, sample_lock_);
2348 
2349  for (FilterDelayedSampleQueue::iterator pos = filter_delayed_sample_queue_.begin(), limit = filter_delayed_sample_queue_.end(); pos != limit && pos->first <= now;) {
2350  handles.push_back(pos->second);
2351  filter_delayed_sample_queue_.erase(pos++);
2352  }
2353 
2354  const TimeDuration interval(qos_.time_based_filter.minimum_separation);
2355 
2356  for (Handles::const_iterator pos = handles.begin(), limit = handles.end(); pos != limit; ++pos) {
2357  const DDS::InstanceHandle_t handle = *pos;
2358 
2359  SubscriptionInstance_rch instance = get_handle_instance(handle);
2360  if (!instance) {
2361  continue;
2362  }
2363 
2364  typename FilterDelayedSampleMap::iterator data = filter_delayed_sample_map_.find(handle);
2365  if (data == filter_delayed_sample_map_.end()) {
2366  continue;
2367  }
2368 
2369  if (data->second.message) {
2370  const bool NOT_DISPOSE_MSG = false;
2371  const bool NOT_UNREGISTER_MSG = false;
2372  // clear the message, since ownership is being transferred to finish_store_instance_data.
2373 
2374  instance->last_accepted_.set_to_now();
2375  const DataSampleHeader_ptr header = data->second.header;
2376  const bool new_instance = data->second.new_instance;
2377 
2378  // should not use data iterator anymore, since finish_store_instance_data releases sample_lock_
2379  finish_store_instance_data(move(data->second.message),
2380  *header,
2381  instance,
2382  NOT_DISPOSE_MSG,
2383  NOT_UNREGISTER_MSG);
2384 
2385  accept_sample_processing(instance, *header, new_instance);
2386 
2387  // Refresh the iterator.
2388  data = filter_delayed_sample_map_.find(handle);
2389  if (data == filter_delayed_sample_map_.end()) {
2390  continue;
2391  }
2392 
2393  // Reschedule.
2394  data->second.expiration_time = now + interval;
2395  filter_delayed_sample_queue_.insert(std::make_pair(data->second.expiration_time, handle));
2396 
2397  } else {
2398  // this check is performed to handle the corner case where
2399  // store_instance_data received and delivered a sample, while this
2400  // method was waiting for the lock
2401  if (MonotonicTimePoint::now() - instance->last_sample_tv_ >= interval) {
2402  // no new data to process, so remove from container
2403  filter_delayed_sample_map_.erase(data);
2404  }
2405  }
2406  }
2407 
2408  if (!filter_delayed_sample_queue_.empty()) {
2409  filter_delayed_sample_task_->schedule(filter_delayed_sample_queue_.begin()->first - now);
2410  }
2411 }
2412 
2413 unique_ptr<DataAllocator>& data_allocator() { return data_allocator_; }
2414 
2416 
2417 InstanceMap instance_map_;
2418 ReverseInstanceMap reverse_instance_map_;
2419 
2421 
2423 #ifdef OPENDDS_HAS_STD_SHARED_PTR
2424 typedef std::shared_ptr<const OpenDDS::DCPS::DataSampleHeader> DataSampleHeader_ptr;
2425 #else
2427 #endif
2429  FilterDelayedSample(unique_ptr<MessageTypeWithAllocator> msg, DataSampleHeader_ptr hdr, bool new_inst)
2430  : message(move(msg))
2431  , header(hdr)
2432  , new_instance(new_inst)
2433  {}
2435  DataSampleHeader_ptr header;
2438 };
2439 typedef OPENDDS_MAP(DDS::InstanceHandle_t, FilterDelayedSample) FilterDelayedSampleMap;
2440 FilterDelayedSampleMap filter_delayed_sample_map_;
2441 typedef OPENDDS_MULTIMAP(MonotonicTimePoint, DDS::InstanceHandle_t) FilterDelayedSampleQueue;
2442 FilterDelayedSampleQueue filter_delayed_sample_queue_;
2443 
2445 
2446 };
2447 
2448 template <typename MessageType>
2450 {
2452  MessageTypeMemoryBlock* block =
2453  static_cast<MessageTypeMemoryBlock*>(pool.malloc(sizeof(MessageTypeMemoryBlock)));
2454  block->allocator_ = &pool;
2455  return block;
2456 }
2457 
2458 template <typename MessageType>
2460 {
2461  if (memory) {
2462  MessageTypeMemoryBlock* block = static_cast<MessageTypeMemoryBlock*>(memory);
2463  block->allocator_->free(block);
2464  }
2465 }
2466 
2467 template <typename MessageType>
2469 {
2470  operator delete(memory);
2471 }
2472 
2473 }
2474 }
2475 
2477 
2478 #endif /* OPENDDS_DDS_DCPS_DATAREADERIMPL_T_H */
virtual DDS::InstanceHandle_t lookup_instance(const MessageType &instance_data)
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
DataSampleHeader header_
The demarshalled sample header.
virtual DDS::ReturnCode_t take_next_sample(MessageType &received_data, DDS::SampleInfo &sample_info_ref)
void swap(MessageBlock &lhs, MessageBlock &rhs)
#define ACE_DEBUG(X)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
virtual DDS::ReturnCode_t read_next_instance(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
ACE_CDR::Long Long
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void release_loan(MessageSequenceType &received_data)
void set_instance_map(const char *type_name, const RcHandle< RcObject > &instance_map, DataReaderImpl *reader)
virtual void free(void *ptr)
static String kind_to_string(Kind value)
Definition: Serializer.cpp:326
DDS::Time_t to_dds_time() const
Definition: TimePoint_T.inl:89
const StatusKind SAMPLE_LOST_STATUS
const LogLevel::Value value
Definition: debug.cpp:61
DDS::InstanceHandle_t lookup_instance_generic(const void *data)
InstanceStateKind instance_state
virtual void purge_data(OpenDDS::DCPS::SubscriptionInstance_rch instance)
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
const InstanceHandle_t HANDLE_NIL
virtual DDS::ReturnCode_t enable_specific()
char message_id_
The enum MessageId.
FilterDelayedSampleMap filter_delayed_sample_map_
virtual void on_sample_taken(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:94
::DDS::ReturnCode_t take_next_sample(inout<%SCOPED%> received_data, inout ::DDS::SampleInfo sample_info)
void lively(const GUID_t &writer_id)
LIVELINESS message received for this DataWriter.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
DDS::ReturnCode_t take_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
RcHandle< SharedInstanceMap > SharedInstanceMap_rch
bool dispose_was_received(const GUID_t &writer_id)
TimeBasedFilterQosPolicy time_based_filter
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
sequence< octet > key
sequence< SampleInfo > SampleInfoSeq
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
Definition: COPYING.txt:14
virtual DDS::ReturnCode_t read_next_sample(MessageType &received_data, DDS::SampleInfo &sample_info_ref)
ReverseInstanceMap reverse_instance_map_
virtual DDS::ReturnCode_t read_instance(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual void on_sample_received(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:92
::DDS::InstanceHandle_t lookup_instance(in<%SCOPED%> instance_data)
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
const char * c_str() const
virtual void push_back(const DDS::SampleInfo &info, const void *sample)=0
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
#define OPENDDS_MULTIMAP(K, T)
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
const char * _interface_repository_id() const
unsigned long InstanceStateMask
MarshalTraits< MessageType > MarshalTraitsType
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
void finish_store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, const DataSampleHeader &header, SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg)
DDS::SubscriberListener_ptr listener_for(DDS::StatusKind kind)
bool unregister_was_received(const GUID_t &writer_id)
virtual DDS::ReturnCode_t take_next_instance_w_condition(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
SubscriptionInstance_rch si_
Definition: RakeData.h:27
static OPENDDS_STRING instance_state_mask_string(DDS::InstanceStateMask mask)
Return string representation of the instance state mask passed.
unique_ptr< ReceivedDataStrategy > rcvd_strategy_
ReceivedDataElementList strategy.
void accessed()
A read or take operation has been performed on this instance.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
DDS::InstanceStateKind instance_state() const
virtual void release_instance_i(DDS::InstanceHandle_t handle)
bool valid_data() const
Returns true if the sample has a complete serialized payload.
void sample_info(DDS::SampleInfo &si, const ReceivedDataElement *de)
Populate the SampleInfo structure.
virtual DDS::ReturnCode_t return_loan(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq)
TraitsType::DataReaderType Interface
virtual DDS::ReturnCode_t take_next_instance(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual DDS::ReturnCode_t get_key_value(MessageType &key_holder, DDS::InstanceHandle_t handle)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
const StatusKind DATA_ON_READERS_STATUS
#define OPENDDS_STRING
SequenceNumber last_sequence_
Sequence number of the move recent data sample received.
void release_all_instances()
Release all instances held by the reader.
container_supported_unique_ptr< MessageTypeWithAllocator > message
virtual void reserve(CORBA::ULong size)=0
LM_DEBUG
virtual DDS::ReturnCode_t take_instance(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t read_instance_generic(void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
const ViewStateKind NOT_NEW_VIEW_STATE
RcHandle< RcObject > get_instance_map(const char *type_name, DataReaderImpl *reader)
const char * instance_state_string() const
Return string of the name of the current instance state.
ReliabilityQosPolicyKind kind
ACE_CDR::ULong ULong
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
void data_was_received(const GUID_t &writer_id)
Data sample received for this instance.
virtual void state_updated_i(DDS::InstanceHandle_t handle)
InstanceHandle_t instance_handle
ACE_CDR::Boolean Boolean
virtual void lookup_instance(const OpenDDS::DCPS::ReceivedDataSample &sample, OpenDDS::DCPS::SubscriptionInstance_rch &instance)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
Holds a data sample received by the transport.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
unsigned long InstanceStateKind
A fixed-size allocator that caches items for quicker access but if the pool is exhausted it will use ...
const StatusKind DATA_AVAILABLE_STATUS
void set_marshal_skip_serialize(bool value)
const InstanceStateMask ANY_INSTANCE_STATE
Implements the DDS::DataReader interface.
bool writes_instance(const GUID_t &writer_id) const
Returns true if the writer is a writer of this instance.
unique_ptr< DataAllocator > data_allocator_
const unsigned long DURATION_ZERO_NSEC
Definition: DdsDcpsCore.idl:76
CORBA::Boolean marshal(TAO_OutputCDR &)
const ViewStateMask ANY_VIEW_STATE
CORBA::Boolean _is_a(const char *type_id)
virtual DDS::ReturnCode_t read_w_condition(MessageSequenceType &received_data, DDS::SampleInfoSeq &sample_info, ::CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
ReceivedDataElementList * rdel_
Definition: RakeData.h:26
bool store_instance_data_check(unique_ptr< MessageTypeWithAllocator > &instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr)
::DDS::ReturnCode_t read_next_sample(inout<%SCOPED%> received_data, inout ::DDS::SampleInfo sample_info)
FilterDelayedSample(unique_ptr< MessageTypeWithAllocator > msg, DataSampleHeader_ptr hdr, bool new_inst)
DDS::ReturnCode_t read_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
bool most_recent_generation(ReceivedDataElement *item) const
OpenDDS::DCPS::Cached_Allocator_With_Overflow< MessageTypeMemoryBlock, ACE_Thread_Mutex > DataAllocator
LM_WARNING
virtual DDS::ReturnCode_t take(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
virtual void on_sample_read(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:93
unsigned long SampleStateMask
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
const SampleStateKind READ_SAMPLE_STATE
ACE_TEXT("TCP_Factory")
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
DDS::InstanceStateKind instance_state() const
Access instance state.
OpenDDS_Dcps_Export GUID_t make_part_guid(const GuidPrefix_t &prefix)
Definition: GuidUtils.h:216
void delay_sample(DDS::InstanceHandle_t handle, unique_ptr< MessageTypeWithAllocator > data, const OpenDDS::DCPS::DataSampleHeader &header, const bool just_registered, const MonotonicTimePoint &now, const MonotonicTimePoint &deadline)
DDS::BuiltinTopicKey_t keyFromSample(TopicType *sample)
unsigned long nanosec
const ReturnCode_t RETCODE_NO_DATA
ConstructionStatus get_construction_status() const
virtual DDS::ReturnCode_t auto_return_loan(void *seq)
void filter_delayed(const MonotonicTimePoint &now)
bool match(DDS::ViewStateMask view, DDS::InstanceStateMask inst) const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DDS::ReturnCode_t take_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
DDSTraits< MessageType > TraitsType
ReliabilityQosPolicy reliability
OpenDDS_Dcps_Export LogLevel log_level
bool insert_sample(ReceivedDataElement *sample, ReceivedDataElementList *rdel, SubscriptionInstance_rch instance, size_t index_in_instance)
bool remove(ReceivedDataElement *data_sample)
bool eval(const T &sample, const DDS::StringSeq &params) const
DDS::ReturnCode_t read_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const long DURATION_ZERO_SEC
Definition: DdsDcpsCore.idl:75
#define OPENDDS_MAP_CMP_T
ACE_Strong_Bound_Ptr< const OpenDDS::DCPS::DataSampleHeader, ACE_Null_Mutex > DataSampleHeader_ptr
virtual RcHandle< MessageHolder > dds_demarshal(const OpenDDS::DCPS::ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, OpenDDS::DCPS::SubscriptionInstance_rch &instance, bool &just_registered, bool &filtered, OpenDDS::DCPS::MarshalingType marshaling_type, bool full_copy)
const Encoding & encoding() const
Definition: Serializer.inl:199
const ReturnCode_t RETCODE_ERROR
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
TraitsType::MessageSequenceType MessageSequenceType
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
virtual DDS::ReturnCode_t read(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
::DDS::ReturnCode_t get_key_value(inout<%SCOPED%> key_holder, in ::DDS::InstanceHandle_t handle)
const SampleStateKind NOT_READ_SAMPLE_STATE
const ReturnCode_t RETCODE_OK
virtual DDS::ReturnCode_t read_next_instance_w_condition(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
const char * to_string(MessageId value)
DDS::InstanceHandle_t store_synthetic_data(const MessageType &sample, DDS::ViewStateKind view, const SystemTimePoint &timestamp=SystemTimePoint::now())
void clear_sample(DDS::InstanceHandle_t handle)
virtual DDS::ReturnCode_t take_w_condition(MessageSequenceType &received_data, DDS::SampleInfoSeq &sample_info, ::CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
DDS::ViewStateKind view_state() const
Access view state.
size_t disposed_generation_count() const
Access disposed generation count.
const long LENGTH_UNLIMITED
unsigned long ViewStateKind
DDS::ReturnCode_t read_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
ReceivedDataElement * rde_
Definition: RakeData.h:25
void cancel_release()
Cancel a scheduled or pending release of resources.
RcHandle< DRISporadicTask > filter_delayed_sample_task_
FilterDelayedSampleQueue filter_delayed_sample_queue_
virtual void qos_change(const DDS::DataReaderQos &qos)
DDS::DynamicData_ptr get_dynamic_data_adapter(DDS::DynamicType_ptr type, const T &value)
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
#define TheServiceParticipant
void store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr, bool &just_registered, bool &filtered)
bool matches(CORBA::ULong sample_states) const
unique_ptr< DataAllocator > & data_allocator()
const InstanceState_rch instance_state_
Instance state for this instance.
virtual DDS::ReturnCode_t read_instance_w_condition(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual void dispose_unregister(const OpenDDS::DCPS::ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, OpenDDS::DCPS::SubscriptionInstance_rch &instance)
unsigned long ViewStateMask
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
const InstanceStateKind ALIVE_INSTANCE_STATE
virtual DDS::ReturnCode_t take_instance_w_condition(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
bool coherent_change_
Sample belongs to an active coherent change set.
Boolean is_nil(T x)
const StatusKind SAMPLE_REJECTED_STATUS
virtual DDS::ReturnCode_t take(AbstractSamples &samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t read_next_instance_generic(void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t previous_instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
const DDS::InstanceHandle_t instance_handle_
The instance handle for the registered object.
virtual void qos_change(const DDS::DataReaderQos &qos)
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
bool has_non_key_fields(const TypeSupportImpl &ts) const
DDS::ReturnCode_t read_generic(GenericBundle &gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count=false)
size_t no_writers_generation_count() const
Access no writers generation count.
DCPS::PmfSporadicTask< DataReaderImpl_T > DRISporadicTask
DDS::ReturnCode_t take_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
void drop_sample(DDS::InstanceHandle_t handle)
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)
void set_instance_state_i(DDS::InstanceHandle_t instance, DDS::InstanceHandle_t publication_handle, DDS::InstanceStateKind state, const SystemTimePoint &timestamp, const GUID_t &publication_id)
const ReturnCode_t RETCODE_BAD_PARAMETER
bool to_encoding(Encoding &encoding, Extensibility expected_extensibility)
Definition: Serializer.cpp:153
bool contains_sample_filtered(DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const FilterEvaluator &evaluator, const DDS::StringSeq &params)