MultiTopicDataReader_T.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADER_T_CPP
00009 #define OPENDDS_DCPS_MULTITOPICDATAREADER_T_CPP
00010 
00011 #ifndef OPENDDS_NO_MULTI_TOPIC
00012 
00013 #include <stdexcept>
00014 
00015 namespace OpenDDS {
00016 namespace DCPS {
00017 
00018 template<typename Sample, typename TypedDataReader>
00019 void
00020 MultiTopicDataReader_T<Sample, TypedDataReader>::init_typed(DataReaderEx* dr)
00021 {
00022   typed_reader_ = TypedDataReader::Interface::_narrow(dr);
00023 }
00024 
00025 template<typename Sample, typename TypedDataReader>
00026 const MetaStruct&
00027 MultiTopicDataReader_T<Sample, TypedDataReader>::getResultingMeta()
00028 {
00029   return getMetaStruct<Sample>();
00030 }
00031 
00032 template<typename Sample, typename TypedDataReader>
00033 void
00034 MultiTopicDataReader_T<Sample, TypedDataReader>::assign_fields(void* incoming,
00035   Sample& resulting, const MultiTopicDataReaderBase::QueryPlan& qp,
00036   const MetaStruct& meta)
00037 {
00038   using namespace std;
00039   const vector<SubjectFieldSpec>& proj = qp.projection_;
00040   const MetaStruct& resulting_meta = getResultingMeta();
00041   typedef vector<SubjectFieldSpec>::const_iterator iter_t;
00042   for (iter_t iter = proj.begin(); iter != proj.end(); ++iter) {
00043     const SubjectFieldSpec& sfs = *iter;
00044     resulting_meta.assign(&resulting, sfs.resulting_name_.c_str(),
00045                           incoming, sfs.incoming_name_.c_str(), meta);
00046   }
00047 
00048   const vector<OPENDDS_STRING>& proj_out = qp.keys_projected_out_;
00049   for (vector<OPENDDS_STRING>::const_iterator iter = proj_out.begin();
00050        iter != proj_out.end(); ++iter) {
00051     resulting_meta.assign(&resulting, iter->c_str(),
00052                           incoming, iter->c_str(), meta);
00053   }
00054 }
00055 
00056 template<typename Sample, typename TypedDataReader>
00057 void
00058 MultiTopicDataReader_T<Sample, TypedDataReader>::assign_resulting_fields(
00059   Sample& target, const Sample& source, const TopicSet& other_topics)
00060 {
00061   using namespace std;
00062   const MetaStruct& meta = getResultingMeta();
00063   for (TopicSet::const_iterator iterTopic = other_topics.begin();
00064        iterTopic != other_topics.end(); ++iterTopic) {
00065     const QueryPlan& qp = query_plans_[*iterTopic];
00066     const vector<SubjectFieldSpec>& proj = qp.projection_;
00067     typedef vector<SubjectFieldSpec>::const_iterator iter_t;
00068     for (iter_t iter = proj.begin(); iter != proj.end(); ++iter) {
00069       const SubjectFieldSpec& sfs = *iter;
00070       meta.assign(&target, sfs.resulting_name_.c_str(),
00071                   &source, sfs.resulting_name_.c_str(), meta);
00072     }
00073   }
00074 }
00075 
00076 template<typename Sample, typename TypedDataReader>
00077 void
00078 MultiTopicDataReader_T<Sample, TypedDataReader>::join(
00079   SampleVec& resulting, const SampleWithInfo& prototype,
00080   const std::vector<OPENDDS_STRING>& key_names, const void* key_data,
00081   DDS::DataReader_ptr other_dr, const MetaStruct& other_meta)
00082 {
00083   using namespace DDS;
00084   DataReaderImpl* other_dri = dynamic_cast<DataReaderImpl*>(other_dr);
00085   TopicDescription_var other_td = other_dri->get_topicdescription();
00086   CORBA::String_var other_topic = other_td->get_name();
00087   const QueryPlan& other_qp = query_plans_[other_topic.in()];
00088   const size_t n_keys = key_names.size();
00089 
00090   if (n_keys > 0 && other_meta.numDcpsKeys() == n_keys) { // complete key
00091     InstanceHandle_t ih = other_dri->lookup_instance_generic(key_data);
00092     if (ih != HANDLE_NIL) {
00093       GenericData other_data(other_meta, false);
00094       SampleInfo info;
00095       ReturnCode_t ret = other_dri->read_instance_generic(other_data.ptr_,
00096         info, ih, READ_SAMPLE_STATE, ANY_VIEW_STATE, ALIVE_INSTANCE_STATE);
00097       if (ret != RETCODE_OK && ret != RETCODE_NO_DATA) {
00098         OPENDDS_STRING rc_ss;
00099         rc_ss.reserve(sizeof(ReturnCode_t));
00100         rc_ss += ret;
00101         throw std::runtime_error("In join(), incoming DataReader for " +
00102           OPENDDS_STRING(other_topic) + " read_instance_generic, error #" +
00103           rc_ss);
00104       } else if (ret == DDS::RETCODE_OK) {
00105         resulting.push_back(prototype);
00106         resulting.back().combine(SampleWithInfo(other_topic.in(), info));
00107         assign_fields(other_data.ptr_, resulting.back().sample_,
00108                       other_qp, other_meta);
00109       }
00110     }
00111   } else { // incomplete key or cross-join (0 key fields)
00112     SampleVec new_resulting;
00113     ReturnCode_t ret = RETCODE_OK;
00114     for (InstanceHandle_t ih = HANDLE_NIL; ret != RETCODE_NO_DATA;) {
00115       GenericData other_data(other_meta, false);
00116       SampleInfo info;
00117       ret = other_dri->read_next_instance_generic(other_data.ptr_, info, ih,
00118         READ_SAMPLE_STATE, ANY_VIEW_STATE, ALIVE_INSTANCE_STATE);
00119       if (ret != RETCODE_OK && ret != RETCODE_NO_DATA) {
00120         OPENDDS_STRING rc_ss;
00121         rc_ss.reserve(sizeof(ReturnCode_t));
00122         rc_ss += ret;
00123         throw std::runtime_error("In join(), incoming DataReader for " +
00124           OPENDDS_STRING(other_topic) + " read_next_instance_generic, error #" +
00125           rc_ss);
00126       } else if (ret == RETCODE_NO_DATA) {
00127         break;
00128       }
00129       ih = info.instance_handle;
00130 
00131       bool match = true;
00132       for (size_t i = 0; match && i < key_names.size(); ++i) {
00133         if (!other_meta.compare(key_data, other_data.ptr_,
00134                                 key_names[i].c_str())) {
00135           match = false;
00136         }
00137       }
00138 
00139       if (match) {
00140         resulting.push_back(prototype);
00141         resulting.back().combine(SampleWithInfo(other_topic.in(), info));
00142         assign_fields(other_data.ptr_, resulting.back().sample_,
00143                       other_qp, other_meta);
00144       }
00145     }
00146   }
00147 }
00148 
00149 template<typename Sample, typename TypedDataReader>
00150 void
00151 MultiTopicDataReader_T<Sample, TypedDataReader>::combine(
00152   SampleVec& resulting, const SampleVec& other,
00153   const std::vector<OPENDDS_STRING>& key_names, const TopicSet& other_topics)
00154 {
00155   const MetaStruct& meta = getResultingMeta();
00156   SampleVec newData;
00157   for (typename SampleVec::iterator iterRes = resulting.begin();
00158        iterRes != resulting.end(); /*incremented in loop*/) {
00159     bool foundOneMatch = false;
00160     for (typename SampleVec::const_iterator iterOther = other.begin();
00161          iterOther != other.end(); ++iterOther) {
00162       bool match = true;
00163       for (size_t i = 0; match && i < key_names.size(); ++i) {
00164         if (!meta.compare(&*iterRes, &*iterOther, key_names[i].c_str())) {
00165           match = false;
00166         }
00167       }
00168       if (!match) {
00169         continue;
00170       }
00171       if (foundOneMatch) {
00172         newData.push_back(*iterRes);
00173         newData.back().combine(*iterOther);
00174         assign_resulting_fields(newData.back().sample_,
00175                                 iterOther->sample_, other_topics);
00176       } else {
00177         foundOneMatch = true;
00178         iterRes->combine(*iterOther);
00179         assign_resulting_fields(iterRes->sample_,
00180                                 iterOther->sample_, other_topics);
00181       }
00182     }
00183     if (foundOneMatch) {
00184       ++iterRes;
00185     } else {
00186       // no match found in 'other' so data must not appear in result set
00187       iterRes = resulting.erase(iterRes);
00188     }
00189   }
00190   resulting.insert(resulting.end(), newData.begin(), newData.end());
00191 }
00192 
00193 template<typename Sample, typename TypedDataReader>
00194 void
00195 MultiTopicDataReader_T<Sample, TypedDataReader>::process_joins(
00196   std::map<TopicSet, SampleVec>& partialResults, SampleVec starting,
00197   const TopicSet& seen, const QueryPlan& qp)
00198 {
00199   using namespace std;
00200   const MetaStruct& resulting_meta = getResultingMeta();
00201   const OPENDDS_STRING this_topic = topicNameFor(qp.data_reader_);
00202   typedef multimap<OPENDDS_STRING, OPENDDS_STRING>::const_iterator iter_t;
00203   for (iter_t iter = qp.adjacent_joins_.begin();
00204        iter != qp.adjacent_joins_.end();) { // for each topic we're joining
00205     const OPENDDS_STRING& other_topic = iter->first;
00206     iter_t range_end = qp.adjacent_joins_.upper_bound(other_topic);
00207     const QueryPlan& other_qp = query_plans_[other_topic];
00208     DDS::DataReader_ptr other_dr = other_qp.data_reader_;
00209     const MetaStruct& other_meta = metaStructFor(other_dr);
00210 
00211     vector<OPENDDS_STRING> keys;
00212     for (; iter != range_end; ++iter) { // for each key in common w/ this topic
00213       keys.push_back(iter->second);
00214     }
00215 
00216     typename std::map<TopicSet, SampleVec>::iterator found =
00217       find_if(partialResults.begin(), partialResults.end(),
00218         Contains(other_topic));
00219 
00220     if (found == partialResults.end()) { // haven't seen this topic yet
00221 
00222       partialResults.erase(seen);
00223       TopicSet withJoin(seen);
00224       withJoin.insert(other_topic);
00225       SampleVec& join_result = partialResults[withJoin];
00226       for (size_t i = 0; i < starting.size(); ++i) {
00227         GenericData other_key(other_meta);
00228         for (size_t j = 0; j < keys.size(); ++j) {
00229           other_meta.assign(other_key.ptr_, keys[j].c_str(),
00230                             &starting[i], keys[j].c_str(), resulting_meta);
00231         }
00232         join(join_result, starting[i], keys,
00233              other_key.ptr_, other_dr, other_meta);
00234       }
00235 
00236       if (!join_result.empty() && !seen.count(other_topic)) {
00237         // recurse
00238         process_joins(partialResults, join_result, withJoin, other_qp);
00239       }
00240 
00241     } else if (!found->first.count(this_topic) /*avoid looping back*/) {
00242       // we have partialResults for this topic, use them instead of recursing
00243 
00244       combine(starting, found->second, keys, found->first);
00245       TopicSet newKey(found->first);
00246       for (set<OPENDDS_STRING>::const_iterator i3 = found->first.begin();
00247            i3 != found->first.end(); ++i3) {
00248         newKey.insert(*i3);
00249       }
00250       partialResults.erase(found);
00251       partialResults[newKey] = starting;
00252 
00253     }
00254   }
00255 }
00256 
00257 template<typename Sample, typename TypedDataReader>
00258 void
00259 MultiTopicDataReader_T<Sample, TypedDataReader>::cross_join(
00260   std::map<TopicSet, SampleVec>& partialResults, const TopicSet& seen,
00261   const QueryPlan& qp)
00262 {
00263   using namespace std;
00264   const MetaStruct& other_meta = metaStructFor(qp.data_reader_);
00265   vector<OPENDDS_STRING> no_keys;
00266   for (typename std::map<TopicSet, SampleVec>::iterator iterPR =
00267        partialResults.begin(); iterPR != partialResults.end(); ++iterPR) {
00268     SampleVec resulting;
00269     for (typename SampleVec::iterator i = iterPR->second.begin();
00270          i != iterPR->second.end(); ++i) {
00271       join(resulting, *i, no_keys, NULL, qp.data_reader_, other_meta);
00272     }
00273     resulting.swap(iterPR->second);
00274   }
00275   TopicSet withJoin(seen);
00276   withJoin.insert(topicNameFor(qp.data_reader_));
00277   partialResults[withJoin].swap(partialResults[seen]);
00278   partialResults.erase(seen);
00279   process_joins(partialResults, partialResults[withJoin], withJoin, qp);
00280 }
00281 
00282 template<typename Sample, typename TypedDataReader>
00283 void
00284 MultiTopicDataReader_T<Sample, TypedDataReader>::incoming_sample(void* sample,
00285   const DDS::SampleInfo& info, const char* topic, const MetaStruct& meta)
00286 {
00287   using namespace std;
00288   using namespace DDS;
00289   const QueryPlan& qp = query_plans_[topic];
00290 
00291   // Track results of joins along multiple paths through the MultiTopic keys.
00292   std::map<TopicSet, SampleVec> partialResults;
00293   TopicSet seen;
00294   seen.insert(topic);
00295   partialResults[seen].push_back(SampleWithInfo(topic, info));
00296   assign_fields(sample, partialResults[seen].back().sample_, qp, meta);
00297 
00298   process_joins(partialResults, partialResults[seen], seen, qp);
00299 
00300   // Any topic we haven't seen needs to be cross-joined
00301   for (std::map<OPENDDS_STRING, QueryPlan>::iterator iter = query_plans_.begin();
00302        iter != query_plans_.end(); ++iter) {
00303     typename std::map<TopicSet, SampleVec>::iterator found =
00304       find_if(partialResults.begin(), partialResults.end(),
00305               Contains(iter->first));
00306     if (found == partialResults.end()) {
00307       cross_join(partialResults, seen, iter->second);
00308     }
00309   }
00310 
00311   TypedDataReader* tdr = dynamic_cast<TypedDataReader*>(typed_reader_.in());
00312   for (typename std::map<TopicSet, SampleVec>::iterator iterPR =
00313        partialResults.begin(); iterPR != partialResults.end(); ++iterPR) {
00314     for (typename SampleVec::iterator i = iterPR->second.begin();
00315          i != iterPR->second.end(); ++i) {
00316       InstanceHandle_t ih = tdr->store_synthetic_data(i->sample_, i->view_);
00317       if (ih != HANDLE_NIL) {
00318         typedef std::map<OPENDDS_STRING, InstanceHandle_t>::iterator mapiter_t;
00319         for (mapiter_t iterMap = i->info_.begin(); iterMap != i->info_.end();
00320              ++iterMap) {
00321           query_plans_[iterMap->first].instances_.insert(
00322             make_pair(iterMap->second, ih));
00323         }
00324       }
00325     }
00326   }
00327 }
00328 
00329 // The following methods implement the FooDataReader API by delegating
00330 // to the typed_reader_.
00331 
00332 template<typename Sample, typename TypedDataReader>
00333 DDS::ReturnCode_t
00334 MultiTopicDataReader_T<Sample, TypedDataReader>::read(SampleSeq& received_data,
00335   DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00336   DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00337   DDS::InstanceStateMask instance_states)
00338 {
00339   return typed_reader_->read(received_data, info_seq, max_samples,
00340     sample_states, view_states, instance_states);
00341 }
00342 
00343 template<typename Sample, typename TypedDataReader>
00344 DDS::ReturnCode_t
00345 MultiTopicDataReader_T<Sample, TypedDataReader>::take(SampleSeq& received_data,
00346   DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00347   DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00348   DDS::InstanceStateMask instance_states)
00349 {
00350   return typed_reader_->take(received_data, info_seq, max_samples,
00351     sample_states, view_states, instance_states);
00352 }
00353 
00354 template<typename Sample, typename TypedDataReader>
00355 DDS::ReturnCode_t
00356 MultiTopicDataReader_T<Sample, TypedDataReader>::read_w_condition(
00357   SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00358   CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
00359 {
00360   return typed_reader_->read_w_condition(data_values, sample_infos,
00361     max_samples, a_condition);
00362 }
00363 
00364 template<typename Sample, typename TypedDataReader>
00365 DDS::ReturnCode_t
00366 MultiTopicDataReader_T<Sample, TypedDataReader>::take_w_condition(
00367   SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00368   CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
00369 {
00370   return typed_reader_->take_w_condition(data_values, sample_infos,
00371     max_samples, a_condition);
00372 }
00373 
00374 template<typename Sample, typename TypedDataReader>
00375 DDS::ReturnCode_t
00376 MultiTopicDataReader_T<Sample, TypedDataReader>::read_next_sample(
00377   Sample& received_data, DDS::SampleInfo& sample_info)
00378 {
00379   return typed_reader_->read_next_sample(received_data, sample_info);
00380 }
00381 
00382 template<typename Sample, typename TypedDataReader>
00383 DDS::ReturnCode_t
00384 MultiTopicDataReader_T<Sample, TypedDataReader>::take_next_sample(
00385   Sample& received_data, DDS::SampleInfo& sample_info)
00386 {
00387   return typed_reader_->take_next_sample(received_data, sample_info);
00388 }
00389 
00390 template<typename Sample, typename TypedDataReader>
00391 DDS::ReturnCode_t
00392 MultiTopicDataReader_T<Sample, TypedDataReader>::read_instance(
00393   SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00394   CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
00395   DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00396   DDS::InstanceStateMask instance_states)
00397 {
00398   return typed_reader_->read_instance(received_data, info_seq, max_samples,
00399     a_handle, sample_states, view_states, instance_states);
00400 }
00401 
00402 template<typename Sample, typename TypedDataReader>
00403 DDS::ReturnCode_t
00404 MultiTopicDataReader_T<Sample, TypedDataReader>::take_instance(
00405   SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00406   CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
00407   DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00408   DDS::InstanceStateMask instance_states)
00409 {
00410   return typed_reader_->take_instance(received_data, info_seq, max_samples,
00411     a_handle, sample_states, view_states, instance_states);
00412 }
00413 
00414 template<typename Sample, typename TypedDataReader>
00415 DDS::ReturnCode_t
00416 MultiTopicDataReader_T<Sample, TypedDataReader>::read_next_instance(
00417   SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00418   CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
00419   DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00420   DDS::InstanceStateMask instance_states)
00421 {
00422   return typed_reader_->read_next_instance(received_data, info_seq, max_samples,
00423     a_handle, sample_states, view_states, instance_states);
00424 }
00425 
00426 template<typename Sample, typename TypedDataReader>
00427 DDS::ReturnCode_t
00428 MultiTopicDataReader_T<Sample, TypedDataReader>::take_next_instance(
00429   SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00430   CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
00431   DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00432   DDS::InstanceStateMask instance_states)
00433 {
00434   return typed_reader_->take_next_instance(received_data, info_seq, max_samples,
00435     a_handle, sample_states, view_states, instance_states);
00436 }
00437 
00438 template<typename Sample, typename TypedDataReader>
00439 DDS::ReturnCode_t
00440 MultiTopicDataReader_T<Sample, TypedDataReader>::read_next_instance_w_condition(
00441   SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00442   CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle,
00443   DDS::ReadCondition_ptr a_condition)
00444 {
00445   return typed_reader_->read_next_instance_w_condition(data_values,
00446     sample_infos, max_samples, previous_handle, a_condition);
00447 }
00448 
00449 template<typename Sample, typename TypedDataReader>
00450 DDS::ReturnCode_t
00451 MultiTopicDataReader_T<Sample, TypedDataReader>::take_next_instance_w_condition(
00452   SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00453   CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle,
00454   DDS::ReadCondition_ptr a_condition)
00455 {
00456   return typed_reader_->take_next_instance_w_condition(data_values,
00457     sample_infos, max_samples, previous_handle, a_condition);
00458 }
00459 
00460 template<typename Sample, typename TypedDataReader>
00461 DDS::ReturnCode_t
00462 MultiTopicDataReader_T<Sample, TypedDataReader>::return_loan(
00463   SampleSeq& received_data, DDS::SampleInfoSeq& info_seq)
00464 {
00465   return typed_reader_->return_loan(received_data, info_seq);
00466 }
00467 
00468 template<typename Sample, typename TypedDataReader>
00469 DDS::ReturnCode_t
00470 MultiTopicDataReader_T<Sample, TypedDataReader>::get_key_value(
00471   Sample& key_holder, DDS::InstanceHandle_t handle)
00472 {
00473   return typed_reader_->get_key_value(key_holder, handle);
00474 }
00475 
00476 template<typename Sample, typename TypedDataReader>
00477 DDS::InstanceHandle_t
00478 MultiTopicDataReader_T<Sample, TypedDataReader>::lookup_instance(
00479   const Sample& instance_data)
00480 {
00481   return typed_reader_->lookup_instance(instance_data);
00482 }
00483 
00484 }
00485 }
00486 
00487 #endif
00488 #endif

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7