8 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADER_T_CPP 9 #define OPENDDS_DCPS_MULTITOPICDATAREADER_T_CPP 11 #ifndef OPENDDS_NO_MULTI_TOPIC 22 template<
typename Sample,
typename TypedDataReader>
26 typed_reader_ = TypedDataReader::Interface::_narrow(dr);
29 template<
typename Sample,
typename TypedDataReader>
33 return getMetaStruct<Sample>();
36 template<
typename Sample,
typename TypedDataReader>
42 const vector<SubjectFieldSpec>& proj = qp.
projection_;
43 const MetaStruct& resulting_meta = getResultingMeta();
45 typedef vector<SubjectFieldSpec>::const_iterator iter_t;
46 for (iter_t iter = proj.begin(); iter != proj.end(); ++iter) {
47 resulting_meta.
assign(&resulting, iter->resulting_name_.c_str(),
48 incoming, iter->incoming_name_.c_str(), meta);
52 for (vector<OPENDDS_STRING>::const_iterator iter = proj_out.begin();
53 iter != proj_out.end(); ++iter) {
54 resulting_meta.
assign(&resulting, iter->c_str(),
55 incoming, iter->c_str(), meta);
59 template<
typename Sample,
typename TypedDataReader>
65 const MetaStruct& resulting_meta = getResultingMeta();
67 for (TopicSet::const_iterator iterTopic = other_topics.begin();
68 iterTopic != other_topics.end(); ++iterTopic) {
69 const vector<SubjectFieldSpec>& proj = query_plans_[*iterTopic].projection_;
70 typedef vector<SubjectFieldSpec>::const_iterator iter_t;
71 for (iter_t iter = proj.begin(); iter != proj.end(); ++iter) {
72 resulting_meta.
assign(&target, iter->resulting_name_.c_str(),
73 &source, iter->resulting_name_.c_str(), resulting_meta);
78 template<
typename Sample,
typename TypedDataReader>
82 const std::vector<OPENDDS_STRING>& key_names,
const void* key_data,
83 DDS::DataReader_ptr other_dr,
const MetaStruct& other_meta)
89 ACE_TEXT(
"Failed to get DataReaderImpl.\n")));
95 const QueryPlan& other_qp = query_plans_[other_topic.
in()];
96 const size_t n_keys = key_names.size();
98 if (n_keys > 0 && other_meta.
numDcpsKeys() == n_keys) {
107 ACE_ERROR((
LM_NOTICE,
"(%P|%t) NOTICE: MultiTopicDataReader_T::join: read_instance_generic" 116 resulting.push_back(prototype);
118 assign_fields(resulting.back().sample_, other_data.
ptr_, other_qp, other_meta);
130 " read_next_instance_generic for topic %C returns %C\n",
141 for (
size_t i = 0; match && i < key_names.size(); ++i) {
142 if (!other_meta.
compare(key_data, other_data.
ptr_, key_names[i].c_str())) {
148 resulting.push_back(prototype);
150 assign_fields(resulting.back().sample_, other_data.
ptr_, other_qp, other_meta);
157 template<
typename Sample,
typename TypedDataReader>
161 const std::vector<OPENDDS_STRING>& key_names,
const TopicSet& other_topics)
165 for (
typename SampleVec::iterator it_res = resulting.begin();
166 it_res != resulting.end(); ) {
167 bool found_one_match =
false;
168 for (
typename SampleVec::const_iterator it_other = other.begin();
169 it_other != other.end(); ++it_other) {
171 for (
size_t i = 0; match && i < key_names.size(); ++i) {
172 if (!meta.
compare(&*it_res, &*it_other, key_names[i].c_str())) {
179 if (found_one_match) {
180 new_data.push_back(*it_res);
181 new_data.back().combine(*it_other);
182 assign_resulting_fields(new_data.back().sample_, it_other->sample_, other_topics);
184 found_one_match =
true;
185 it_res->combine(*it_other);
186 assign_resulting_fields(it_res->sample_, it_other->sample_, other_topics);
189 if (found_one_match) {
193 it_res = resulting.erase(it_res);
196 resulting.insert(resulting.end(), new_data.begin(), new_data.end());
229 template<
typename Sample,
typename TypedDataReader>
232 std::map<TopicSet, SampleVec>& partial_results,
SampleVec starting,
236 const MetaStruct& resulting_meta = getResultingMeta();
242 typedef multimap<OPENDDS_STRING, OPENDDS_STRING>::const_iterator iter_t;
247 const QueryPlan& other_qp = query_plans_[other_topic];
249 const MetaStruct& other_meta = metaStructFor(other_dr);
251 vector<OPENDDS_STRING> keys;
252 for (; iter != range_end; ++iter) {
253 keys.push_back(iter->second);
256 typename std::map<TopicSet, SampleVec>::iterator found =
257 find_if(partial_results.begin(), partial_results.end(),
Contains(other_topic));
259 if (found == partial_results.end()) {
260 partial_results.erase(seen);
262 with_join.insert(other_topic);
263 SampleVec& join_result = partial_results[with_join];
264 for (
size_t i = 0; i < starting.size(); ++i) {
266 for (
size_t j = 0; j < keys.size(); ++j) {
267 other_meta.
assign(other_keys.
ptr_, keys[j].c_str(),
268 &starting[i], keys[j].c_str(), resulting_meta);
270 if (!join(join_result, starting[i], keys, other_keys.
ptr_, other_dr, other_meta)) {
275 if (!join_result.empty() && !seen.count(other_topic)) {
278 with_join, other_qp);
283 }
else if (!found->first.count(this_topic) ) {
288 combine(starting, found->second, keys, found->first);
290 for (set<OPENDDS_STRING>::const_iterator it = found->first.begin(); it != found->first.end(); ++it) {
291 new_topics.insert(*it);
294 partial_results.erase(found);
295 partial_results.erase(seen);
296 partial_results[new_topics] = starting;
302 template<
typename Sample,
typename TypedDataReader>
305 std::map<TopicSet, SampleVec>& partial_results,
const TopicSet& seen,
310 vector<OPENDDS_STRING> no_keys;
311 for (
typename std::map<TopicSet, SampleVec>::iterator it_pr = partial_results.begin();
312 it_pr != partial_results.end(); ++it_pr) {
314 for (
typename SampleVec::iterator i = it_pr->second.begin(); i != it_pr->second.end(); ++i) {
315 if (!join(resulting, *i, no_keys, 0, qp.
data_reader_, other_meta)) {
319 resulting.swap(it_pr->second);
324 partial_results[with_join].swap(partial_results[seen]);
325 partial_results.erase(seen);
326 const DDS::ReturnCode_t ret = process_joins(partial_results, partial_results[with_join],
329 partial_results.erase(with_join);
336 template<
typename Sample,
typename TypedDataReader>
343 const QueryPlan& qp = query_plans_[topic];
346 std::map<TopicSet, SampleVec> partial_results;
350 assign_fields(partial_results[seen].back().sample_, sample, qp, meta);
352 DDS::ReturnCode_t ret = process_joins(partial_results, partial_results[seen], seen, qp);
358 for (std::map<OPENDDS_STRING, QueryPlan>::iterator iter = query_plans_.begin();
359 iter != query_plans_.end(); ++iter) {
360 typename std::map<TopicSet, SampleVec>::iterator found =
361 find_if(partial_results.begin(), partial_results.end(),
Contains(iter->first));
362 if (found == partial_results.end()) {
363 ret = cross_join(partial_results, seen, iter->second);
370 TypedDataReader* tdr =
dynamic_cast<TypedDataReader*
>(typed_reader_.in());
373 ACE_TEXT(
" Failed to get TypedDataReader.\n")));
377 for (
typename std::map<TopicSet, SampleVec>::iterator it_pr = partial_results.begin();
378 it_pr != partial_results.end(); ++it_pr) {
379 for (
typename SampleVec::iterator i = it_pr->second.begin(); i != it_pr->second.end(); ++i) {
382 typedef std::map<OPENDDS_STRING, InstanceHandle_t>::iterator mapiter_t;
383 for (mapiter_t it_map = i->info_.begin(); it_map != i->info_.end(); ++it_map) {
384 query_plans_[it_map->first].instances_.insert(make_pair(it_map->second, ih));
394 template<
typename Sample,
typename TypedDataReader>
401 return typed_reader_->read(received_data, info_seq, max_samples,
402 sample_states, view_states, instance_states);
405 template<
typename Sample,
typename TypedDataReader>
412 return typed_reader_->take(received_data, info_seq, max_samples,
413 sample_states, view_states, instance_states);
416 template<
typename Sample,
typename TypedDataReader>
422 return typed_reader_->read_w_condition(data_values, sample_infos,
423 max_samples, a_condition);
426 template<
typename Sample,
typename TypedDataReader>
432 return typed_reader_->take_w_condition(data_values, sample_infos,
433 max_samples, a_condition);
436 template<
typename Sample,
typename TypedDataReader>
441 return typed_reader_->read_next_sample(received_data, sample_info);
444 template<
typename Sample,
typename TypedDataReader>
449 return typed_reader_->take_next_sample(received_data, sample_info);
452 template<
typename Sample,
typename TypedDataReader>
460 return typed_reader_->read_instance(received_data, info_seq, max_samples,
461 a_handle, sample_states, view_states, instance_states);
464 template<
typename Sample,
typename TypedDataReader>
472 return typed_reader_->take_instance(received_data, info_seq, max_samples,
473 a_handle, sample_states, view_states, instance_states);
476 template<
typename Sample,
typename TypedDataReader>
481 DDS::ReadCondition_ptr a_condition)
483 return typed_reader_->read_instance_w_condition(data_values,
484 sample_infos, max_samples, handle, a_condition);
487 template<
typename Sample,
typename TypedDataReader>
492 DDS::ReadCondition_ptr a_condition)
494 return typed_reader_->take_instance_w_condition(data_values,
495 sample_infos, max_samples, handle, a_condition);
498 template<
typename Sample,
typename TypedDataReader>
506 return typed_reader_->read_next_instance(received_data, info_seq, max_samples,
507 a_handle, sample_states, view_states, instance_states);
510 template<
typename Sample,
typename TypedDataReader>
518 return typed_reader_->take_next_instance(received_data, info_seq, max_samples,
519 a_handle, sample_states, view_states, instance_states);
522 template<
typename Sample,
typename TypedDataReader>
527 DDS::ReadCondition_ptr a_condition)
529 return typed_reader_->read_next_instance_w_condition(data_values,
530 sample_infos, max_samples, previous_handle, a_condition);
533 template<
typename Sample,
typename TypedDataReader>
538 DDS::ReadCondition_ptr a_condition)
540 return typed_reader_->take_next_instance_w_condition(data_values,
541 sample_infos, max_samples, previous_handle, a_condition);
544 template<
typename Sample,
typename TypedDataReader>
549 return typed_reader_->return_loan(received_data, info_seq);
552 template<
typename Sample,
typename TypedDataReader>
557 return typed_reader_->get_key_value(key_holder, handle);
560 template<
typename Sample,
typename TypedDataReader>
563 const Sample& instance_data)
565 return typed_reader_->lookup_instance(instance_data);
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
const InstanceHandle_t HANDLE_NIL
virtual size_t numDcpsKeys() const =0
virtual bool compare(const void *lhs, const void *rhs, const char *fieldSpec) const =0
void incoming_sample(void *sample, const DDS::SampleInfo &info, const char *topic, const MetaStruct &meta)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
virtual DDS::TopicDescription_ptr get_topicdescription()
DDS::ReturnCode_t read(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
void assign_fields(Sample &resulting, void *incoming, const QueryPlan &qp, const MetaStruct &meta)
std::vector< SampleWithInfo > SampleVec
virtual void assign(void *lhs, const char *lhsFieldSpec, const void *rhs, const char *rhsFieldSpec, const MetaStruct &rhsMeta) const =0
sequence< SampleInfo > SampleInfoSeq
DDS::ReturnCode_t take_instance(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t take_next_instance(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::ReturnCode_t read_next_instance_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition)
DDS::ReturnCode_t take(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
std::set< OPENDDS_STRING > TopicSet
DDS::ReturnCode_t read_instance(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
unsigned long InstanceStateMask
DDS::ReturnCode_t read_next_instance(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
bool join(SampleVec &resulting, const SampleWithInfo &prototype, const std::vector< OPENDDS_STRING > &key_names, const void *key_data, DDS::DataReader_ptr other_dr, const MetaStruct &other_meta)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
DDS::ReturnCode_t get_key_value(Sample &key_holder, DDS::InstanceHandle_t handle)
DDS::DataReader_var data_reader_
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
void assign_resulting_fields(Sample &target, const Sample &source, const TopicSet &other_topics)
DDS::ReturnCode_t read_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
std::vector< OPENDDS_STRING > keys_projected_out_
InstanceHandle_t instance_handle
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
DDS::ReturnCode_t process_joins(OPENDDS_MAP(TopicSet, SampleVec)&partial_results, SampleVec starting, const TopicSet &seen, const QueryPlan &qp)
Implements the DDS::DataReader interface.
void combine(SampleVec &resulting, const SampleVec &other, const std::vector< OPENDDS_STRING > &key_names, const TopicSet &other_topics)
DDS::ReturnCode_t read_next_sample(Sample &received_data, DDS::SampleInfo &sample_info)
const ViewStateMask ANY_VIEW_STATE
DDS::ReturnCode_t return_loan(SampleSeq &received_data, DDS::SampleInfoSeq &info_seq)
const ReturnCode_t RETCODE_OUT_OF_RESOURCES
unsigned long SampleStateMask
DDS::ReturnCode_t take_next_instance_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition)
HANDLE_TYPE_NATIVE InstanceHandle_t
const SampleStateKind READ_SAMPLE_STATE
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
void init_typed(DataReaderEx *dr)
const ReturnCode_t RETCODE_NO_DATA
OpenDDS_Dcps_Export LogLevel log_level
DDS::ReturnCode_t take_instance_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition)
const char * retcode_to_string(DDS::ReturnCode_t value)
const ReturnCode_t RETCODE_ERROR
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
DDS::InstanceHandle_t lookup_instance(const Sample &instance_data)
virtual DDS::ReturnCode_t read_instance_generic(void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t read_instance_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition)
const character_type * in(void) const
DDS::ReturnCode_t take_next_sample(Sample &received_data, DDS::SampleInfo &sample_info)
const MetaStruct & getResultingMeta()
std::multimap< OPENDDS_STRING, OPENDDS_STRING > adjacent_joins_
The Internal API and Implementation of OpenDDS.
unsigned long ViewStateMask
const InstanceStateKind ALIVE_INSTANCE_STATE
virtual DDS::InstanceHandle_t lookup_instance_generic(const void *data)=0
DDS::ReturnCode_t take_w_condition(SampleSeq &data_values, DDS::SampleInfoSeq &sample_infos, CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
virtual DDS::ReturnCode_t read_next_instance_generic(void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t previous_instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
std::vector< SubjectFieldSpec > projection_
DDS::ReturnCode_t cross_join(OPENDDS_MAP(TopicSet, SampleVec)&partial_results, const TopicSet &seen, const QueryPlan &qp)