OpenDDS  Snapshot(2023/04/28-20:55)
MultiTopicDataReader_T.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADER_T_H
9 #define OPENDDS_DCPS_MULTITOPICDATAREADER_T_H
10 
11 #ifndef OPENDDS_NO_MULTI_TOPIC
12 
14 
15 #if !defined (ACE_LACKS_PRAGMA_ONCE)
16 #pragma once
17 #endif /* ACE_LACKS_PRAGMA_ONCE */
18 
20 
21 namespace OpenDDS {
22 namespace DCPS {
23 
24 template<typename Sample, typename TypedDataReader>
26  : public virtual LocalObject<typename TypedDataReader::Interface>
27  , public virtual MultiTopicDataReaderBase {
28 public:
30 
32 
33  void init_typed(DataReaderEx* dr);
35  void incoming_sample(void* sample, const DDS::SampleInfo& info,
36  const char* topic, const MetaStruct& meta);
37 
38  DDS::ReturnCode_t read(SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
41 
42  DDS::ReturnCode_t take(SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
43  CORBA::Long max_samples, DDS::SampleStateMask sample_states,
44  DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
45 
46  DDS::ReturnCode_t read_w_condition(SampleSeq& data_values,
47  DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
48  DDS::ReadCondition_ptr a_condition);
49 
50  DDS::ReturnCode_t take_w_condition(SampleSeq& data_values,
51  DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
52  DDS::ReadCondition_ptr a_condition);
53 
55  DDS::SampleInfo& sample_info);
56 
58  DDS::SampleInfo& sample_info);
59 
60  DDS::ReturnCode_t read_instance(SampleSeq& received_data,
61  DDS::SampleInfoSeq & info_seq, CORBA::Long max_samples,
62  DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
63  DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
64 
65  DDS::ReturnCode_t take_instance(SampleSeq& received_data,
66  DDS::SampleInfoSeq & info_seq, CORBA::Long max_samples,
67  DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
68  DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
69 
70  DDS::ReturnCode_t read_instance_w_condition(SampleSeq& data_values,
71  DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
72  DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition);
73 
74  DDS::ReturnCode_t take_instance_w_condition(SampleSeq& data_values,
75  DDS::SampleInfoSeq & sample_infos, CORBA::Long max_samples,
76  DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition);
77 
78  DDS::ReturnCode_t read_next_instance(SampleSeq& received_data,
79  DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
80  DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
81  DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
82 
83  DDS::ReturnCode_t take_next_instance(SampleSeq& received_data,
84  DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
85  DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
86  DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
87 
89  DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
90  DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition);
91 
93  DDS::SampleInfoSeq & sample_infos, CORBA::Long max_samples,
94  DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition);
95 
96  DDS::ReturnCode_t return_loan(SampleSeq& received_data,
97  DDS::SampleInfoSeq& info_seq);
98 
100  DDS::InstanceHandle_t handle);
101 
102  DDS::InstanceHandle_t lookup_instance(const Sample& instance_data);
103 
104 private:
105 
// One (partial) result row: a sample value plus bookkeeping about which
// source topics contributed to it. The constructor initializes three members:
// sample_, view_ and info_. Only the declaration of info_ is visible in this
// listing; the declarations of sample_ and view_ are not shown here.
106  struct SampleWithInfo {
// Starts a row for data coming from 'topic': sample_ is value-initialized,
// view_ is copied from sampinfo.view_state, and the instance handle from
// sampinfo is recorded in info_ under the topic name.
107  SampleWithInfo(const OPENDDS_STRING& topic, const DDS::SampleInfo& sampinfo)
108  : sample_()
109  , view_(sampinfo.view_state)
110  {
111  info_[topic] = sampinfo.instance_handle;
112  }
113 
// Merges the topic -> instance-handle entries of 'other' into this row.
// Assuming OPENDDS_MAP is a std::map-like container, the range insert keeps
// this row's existing entry when a topic name is present in both.
// NOTE(review): the listing skips original line 117 inside this body, so any
// additional effect of combine() is not visible here -- confirm against the
// real header.
114  void combine(const SampleWithInfo& other)
115  {
116  info_.insert(other.info_.begin(), other.info_.end());
118  }
119 
// Instance handle of the contributing sample, keyed by source topic name.
122  OPENDDS_MAP(OPENDDS_STRING/*topicName*/, DDS::InstanceHandle_t) info_;
123  };
124 
// SampleVec: a list of (partial) result rows; used as the mapped type of the
// 'partial_results' maps and as the 'resulting'/'starting' arguments below.
125  typedef std::vector<SampleWithInfo> SampleVec;
// TopicSet: a set of topic-name strings; used as the key type of the
// 'partial_results' maps and for the 'seen'/'other_topics' arguments below.
126  typedef std::set<OPENDDS_STRING> TopicSet;
127 
128  // Given a QueryPlan that describes how to treat 'incoming' data from a
129  // certain topic (with MetaStruct 'meta'), assign its relevant fields to
130  // the corresponding fields of 'resulting'.
131  void assign_fields(Sample& resulting, void* incoming, const QueryPlan& qp,
132  const MetaStruct& meta);
133 
134  // Process all joins (recursively) in the QueryPlan 'qp'.
135  DDS::ReturnCode_t process_joins(OPENDDS_MAP(TopicSet, SampleVec)& partial_results,
136  SampleVec starting, const TopicSet& seen,
137  const QueryPlan& qp);
138 
139  // Starting with a 'prototype' sample, fill a 'resulting' vector with all
140  // data from 'other_dr' (with MetaStruct 'other_meta') such that all key
141  // fields named in 'key_names' match the values in 'key_data'. The struct
142  // pointed-to by 'key_data' is of the type used by the 'other_dr'.
143  bool join(SampleVec& resulting, const SampleWithInfo& prototype,
144  const std::vector<OPENDDS_STRING>& key_names,
145  const void* key_data, DDS::DataReader_ptr other_dr,
146  const MetaStruct& other_meta);
147 
148  // When no common keys are found, natural join devolves to a cross-join where
149  // each instance in the joined-to-topic (qp) is combined with the results so
150  // far (partial_results).
151  DDS::ReturnCode_t cross_join(OPENDDS_MAP(TopicSet, SampleVec)& partial_results,
152  const TopicSet& seen, const QueryPlan& qp);
153 
154  // Combine two vectors of data, 'resulting' and 'other', with the results of
155  // the combination going into 'resulting'. Use the keys in 'key_names' to
156  // determine which elements to combine, and the topic names in 'other_topics'
157  // to determine which fields from 'other' should be assigned to 'resulting'.
158  void combine(SampleVec& resulting, const SampleVec& other,
159  const std::vector<OPENDDS_STRING>& key_names,
160  const TopicSet& other_topics);
161 
162  // Helper for combine(), similar to assign_fields but instead of coming from
163  // a differently-typed struct in a void*, the data comes from an existing
164  // Sample, 'source'. Each field projected from any of the topics in
165  // 'other_topics' is copied from 'source' to 'target'.
166  void assign_resulting_fields(Sample& target, const Sample& source,
167  const TopicSet& other_topics);
168 
// Scope-bound holder for an untyped data pointer managed through a
// MetaStruct: the pointer is handed to meta_.deallocate() when the holder is
// destroyed.
// NOTE(review): no copy constructor is visible in this listing (original line
// 174 is missing from it). If the struct is copyable, two holders would pass
// the same ptr_ to deallocate() -- confirm it is never copied.
169  struct GenericData {
// When doAlloc is true (the default), ptr_ is obtained from meta.allocate();
// otherwise ptr_ starts out as 0.
170  explicit GenericData(const MetaStruct& meta, bool doAlloc = true)
171  : meta_(meta), ptr_(doAlloc ? meta.allocate() : 0) {}
// Always calls deallocate(), including when ptr_ is still 0 -- presumably
// MetaStruct::deallocate() accepts a null pointer; TODO confirm.
172  ~GenericData() { meta_.deallocate(ptr_); }
173 
// Pointer passed to meta_.deallocate() on destruction.
175  void* ptr_;
176  };
177 
// Function object that tests whether a (set-of-topic-names, SampleVec) pair
// mentions a given topic name in its set.
178  struct Contains { // predicate for std::find_if()
// Remembers the topic name 's' to search for.
179  explicit Contains(const OPENDDS_STRING& s) : look_for_topic_(s) {}
// 'e' has the shape of a map entry whose key is a set of topic names (the
// same type as TopicSet) and whose value is a SampleVec. Returns true when
// that key set contains look_for_topic_.
180  bool operator()(const std::pair<const std::set<OPENDDS_STRING>, SampleVec>& e) const
181  {
// std::set::count() yields 0 or 1, which converts to false/true.
182  return e.first.count(look_for_topic_);
183  }
184 
186  };
187 
188  typename TypedDataReader::Interface::_var_type typed_reader_;
189 };
190 
191 }
192 }
193 
195 
196 #ifdef ACE_TEMPLATES_REQUIRE_SOURCE
198 #endif
199 
200 #endif
201 #endif
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
OPENDDS_MAP(OPENDDS_STRING, DDS::InstanceHandle_t) info_
ACE_CDR::Long Long
void incoming_sample(void *sample, const DDS::SampleInfo &info, const char *topic, const MetaStruct &meta)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t read(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
void assign_fields(Sample &resulting, void *incoming, const QueryPlan &qp, const MetaStruct &meta)
std::vector< SampleWithInfo > SampleVec
sequence< SampleInfo > SampleInfoSeq
DDS::ReturnCode_t take_instance(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t take_next_instance(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t read_next_instance_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition)
DDS::ReturnCode_t take(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
TypedDataReader::Interface::_var_type typed_reader_
DDS::ReturnCode_t read_instance(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
unsigned long InstanceStateMask
DDS::ReturnCode_t read_next_instance(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
bool join(SampleVec &resulting, const SampleWithInfo &prototype, const std::vector< OPENDDS_STRING > &key_names, const void *key_data, DDS::DataReader_ptr other_dr, const MetaStruct &other_meta)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t get_key_value(Sample &key_holder, DDS::InstanceHandle_t handle)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define OPENDDS_STRING
void assign_resulting_fields(Sample &target, const Sample &source, const TopicSet &other_topics)
DDS::ReturnCode_t read_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
InstanceHandle_t instance_handle
DDS::ReturnCode_t process_joins(OPENDDS_MAP(TopicSet, SampleVec)&partial_results, SampleVec starting, const TopicSet &seen, const QueryPlan &qp)
const ViewStateKind NEW_VIEW_STATE
DDS::ReturnCode_t read_next_sample(Sample &received_data, DDS::SampleInfo &sample_info)
DDS::ReturnCode_t return_loan(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq)
SampleWithInfo(const OPENDDS_STRING &topic, const DDS::SampleInfo &sampinfo)
unsigned long SampleStateMask
DDS::ReturnCode_t take_next_instance_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t take_instance_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition)
GenericData(const MetaStruct &meta, bool doAlloc=true)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
DDS::InstanceHandle_t lookup_instance(const Sample &instance_data)
TAO::DCPS::ZeroCopyDataSeq< Sample > SampleSeq
unsigned long ViewStateKind
DDS::ReturnCode_t read_instance_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition)
DDS::ReturnCode_t take_next_sample(Sample &received_data, DDS::SampleInfo &sample_info)
bool operator()(const std::pair< const std::set< OPENDDS_STRING >, SampleVec > &e) const
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
unsigned long ViewStateMask
DDS::ReturnCode_t take_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
DDS::ReturnCode_t cross_join(OPENDDS_MAP(TopicSet, SampleVec)&partial_results, const TopicSet &seen, const QueryPlan &qp)