Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #ifndef OPENDDS_DCPS_MULTITOPICDATAREADER_T_CPP
9 : #define OPENDDS_DCPS_MULTITOPICDATAREADER_T_CPP
10 :
11 : #ifndef OPENDDS_NO_MULTI_TOPIC
12 :
13 : #include <stdexcept>
14 : #include <sstream>
15 :
16 :
17 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
18 :
19 : namespace OpenDDS {
20 : namespace DCPS {
21 :
22 : template<typename Sample, typename TypedDataReader>
23 : void
24 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::init_typed(DataReaderEx* dr)
25 : {
26 0 : typed_reader_ = TypedDataReader::Interface::_narrow(dr);
27 0 : }
28 :
29 : template<typename Sample, typename TypedDataReader>
30 : const MetaStruct&
31 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::getResultingMeta()
32 : {
33 0 : return getMetaStruct<Sample>();
34 : }
35 :
36 : template<typename Sample, typename TypedDataReader>
37 : void
38 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::assign_fields(Sample& resulting,
39 : void* incoming, const MultiTopicDataReaderBase::QueryPlan& qp, const MetaStruct& meta)
40 : {
41 : using namespace std;
42 0 : const vector<SubjectFieldSpec>& proj = qp.projection_;
43 0 : const MetaStruct& resulting_meta = getResultingMeta();
44 :
45 : typedef vector<SubjectFieldSpec>::const_iterator iter_t;
46 0 : for (iter_t iter = proj.begin(); iter != proj.end(); ++iter) {
47 0 : resulting_meta.assign(&resulting, iter->resulting_name_.c_str(),
48 0 : incoming, iter->incoming_name_.c_str(), meta);
49 : }
50 :
51 0 : const vector<OPENDDS_STRING>& proj_out = qp.keys_projected_out_;
52 0 : for (vector<OPENDDS_STRING>::const_iterator iter = proj_out.begin();
53 0 : iter != proj_out.end(); ++iter) {
54 0 : resulting_meta.assign(&resulting, iter->c_str(),
55 : incoming, iter->c_str(), meta);
56 : }
57 0 : }
58 :
59 : template<typename Sample, typename TypedDataReader>
60 : void
61 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::assign_resulting_fields(
62 : Sample& target, const Sample& source, const TopicSet& other_topics)
63 : {
64 : using namespace std;
65 0 : const MetaStruct& resulting_meta = getResultingMeta();
66 :
67 0 : for (TopicSet::const_iterator iterTopic = other_topics.begin();
68 0 : iterTopic != other_topics.end(); ++iterTopic) {
69 0 : const vector<SubjectFieldSpec>& proj = query_plans_[*iterTopic].projection_;
70 : typedef vector<SubjectFieldSpec>::const_iterator iter_t;
71 0 : for (iter_t iter = proj.begin(); iter != proj.end(); ++iter) {
72 0 : resulting_meta.assign(&target, iter->resulting_name_.c_str(),
73 0 : &source, iter->resulting_name_.c_str(), resulting_meta);
74 : }
75 : }
76 0 : }
77 :
78 : template<typename Sample, typename TypedDataReader>
79 : bool
80 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::join(
81 : SampleVec& resulting, const SampleWithInfo& prototype,
82 : const std::vector<OPENDDS_STRING>& key_names, const void* key_data,
83 : DDS::DataReader_ptr other_dr, const MetaStruct& other_meta)
84 : {
85 : using namespace DDS;
86 0 : DataReaderImpl* other_dri = dynamic_cast<DataReaderImpl*>(other_dr);
87 0 : if (!other_dri) {
88 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReader_T::join: ")
89 : ACE_TEXT("Failed to get DataReaderImpl.\n")));
90 0 : return false;
91 : }
92 :
93 0 : TopicDescription_var other_td = other_dri->get_topicdescription();
94 0 : CORBA::String_var other_topic = other_td->get_name();
95 0 : const QueryPlan& other_qp = query_plans_[other_topic.in()];
96 0 : const size_t n_keys = key_names.size();
97 :
98 0 : if (n_keys > 0 && other_meta.numDcpsKeys() == n_keys) { // complete key
99 0 : InstanceHandle_t ih = other_dri->lookup_instance_generic(key_data);
100 0 : if (ih != HANDLE_NIL) {
101 0 : GenericData other_data(other_meta, false);
102 : SampleInfo info;
103 0 : const ReturnCode_t ret = other_dri->read_instance_generic(other_data.ptr_,
104 : info, ih, READ_SAMPLE_STATE, ANY_VIEW_STATE, ALIVE_INSTANCE_STATE);
105 0 : if (ret != DDS::RETCODE_OK && ret != DDS::RETCODE_NO_DATA) {
106 0 : if (log_level >= LogLevel::Notice) {
107 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: MultiTopicDataReader_T::join: read_instance_generic"
108 : " for topic %C returns %C\n", other_topic.in(), retcode_to_string(ret)));
109 : }
110 0 : return false;
111 : }
112 0 : if (ret == DDS::RETCODE_NO_DATA || !info.valid_data) {
113 0 : return false;
114 : }
115 :
116 0 : resulting.push_back(prototype);
117 0 : resulting.back().combine(SampleWithInfo(other_topic.in(), info));
118 0 : assign_fields(resulting.back().sample_, other_data.ptr_, other_qp, other_meta);
119 0 : }
120 : } else { // incomplete key or cross-join (0 key fields)
121 0 : ReturnCode_t ret = RETCODE_OK;
122 0 : for (InstanceHandle_t ih = HANDLE_NIL; ret != RETCODE_NO_DATA;) {
123 0 : GenericData other_data(other_meta, false);
124 : SampleInfo info;
125 0 : const ReturnCode_t ret = other_dri->read_next_instance_generic(other_data.ptr_,
126 : info, ih, READ_SAMPLE_STATE, ANY_VIEW_STATE, ALIVE_INSTANCE_STATE);
127 0 : if (ret != RETCODE_OK && ret != RETCODE_NO_DATA) {
128 0 : if (log_level >= LogLevel::Notice) {
129 0 : ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: MultiTopicDataReader_T::join:"
130 : " read_next_instance_generic for topic %C returns %C\n",
131 : other_topic.in(), retcode_to_string(ret)));
132 : }
133 0 : return false;
134 : }
135 0 : if (ret == RETCODE_NO_DATA || !info.valid_data) {
136 : break;
137 : }
138 0 : ih = info.instance_handle;
139 :
140 0 : bool match = true;
141 0 : for (size_t i = 0; match && i < key_names.size(); ++i) {
142 0 : if (!other_meta.compare(key_data, other_data.ptr_, key_names[i].c_str())) {
143 0 : match = false;
144 : }
145 : }
146 :
147 0 : if (match) {
148 0 : resulting.push_back(prototype);
149 0 : resulting.back().combine(SampleWithInfo(other_topic.in(), info));
150 0 : assign_fields(resulting.back().sample_, other_data.ptr_, other_qp, other_meta);
151 : }
152 : }
153 : }
154 0 : return true;
155 0 : }
156 :
157 : template<typename Sample, typename TypedDataReader>
158 : void
159 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::combine(
160 : SampleVec& resulting, const SampleVec& other,
161 : const std::vector<OPENDDS_STRING>& key_names, const TopicSet& other_topics)
162 : {
163 0 : const MetaStruct& meta = getResultingMeta();
164 0 : SampleVec new_data;
165 0 : for (typename SampleVec::iterator it_res = resulting.begin();
166 0 : it_res != resulting.end(); /*incremented in loop*/) {
167 0 : bool found_one_match = false;
168 0 : for (typename SampleVec::const_iterator it_other = other.begin();
169 0 : it_other != other.end(); ++it_other) {
170 0 : bool match = true;
171 0 : for (size_t i = 0; match && i < key_names.size(); ++i) {
172 0 : if (!meta.compare(&*it_res, &*it_other, key_names[i].c_str())) {
173 0 : match = false;
174 : }
175 : }
176 0 : if (!match) {
177 0 : continue;
178 : }
179 0 : if (found_one_match) {
180 0 : new_data.push_back(*it_res);
181 0 : new_data.back().combine(*it_other);
182 0 : assign_resulting_fields(new_data.back().sample_, it_other->sample_, other_topics);
183 : } else {
184 0 : found_one_match = true;
185 0 : it_res->combine(*it_other);
186 0 : assign_resulting_fields(it_res->sample_, it_other->sample_, other_topics);
187 : }
188 : }
189 0 : if (found_one_match) {
190 0 : ++it_res;
191 : } else {
192 : // no match found in 'other' so data must not appear in result set
193 0 : it_res = resulting.erase(it_res);
194 : }
195 : }
196 0 : resulting.insert(resulting.end(), new_data.begin(), new_data.end());
197 0 : }
198 :
199 : /**
200 : * Two constituent topics are joinable directly if they have some common join keys,
201 : * or indirectly via a third topic which has common join keys with each of them.
202 : * The constituent topics form one or more groups of connected topics. In each groups,
203 : * topics are connected. But any two groups are not connected (otherwise, they would
204 : * have been a single group). And so groups are cross-joined with each other.
205 : * Within a group, topics are joined using process_joins(). Starting from an incoming
206 : * sample of a given topic, it computes partial resulting samples from the samples of
207 : * the constituent topics in the same group that are already received, in a DFS manner.
208 : * @partial_results contains entries for sets of topics, each corresponding to a path
209 : * of topics which have been visited, starting from the incoming topic.
210 : * For example, if the graph of visited topics looks like this
211 : * A (incoming topic)
212 : * / \
213 : * B C
214 : * / / | \
215 : * D E F G
216 : * / . . .
217 : * H . . .
218 : * . . . .
219 : * then @a partial_results contains entries for topic sets: {A,B,D,H}, {A,C,E},
220 : * {A,C,F}, {A,C,G}.
221 : * When traverse a path, if the next adjacent topic appears on a different path, i.e.,
222 : * there is an entry corresponding to that path in @a partial_results that has it in
223 : * its topic set, then the two entries are combined into a new entry which contains
224 : * topics from both of them. For example, if we are traversing path (A,C,G) and
225 : * the next adjacent topic to G is B, which appears in path (A,B,D,H), then entries
226 : * keyed by (A,C,G) and (A,B,D,H) are combined into a single entry with key
227 : * (A,B,C,D,G,H). Entries (A,C,G) and (A,B,D,H) are removed from @a partial_results.
228 : */
229 : template<typename Sample, typename TypedDataReader>
230 : DDS::ReturnCode_t
231 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::process_joins(
232 : std::map<TopicSet, SampleVec>& partial_results, SampleVec starting,
233 : const TopicSet& seen, const QueryPlan& qp)
234 : {
235 : using namespace std;
236 0 : const MetaStruct& resulting_meta = getResultingMeta();
237 0 : OPENDDS_STRING this_topic;
238 : {
239 0 : ACE_GUARD_RETURN(ACE_RW_Thread_Mutex, read_guard, qp_lock_, DDS::RETCODE_OUT_OF_RESOURCES);
240 0 : this_topic = topicNameFor(qp.data_reader_);
241 0 : }
242 : typedef multimap<OPENDDS_STRING, OPENDDS_STRING>::const_iterator iter_t;
243 0 : for (iter_t iter = qp.adjacent_joins_.begin(); iter != qp.adjacent_joins_.end();) {
244 : // for each topic we're joining
245 0 : const OPENDDS_STRING& other_topic = iter->first;
246 0 : iter_t range_end = qp.adjacent_joins_.upper_bound(other_topic);
247 0 : const QueryPlan& other_qp = query_plans_[other_topic];
248 0 : DDS::DataReader_ptr other_dr = other_qp.data_reader_;
249 0 : const MetaStruct& other_meta = metaStructFor(other_dr);
250 :
251 0 : vector<OPENDDS_STRING> keys;
252 0 : for (; iter != range_end; ++iter) { // for each key in common w/ this topic
253 0 : keys.push_back(iter->second);
254 : }
255 :
256 : typename std::map<TopicSet, SampleVec>::iterator found =
257 0 : find_if(partial_results.begin(), partial_results.end(), Contains(other_topic));
258 :
259 0 : if (found == partial_results.end()) { // haven't seen this topic yet
260 0 : partial_results.erase(seen);
261 0 : TopicSet with_join(seen);
262 0 : with_join.insert(other_topic);
263 0 : SampleVec& join_result = partial_results[with_join];
264 0 : for (size_t i = 0; i < starting.size(); ++i) {
265 0 : GenericData other_keys(other_meta);
266 0 : for (size_t j = 0; j < keys.size(); ++j) {
267 0 : other_meta.assign(other_keys.ptr_, keys[j].c_str(),
268 0 : &starting[i], keys[j].c_str(), resulting_meta);
269 : }
270 0 : if (!join(join_result, starting[i], keys, other_keys.ptr_, other_dr, other_meta)) {
271 0 : return DDS::RETCODE_ERROR;
272 : }
273 : }
274 :
275 0 : if (!join_result.empty() && !seen.count(other_topic)) {
276 : // Recursively join with topics that are adjacent to other_topic.
277 0 : const DDS::ReturnCode_t ret = process_joins(partial_results, join_result,
278 : with_join, other_qp);
279 0 : if (ret != DDS::RETCODE_OK) {
280 0 : return ret;
281 : }
282 : }
283 0 : } else if (!found->first.count(this_topic) /*avoid looping back*/) {
284 : // We have partialResults for this topic, use them instead of recursing.
285 : // Combine the partial samples for the TopicSet seen and found->first.
286 : // Store the result into a new entry keyed with all topics in seen and found->first.
287 : // The existing two entries are removed since they are not needed anymore.
288 0 : combine(starting, found->second, keys, found->first);
289 0 : TopicSet new_topics(seen);
290 0 : for (set<OPENDDS_STRING>::const_iterator it = found->first.begin(); it != found->first.end(); ++it) {
291 0 : new_topics.insert(*it);
292 : }
293 :
294 0 : partial_results.erase(found);
295 0 : partial_results.erase(seen);
296 0 : partial_results[new_topics] = starting;
297 0 : }
298 : }
299 0 : return DDS::RETCODE_OK;
300 0 : }
301 :
302 : template<typename Sample, typename TypedDataReader>
303 : DDS::ReturnCode_t
304 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::cross_join(
305 : std::map<TopicSet, SampleVec>& partial_results, const TopicSet& seen,
306 : const QueryPlan& qp)
307 : {
308 : using namespace std;
309 0 : const MetaStruct& other_meta = metaStructFor(qp.data_reader_);
310 0 : vector<OPENDDS_STRING> no_keys;
311 0 : for (typename std::map<TopicSet, SampleVec>::iterator it_pr = partial_results.begin();
312 0 : it_pr != partial_results.end(); ++it_pr) {
313 0 : SampleVec resulting;
314 0 : for (typename SampleVec::iterator i = it_pr->second.begin(); i != it_pr->second.end(); ++i) {
315 0 : if (!join(resulting, *i, no_keys, 0, qp.data_reader_, other_meta)) {
316 0 : return DDS::RETCODE_ERROR;
317 : }
318 : }
319 0 : resulting.swap(it_pr->second);
320 : }
321 :
322 0 : TopicSet with_join(seen);
323 0 : with_join.insert(topicNameFor(qp.data_reader_));
324 0 : partial_results[with_join].swap(partial_results[seen]);
325 0 : partial_results.erase(seen);
326 0 : const DDS::ReturnCode_t ret = process_joins(partial_results, partial_results[with_join],
327 : with_join, qp);
328 0 : if (ret != DDS::RETCODE_OK) {
329 0 : partial_results.erase(with_join);
330 0 : return ret;
331 : }
332 :
333 0 : return DDS::RETCODE_OK;
334 0 : }
335 :
336 : template<typename Sample, typename TypedDataReader>
337 : void
338 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::incoming_sample(void* sample,
339 : const DDS::SampleInfo& info, const char* topic, const MetaStruct& meta)
340 : {
341 : using namespace std;
342 : using namespace DDS;
343 0 : const QueryPlan& qp = query_plans_[topic];
344 :
345 : // Track results of joins along multiple paths through the MultiTopic keys.
346 0 : std::map<TopicSet, SampleVec> partial_results;
347 0 : TopicSet seen;
348 0 : seen.insert(topic);
349 0 : partial_results[seen].push_back(SampleWithInfo(topic, info));
350 0 : assign_fields(partial_results[seen].back().sample_, sample, qp, meta);
351 :
352 0 : DDS::ReturnCode_t ret = process_joins(partial_results, partial_results[seen], seen, qp);
353 0 : if (ret != DDS::RETCODE_OK) {
354 0 : return;
355 : }
356 :
357 : // Any topic we haven't seen needs to be cross-joined
358 0 : for (std::map<OPENDDS_STRING, QueryPlan>::iterator iter = query_plans_.begin();
359 0 : iter != query_plans_.end(); ++iter) {
360 : typename std::map<TopicSet, SampleVec>::iterator found =
361 0 : find_if(partial_results.begin(), partial_results.end(), Contains(iter->first));
362 0 : if (found == partial_results.end()) {
363 0 : ret = cross_join(partial_results, seen, iter->second);
364 0 : if (ret != DDS::RETCODE_OK) {
365 0 : return;
366 : }
367 : }
368 : }
369 :
370 0 : TypedDataReader* tdr = dynamic_cast<TypedDataReader*>(typed_reader_.in());
371 0 : if (!tdr) {
372 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReader_T::incoming_sample:")
373 : ACE_TEXT(" Failed to get TypedDataReader.\n")));
374 0 : return;
375 : }
376 :
377 0 : for (typename std::map<TopicSet, SampleVec>::iterator it_pr = partial_results.begin();
378 0 : it_pr != partial_results.end(); ++it_pr) {
379 0 : for (typename SampleVec::iterator i = it_pr->second.begin(); i != it_pr->second.end(); ++i) {
380 0 : InstanceHandle_t ih = tdr->store_synthetic_data(i->sample_, i->view_);
381 0 : if (ih != HANDLE_NIL) {
382 : typedef std::map<OPENDDS_STRING, InstanceHandle_t>::iterator mapiter_t;
383 0 : for (mapiter_t it_map = i->info_.begin(); it_map != i->info_.end(); ++it_map) {
384 0 : query_plans_[it_map->first].instances_.insert(make_pair(it_map->second, ih));
385 : }
386 : }
387 : }
388 : }
389 0 : }
390 :
391 : // The following methods implement the FooDataReader API by delegating
392 : // to the typed_reader_.
393 :
394 : template<typename Sample, typename TypedDataReader>
395 : DDS::ReturnCode_t
396 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::read(SampleSeq& received_data,
397 : DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
398 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
399 : DDS::InstanceStateMask instance_states)
400 : {
401 0 : return typed_reader_->read(received_data, info_seq, max_samples,
402 0 : sample_states, view_states, instance_states);
403 : }
404 :
405 : template<typename Sample, typename TypedDataReader>
406 : DDS::ReturnCode_t
407 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::take(SampleSeq& received_data,
408 : DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
409 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
410 : DDS::InstanceStateMask instance_states)
411 : {
412 0 : return typed_reader_->take(received_data, info_seq, max_samples,
413 0 : sample_states, view_states, instance_states);
414 : }
415 :
416 : template<typename Sample, typename TypedDataReader>
417 : DDS::ReturnCode_t
418 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::read_w_condition(
419 : SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
420 : CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
421 : {
422 0 : return typed_reader_->read_w_condition(data_values, sample_infos,
423 0 : max_samples, a_condition);
424 : }
425 :
426 : template<typename Sample, typename TypedDataReader>
427 : DDS::ReturnCode_t
428 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::take_w_condition(
429 : SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
430 : CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
431 : {
432 0 : return typed_reader_->take_w_condition(data_values, sample_infos,
433 0 : max_samples, a_condition);
434 : }
435 :
436 : template<typename Sample, typename TypedDataReader>
437 : DDS::ReturnCode_t
438 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::read_next_sample(
439 : Sample& received_data, DDS::SampleInfo& sample_info)
440 : {
441 0 : return typed_reader_->read_next_sample(received_data, sample_info);
442 : }
443 :
444 : template<typename Sample, typename TypedDataReader>
445 : DDS::ReturnCode_t
446 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::take_next_sample(
447 : Sample& received_data, DDS::SampleInfo& sample_info)
448 : {
449 0 : return typed_reader_->take_next_sample(received_data, sample_info);
450 : }
451 :
452 : template<typename Sample, typename TypedDataReader>
453 : DDS::ReturnCode_t
454 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::read_instance(
455 : SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
456 : CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
457 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
458 : DDS::InstanceStateMask instance_states)
459 : {
460 0 : return typed_reader_->read_instance(received_data, info_seq, max_samples,
461 0 : a_handle, sample_states, view_states, instance_states);
462 : }
463 :
464 : template<typename Sample, typename TypedDataReader>
465 : DDS::ReturnCode_t
466 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::take_instance(
467 : SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
468 : CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
469 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
470 : DDS::InstanceStateMask instance_states)
471 : {
472 0 : return typed_reader_->take_instance(received_data, info_seq, max_samples,
473 0 : a_handle, sample_states, view_states, instance_states);
474 : }
475 :
476 : template<typename Sample, typename TypedDataReader>
477 : DDS::ReturnCode_t
478 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::read_instance_w_condition(
479 : SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
480 : CORBA::Long max_samples, DDS::InstanceHandle_t handle,
481 : DDS::ReadCondition_ptr a_condition)
482 : {
483 0 : return typed_reader_->read_instance_w_condition(data_values,
484 0 : sample_infos, max_samples, handle, a_condition);
485 : }
486 :
487 : template<typename Sample, typename TypedDataReader>
488 : DDS::ReturnCode_t
489 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::take_instance_w_condition(
490 : SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
491 : CORBA::Long max_samples, DDS::InstanceHandle_t handle,
492 : DDS::ReadCondition_ptr a_condition)
493 : {
494 0 : return typed_reader_->take_instance_w_condition(data_values,
495 0 : sample_infos, max_samples, handle, a_condition);
496 : }
497 :
498 : template<typename Sample, typename TypedDataReader>
499 : DDS::ReturnCode_t
500 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::read_next_instance(
501 : SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
502 : CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
503 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
504 : DDS::InstanceStateMask instance_states)
505 : {
506 0 : return typed_reader_->read_next_instance(received_data, info_seq, max_samples,
507 0 : a_handle, sample_states, view_states, instance_states);
508 : }
509 :
510 : template<typename Sample, typename TypedDataReader>
511 : DDS::ReturnCode_t
512 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::take_next_instance(
513 : SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
514 : CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
515 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
516 : DDS::InstanceStateMask instance_states)
517 : {
518 0 : return typed_reader_->take_next_instance(received_data, info_seq, max_samples,
519 0 : a_handle, sample_states, view_states, instance_states);
520 : }
521 :
522 : template<typename Sample, typename TypedDataReader>
523 : DDS::ReturnCode_t
524 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::read_next_instance_w_condition(
525 : SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
526 : CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle,
527 : DDS::ReadCondition_ptr a_condition)
528 : {
529 0 : return typed_reader_->read_next_instance_w_condition(data_values,
530 0 : sample_infos, max_samples, previous_handle, a_condition);
531 : }
532 :
533 : template<typename Sample, typename TypedDataReader>
534 : DDS::ReturnCode_t
535 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::take_next_instance_w_condition(
536 : SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
537 : CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle,
538 : DDS::ReadCondition_ptr a_condition)
539 : {
540 0 : return typed_reader_->take_next_instance_w_condition(data_values,
541 0 : sample_infos, max_samples, previous_handle, a_condition);
542 : }
543 :
544 : template<typename Sample, typename TypedDataReader>
545 : DDS::ReturnCode_t
546 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::return_loan(
547 : SampleSeq& received_data, DDS::SampleInfoSeq& info_seq)
548 : {
549 0 : return typed_reader_->return_loan(received_data, info_seq);
550 : }
551 :
552 : template<typename Sample, typename TypedDataReader>
553 : DDS::ReturnCode_t
554 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::get_key_value(
555 : Sample& key_holder, DDS::InstanceHandle_t handle)
556 : {
557 0 : return typed_reader_->get_key_value(key_holder, handle);
558 : }
559 :
560 : template<typename Sample, typename TypedDataReader>
561 : DDS::InstanceHandle_t
562 0 : MultiTopicDataReader_T<Sample, TypedDataReader>::lookup_instance(
563 : const Sample& instance_data)
564 : {
565 0 : return typed_reader_->lookup_instance(instance_data);
566 : }
567 :
568 : }
569 : }
570 :
571 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
572 :
573 : #endif
574 : #endif
|