OpenDDS  Snapshot(2023/04/28-20:55)
RakeResults_T.h
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #ifndef OPENDDS_DCPS_RAKERESULTS_T_H
7 #define OPENDDS_DCPS_RAKERESULTS_T_H
8 
9 #include <ace/config-macros.h>
10 #ifndef ACE_LACKS_PRAGMA_ONCE
11 # pragma once
12 #endif
13 
14 #include "Comparator_T.h"
15 #include "PoolAllocator.h"
16 #include "RakeData.h"
17 #include "TypeSupportImpl.h"
18 
19 #include <dds/DdsDcpsSubscriptionC.h>
20 
22 
23 namespace OpenDDS {
24 namespace DCPS {
25 
27 
28 /// Rake is an abbreviation for "read or take". This class manages the
29 /// results from a read() or take() operation, which are the received_data
30 /// and the info_seq sequences passed in by-reference from the user.
31 template <class MessageType>
32 class RakeResults {
33 
    // NOTE(review): this listing skips source lines 34-35. The Doxygen
    // cross-references for this file place the SampleSeq typedef
    // (DDSTraits<MessageType>::MessageSequenceType) at line 34 and the
    // MessageSequenceAdapterType typedef
    // (DDSTraits<MessageType>::MessageSequenceAdapterType) at line 35.
36 
37 public:
    // Constructor. NOTE(review): the listing omits source lines 38 and 40-41;
    // the Doxygen cross-reference gives the full parameter list as
    //   (DataReaderImpl* reader, SampleSeq& received_data,
    //    DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
    //    DDS::PresentationQosPolicy presentation,
    //    DDS::QueryCondition_ptr cond, Operation_t oper).
    // The cond parameter is only part of the signature when
    // OPENDDS_NO_QUERY_CONDITION is not defined.
39  SampleSeq& received_data,
42  DDS::PresentationQosPolicy presentation,
43 #ifndef OPENDDS_NO_QUERY_CONDITION
44  DDS::QueryCondition_ptr cond,
45 #endif
46  Operation_t oper);
47 
48  /// Returns false if the sample will definitely not be part of the
49  /// resulting dataset, however if this returns true it still may be
50  /// excluded (due to sorting and max_samples).
    // NOTE(review): the listing omits source lines 51-52; the Doxygen
    // cross-reference gives the declaration as
    //   bool insert_sample(ReceivedDataElement* sample,
    //                      ReceivedDataElementList* rdel,
    //                      SubscriptionInstance_rch instance,
    //                      size_t index_in_instance).
53  SubscriptionInstance_rch instance,
54  size_t index_in_instance);
55 
    // NOTE(review): presumably transfers the collected samples into the
    // user's received_data/info_seq sequences and reports success through
    // the bool result -- confirm against RakeResults_T.cpp.
56  bool copy_to_user();
57 
58 private:
    // Private helper templated on a forward-iterator type; takes an iterator
    // range [begin, end) and the sequence adapter to fill.
    // NOTE(review): presumably invoked with iterators over sorted_ or
    // unsorted_ -- the definition is not visible in this header.
59  template <class FwdIter>
60  bool copy_into(FwdIter begin, FwdIter end,
61  MessageSequenceAdapterType& received_data_p);
62 
    // Copy operations are declared in the private section so that client
    // code cannot copy a RakeResults object.
63  RakeResults(const RakeResults&); // no copy construction
64  RakeResults& operator=(const RakeResults&); // no assignment
65 
    // NOTE(review): the listing omits several member declarations here.
    // According to the Doxygen cross-references, line 66 declares
    // "DataReaderImpl* reader_" and line 68 declares
    // "DDS::SampleInfoSeq& info_seq_"; lines 69 and 73 are not shown.

    // Reference to a sample sequence of type SampleSeq (held by reference,
    // not copied).
67  SampleSeq& received_data_;
    // Query condition handle; this member only exists when
    // OPENDDS_NO_QUERY_CONDITION is not defined.
70 #ifndef OPENDDS_NO_QUERY_CONDITION
71  DDS::QueryCondition_ptr cond_;
72 #endif
74 
    // Comparison functor used as the ordering predicate of SortedSet.
75  class SortedSetCmp {
76  public:
    // Returns whether lhs orders before rhs. When cmp_.in() evaluates
    // false, the two samples are ordered by source_timestamp_; otherwise
    // the decision is delegated to cmp_->compare() on the samples'
    // registered_data_.
77  bool operator()(const RakeData& lhs, const RakeData& rhs) const {
78  if (!cmp_.in()) {
79  // The following assumes that if no comparator is set
80  // then PRESENTATION ordered access applies (TOPIC).
81  return lhs.rde_->source_timestamp_ < rhs.rde_->source_timestamp_;
82  }
83 
84  return cmp_->compare(lhs.rde_->registered_data_,
85  rhs.rde_->registered_data_);
86  }
87 
    // Stores the given comparator handle in cmp_. explicit prevents an
    // implicit conversion from ComparatorBase::Ptr.
    // NOTE(review): source line 88 is not shown in this listing.
89  explicit SortedSetCmp(ComparatorBase::Ptr cmp) : cmp_(cmp) {}
90 
91  private:
    // NOTE(review): the declaration of cmp_ (source line 92) is omitted from
    // this listing; the constructor above initializes it from a
    // ComparatorBase::Ptr.
93  };
94 
    // NOTE(review): source line 95 is not shown in this listing.
    // Multiset of RakeData entries ordered by SortedSetCmp.
96  typedef OPENDDS_MULTISET_CMP(RakeData, SortedSetCmp) SortedSet;
97 
98  // Contains data for QueryCondition/Ordered access
99  SortedSet sorted_;
100 
101  // Contains data for all other use cases
102  OPENDDS_VECTOR(RakeData) unsorted_;
103 
104  // data structures used by copy_into()
105  typedef OPENDDS_VECTOR(CORBA::ULong) IndexList;
    // Bookkeeping record used by copy_into().
    // NOTE(review): MRSIC/MRS presumably abbreviate "most recent sample in
    // collection" / "most recent sample", and the _gc_ suffix "generation
    // count", following DDS SampleInfo terminology -- confirm.
    // NOTE(review): source lines 107 and 109 are not shown in this listing;
    // the constructor below initializes a most_recent_generation_ member
    // that must be declared on one of them.
106  struct InstanceData {
108  size_t MRSIC_index_;
110  CORBA::Long MRSIC_disposed_gc_, MRSIC_nowriters_gc_,
111  MRS_disposed_gc_, MRS_nowriters_gc_;
    // Default state: most_recent_generation_ is false, MRSIC_index_ is 0 and
    // all four counters are 0.
112  InstanceData() : most_recent_generation_(false), MRSIC_index_(0),
113  MRSIC_disposed_gc_(0), MRSIC_nowriters_gc_(0), MRS_disposed_gc_(0),
114  MRS_nowriters_gc_(0) {}
115  };
116 };
117 
118 } // namespace DCPS
119 } // namespace OpenDDS
120 
122 
123 #if defined (ACE_TEMPLATES_REQUIRE_SOURCE)
124 #include "RakeResults_T.cpp"
125 #endif /* ACE_TEMPLATES_REQUIRE_SOURCE */
126 
127 #endif /* OPENDDS_DCPS_RAKERESULTS_T_H */
ACE_CDR::Long Long
DDSTraits< MessageType >::MessageSequenceAdapterType MessageSequenceAdapterType
Definition: RakeResults_T.h:35
DDS::Time_t source_timestamp_
Source time stamp for this data sample.
sequence< SampleInfo > SampleInfoSeq
bool operator()(const RakeData &lhs, const RakeData &rhs) const
Definition: RakeResults_T.h:77
DDSTraits< MessageType >::MessageSequenceType SampleSeq
Definition: RakeResults_T.h:34
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
RakeResults(DataReaderImpl *reader, SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::PresentationQosPolicy presentation, DDS::QueryCondition_ptr cond, Operation_t oper)
ACE_CDR::ULong ULong
DDS::SampleInfoSeq & info_seq_
Definition: RakeResults_T.h:68
DataReaderImpl * reader_
Definition: RakeResults_T.h:66
SortedSetCmp(ComparatorBase::Ptr cmp)
Definition: RakeResults_T.h:89
Implements the DDS::DataReader interface.
bool compare(void *lhs, void *rhs) const
Definition: Comparator_T.h:37
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
bool insert_sample(ReceivedDataElement *sample, ReceivedDataElementList *rdel, SubscriptionInstance_rch instance, size_t index_in_instance)
typedef OPENDDS_MULTISET_CMP(RakeData, SortedSetCmp) SortedSet
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
RakeResults & operator=(const RakeResults &)
ReceivedDataElement * rde_
Definition: RakeData.h:25
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
bool copy_into(FwdIter begin, FwdIter end, MessageSequenceAdapterType &received_data_p)
OPENDDS_VECTOR(RakeData) unsorted_
DDS::QueryCondition_ptr cond_
Definition: RakeResults_T.h:71