Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #ifndef OPENDDS_DCPS_MULTITOPICDATAREADER_T_H
9 : #define OPENDDS_DCPS_MULTITOPICDATAREADER_T_H
10 :
11 : #ifndef OPENDDS_NO_MULTI_TOPIC
12 :
13 : #include "MultiTopicDataReaderBase.h"
14 :
15 : #if !defined (ACE_LACKS_PRAGMA_ONCE)
16 : #pragma once
17 : #endif /* ACE_LACKS_PRAGMA_ONCE */
18 :
19 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
20 :
21 : namespace OpenDDS {
22 : namespace DCPS {
23 :
24 : template<typename Sample, typename TypedDataReader>
25 : class MultiTopicDataReader_T
26 : : public virtual LocalObject<typename TypedDataReader::Interface>
27 : , public virtual MultiTopicDataReaderBase {
28 : public:
29 : typedef TAO::DCPS::ZeroCopyDataSeq<Sample> SampleSeq;
30 :
31 0 : MultiTopicDataReader_T() {}
32 :
33 : void init_typed(DataReaderEx* dr);
34 : const MetaStruct& getResultingMeta();
35 : void incoming_sample(void* sample, const DDS::SampleInfo& info,
36 : const char* topic, const MetaStruct& meta);
37 :
38 : DDS::ReturnCode_t read(SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
39 : CORBA::Long max_samples, DDS::SampleStateMask sample_states,
40 : DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
41 :
42 : DDS::ReturnCode_t take(SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
43 : CORBA::Long max_samples, DDS::SampleStateMask sample_states,
44 : DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
45 :
46 : DDS::ReturnCode_t read_w_condition(SampleSeq& data_values,
47 : DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
48 : DDS::ReadCondition_ptr a_condition);
49 :
50 : DDS::ReturnCode_t take_w_condition(SampleSeq& data_values,
51 : DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
52 : DDS::ReadCondition_ptr a_condition);
53 :
54 : DDS::ReturnCode_t read_next_sample(Sample& received_data,
55 : DDS::SampleInfo& sample_info);
56 :
57 : DDS::ReturnCode_t take_next_sample(Sample& received_data,
58 : DDS::SampleInfo& sample_info);
59 :
60 : DDS::ReturnCode_t read_instance(SampleSeq& received_data,
61 : DDS::SampleInfoSeq & info_seq, CORBA::Long max_samples,
62 : DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
63 : DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
64 :
65 : DDS::ReturnCode_t take_instance(SampleSeq& received_data,
66 : DDS::SampleInfoSeq & info_seq, CORBA::Long max_samples,
67 : DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
68 : DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
69 :
70 : DDS::ReturnCode_t read_instance_w_condition(SampleSeq& data_values,
71 : DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
72 : DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition);
73 :
74 : DDS::ReturnCode_t take_instance_w_condition(SampleSeq& data_values,
75 : DDS::SampleInfoSeq & sample_infos, CORBA::Long max_samples,
76 : DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition);
77 :
78 : DDS::ReturnCode_t read_next_instance(SampleSeq& received_data,
79 : DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
80 : DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
81 : DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
82 :
83 : DDS::ReturnCode_t take_next_instance(SampleSeq& received_data,
84 : DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
85 : DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
86 : DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
87 :
88 : DDS::ReturnCode_t read_next_instance_w_condition(SampleSeq& data_values,
89 : DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
90 : DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition);
91 :
92 : DDS::ReturnCode_t take_next_instance_w_condition(SampleSeq& data_values,
93 : DDS::SampleInfoSeq & sample_infos, CORBA::Long max_samples,
94 : DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition);
95 :
96 : DDS::ReturnCode_t return_loan(SampleSeq& received_data,
97 : DDS::SampleInfoSeq& info_seq);
98 :
99 : DDS::ReturnCode_t get_key_value(Sample& key_holder,
100 : DDS::InstanceHandle_t handle);
101 :
102 : DDS::InstanceHandle_t lookup_instance(const Sample& instance_data);
103 :
104 : private:
105 :
106 : struct SampleWithInfo {
107 0 : SampleWithInfo(const OPENDDS_STRING& topic, const DDS::SampleInfo& sampinfo)
108 0 : : sample_()
109 0 : , view_(sampinfo.view_state)
110 : {
111 0 : info_[topic] = sampinfo.instance_handle;
112 0 : }
113 :
114 0 : void combine(const SampleWithInfo& other)
115 : {
116 0 : info_.insert(other.info_.begin(), other.info_.end());
117 0 : if (other.view_ == DDS::NEW_VIEW_STATE) view_ = DDS::NEW_VIEW_STATE;
118 0 : }
119 :
120 : Sample sample_;
121 : DDS::ViewStateKind view_;
122 : OPENDDS_MAP(OPENDDS_STRING/*topicName*/, DDS::InstanceHandle_t) info_;
123 : };
124 :
125 : typedef std::vector<SampleWithInfo> SampleVec;
126 : typedef std::set<OPENDDS_STRING> TopicSet;
127 :
128 : // Given a QueryPlan that describes how to treat 'incoming' data from a
129 : // certain topic (with MetaStruct 'meta'), assign its relevant fields to
130 : // the corresponding fields of 'resulting'.
131 : void assign_fields(Sample& resulting, void* incoming, const QueryPlan& qp,
132 : const MetaStruct& meta);
133 :
134 : // Process all joins (recursively) in the QueryPlan 'qp'.
135 : DDS::ReturnCode_t process_joins(OPENDDS_MAP(TopicSet, SampleVec)& partial_results,
136 : SampleVec starting, const TopicSet& seen,
137 : const QueryPlan& qp);
138 :
139 : // Starting with a 'prototype' sample, fill a 'resulting' vector with all
140 : // data from 'other_dr' (with MetaStruct 'other_meta') such that all key
141 : // fields named in 'key_names' match the values in 'key_data'. The struct
142 : // pointed-to by 'key_data' is of the type used by the 'other_dr'.
143 : bool join(SampleVec& resulting, const SampleWithInfo& prototype,
144 : const std::vector<OPENDDS_STRING>& key_names,
145 : const void* key_data, DDS::DataReader_ptr other_dr,
146 : const MetaStruct& other_meta);
147 :
148 : // When no common keys are found, natural join devolves to a cross-join where
149 : // each instance in the joined-to-topic (qp) is combined with the results so
150 : // far (partialResults).
151 : DDS::ReturnCode_t cross_join(OPENDDS_MAP(TopicSet, SampleVec)& partial_results,
152 : const TopicSet& seen, const QueryPlan& qp);
153 :
154 : // Combine two vectors of data, 'resulting' and 'other', with the results of
155 : // the combination going into 'resulting'. Use the keys in 'key_names' to
156 : // determine which elements to combine, and the topic names in 'other_topics'
157 : // to determine which fields from 'other' should be assigned to 'resulting'.
158 : void combine(SampleVec& resulting, const SampleVec& other,
159 : const std::vector<OPENDDS_STRING>& key_names,
160 : const TopicSet& other_topics);
161 :
162 : // Helper for combine(), similar to assign_fields but instead of coming from
163 : // a differently-typed struct in a void*, the data comes from an existing
164 : // Sample, 'source'. Each field projected from any of the topics in
165 : // 'other_topics' is copied from 'source' to 'target'.
166 : void assign_resulting_fields(Sample& target, const Sample& source,
167 : const TopicSet& other_topics);
168 :
169 : struct GenericData {
170 0 : explicit GenericData(const MetaStruct& meta, bool doAlloc = true)
171 0 : : meta_(meta), ptr_(doAlloc ? meta.allocate() : 0) {}
172 0 : ~GenericData() { meta_.deallocate(ptr_); }
173 :
174 : const MetaStruct& meta_;
175 : void* ptr_;
176 : };
177 :
178 : struct Contains { // predicate for std::find_if()
179 0 : explicit Contains(const OPENDDS_STRING& s) : look_for_topic_(s) {}
180 0 : bool operator()(const std::pair<const std::set<OPENDDS_STRING>, SampleVec>& e) const
181 : {
182 0 : return e.first.count(look_for_topic_);
183 : }
184 :
185 : const OPENDDS_STRING& look_for_topic_;
186 : };
187 :
188 : typename TypedDataReader::Interface::_var_type typed_reader_;
189 : };
190 :
191 : }
192 : }
193 :
194 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
195 :
196 : #ifdef ACE_TEMPLATES_REQUIRE_SOURCE
197 : #include "MultiTopicDataReader_T.cpp"
198 : #endif
199 :
200 : #endif
201 : #endif
|