00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADER_T_CPP
00009 #define OPENDDS_DCPS_MULTITOPICDATAREADER_T_CPP
00010
00011 #ifndef OPENDDS_NO_MULTI_TOPIC
00012
#include <cstdio>
#include <stdexcept>
00014
00015 namespace OpenDDS {
00016 namespace DCPS {
00017
00018 template<typename Sample, typename TypedDataReader>
00019 void
00020 MultiTopicDataReader_T<Sample, TypedDataReader>::init_typed(DataReaderEx* dr)
00021 {
00022 typed_reader_ = TypedDataReader::Interface::_narrow(dr);
00023 }
00024
00025 template<typename Sample, typename TypedDataReader>
00026 const MetaStruct&
00027 MultiTopicDataReader_T<Sample, TypedDataReader>::getResultingMeta()
00028 {
00029 return getMetaStruct<Sample>();
00030 }
00031
00032 template<typename Sample, typename TypedDataReader>
00033 void
00034 MultiTopicDataReader_T<Sample, TypedDataReader>::assign_fields(void* incoming,
00035 Sample& resulting, const MultiTopicDataReaderBase::QueryPlan& qp,
00036 const MetaStruct& meta)
00037 {
00038 using namespace std;
00039 const vector<SubjectFieldSpec>& proj = qp.projection_;
00040 const MetaStruct& resulting_meta = getResultingMeta();
00041 typedef vector<SubjectFieldSpec>::const_iterator iter_t;
00042 for (iter_t iter = proj.begin(); iter != proj.end(); ++iter) {
00043 const SubjectFieldSpec& sfs = *iter;
00044 resulting_meta.assign(&resulting, sfs.resulting_name_.c_str(),
00045 incoming, sfs.incoming_name_.c_str(), meta);
00046 }
00047
00048 const vector<OPENDDS_STRING>& proj_out = qp.keys_projected_out_;
00049 for (vector<OPENDDS_STRING>::const_iterator iter = proj_out.begin();
00050 iter != proj_out.end(); ++iter) {
00051 resulting_meta.assign(&resulting, iter->c_str(),
00052 incoming, iter->c_str(), meta);
00053 }
00054 }
00055
00056 template<typename Sample, typename TypedDataReader>
00057 void
00058 MultiTopicDataReader_T<Sample, TypedDataReader>::assign_resulting_fields(
00059 Sample& target, const Sample& source, const TopicSet& other_topics)
00060 {
00061 using namespace std;
00062 const MetaStruct& meta = getResultingMeta();
00063 for (TopicSet::const_iterator iterTopic = other_topics.begin();
00064 iterTopic != other_topics.end(); ++iterTopic) {
00065 const QueryPlan& qp = query_plans_[*iterTopic];
00066 const vector<SubjectFieldSpec>& proj = qp.projection_;
00067 typedef vector<SubjectFieldSpec>::const_iterator iter_t;
00068 for (iter_t iter = proj.begin(); iter != proj.end(); ++iter) {
00069 const SubjectFieldSpec& sfs = *iter;
00070 meta.assign(&target, sfs.resulting_name_.c_str(),
00071 &source, sfs.resulting_name_.c_str(), meta);
00072 }
00073 }
00074 }
00075
00076 template<typename Sample, typename TypedDataReader>
00077 void
00078 MultiTopicDataReader_T<Sample, TypedDataReader>::join(
00079 SampleVec& resulting, const SampleWithInfo& prototype,
00080 const std::vector<OPENDDS_STRING>& key_names, const void* key_data,
00081 DDS::DataReader_ptr other_dr, const MetaStruct& other_meta)
00082 {
00083 using namespace DDS;
00084 DataReaderImpl* other_dri = dynamic_cast<DataReaderImpl*>(other_dr);
00085 TopicDescription_var other_td = other_dri->get_topicdescription();
00086 CORBA::String_var other_topic = other_td->get_name();
00087 const QueryPlan& other_qp = query_plans_[other_topic.in()];
00088 const size_t n_keys = key_names.size();
00089
00090 if (n_keys > 0 && other_meta.numDcpsKeys() == n_keys) {
00091 InstanceHandle_t ih = other_dri->lookup_instance_generic(key_data);
00092 if (ih != HANDLE_NIL) {
00093 GenericData other_data(other_meta, false);
00094 SampleInfo info;
00095 ReturnCode_t ret = other_dri->read_instance_generic(other_data.ptr_,
00096 info, ih, READ_SAMPLE_STATE, ANY_VIEW_STATE, ALIVE_INSTANCE_STATE);
00097 if (ret != RETCODE_OK && ret != RETCODE_NO_DATA) {
00098 OPENDDS_STRING rc_ss;
00099 rc_ss.reserve(sizeof(ReturnCode_t));
00100 rc_ss += ret;
00101 throw std::runtime_error("In join(), incoming DataReader for " +
00102 OPENDDS_STRING(other_topic) + " read_instance_generic, error #" +
00103 rc_ss);
00104 } else if (ret == DDS::RETCODE_OK) {
00105 resulting.push_back(prototype);
00106 resulting.back().combine(SampleWithInfo(other_topic.in(), info));
00107 assign_fields(other_data.ptr_, resulting.back().sample_,
00108 other_qp, other_meta);
00109 }
00110 }
00111 } else {
00112 SampleVec new_resulting;
00113 ReturnCode_t ret = RETCODE_OK;
00114 for (InstanceHandle_t ih = HANDLE_NIL; ret != RETCODE_NO_DATA;) {
00115 GenericData other_data(other_meta, false);
00116 SampleInfo info;
00117 ret = other_dri->read_next_instance_generic(other_data.ptr_, info, ih,
00118 READ_SAMPLE_STATE, ANY_VIEW_STATE, ALIVE_INSTANCE_STATE);
00119 if (ret != RETCODE_OK && ret != RETCODE_NO_DATA) {
00120 OPENDDS_STRING rc_ss;
00121 rc_ss.reserve(sizeof(ReturnCode_t));
00122 rc_ss += ret;
00123 throw std::runtime_error("In join(), incoming DataReader for " +
00124 OPENDDS_STRING(other_topic) + " read_next_instance_generic, error #" +
00125 rc_ss);
00126 } else if (ret == RETCODE_NO_DATA) {
00127 break;
00128 }
00129 ih = info.instance_handle;
00130
00131 bool match = true;
00132 for (size_t i = 0; match && i < key_names.size(); ++i) {
00133 if (!other_meta.compare(key_data, other_data.ptr_,
00134 key_names[i].c_str())) {
00135 match = false;
00136 }
00137 }
00138
00139 if (match) {
00140 resulting.push_back(prototype);
00141 resulting.back().combine(SampleWithInfo(other_topic.in(), info));
00142 assign_fields(other_data.ptr_, resulting.back().sample_,
00143 other_qp, other_meta);
00144 }
00145 }
00146 }
00147 }
00148
00149 template<typename Sample, typename TypedDataReader>
00150 void
00151 MultiTopicDataReader_T<Sample, TypedDataReader>::combine(
00152 SampleVec& resulting, const SampleVec& other,
00153 const std::vector<OPENDDS_STRING>& key_names, const TopicSet& other_topics)
00154 {
00155 const MetaStruct& meta = getResultingMeta();
00156 SampleVec newData;
00157 for (typename SampleVec::iterator iterRes = resulting.begin();
00158 iterRes != resulting.end(); ) {
00159 bool foundOneMatch = false;
00160 for (typename SampleVec::const_iterator iterOther = other.begin();
00161 iterOther != other.end(); ++iterOther) {
00162 bool match = true;
00163 for (size_t i = 0; match && i < key_names.size(); ++i) {
00164 if (!meta.compare(&*iterRes, &*iterOther, key_names[i].c_str())) {
00165 match = false;
00166 }
00167 }
00168 if (!match) {
00169 continue;
00170 }
00171 if (foundOneMatch) {
00172 newData.push_back(*iterRes);
00173 newData.back().combine(*iterOther);
00174 assign_resulting_fields(newData.back().sample_,
00175 iterOther->sample_, other_topics);
00176 } else {
00177 foundOneMatch = true;
00178 iterRes->combine(*iterOther);
00179 assign_resulting_fields(iterRes->sample_,
00180 iterOther->sample_, other_topics);
00181 }
00182 }
00183 if (foundOneMatch) {
00184 ++iterRes;
00185 } else {
00186
00187 iterRes = resulting.erase(iterRes);
00188 }
00189 }
00190 resulting.insert(resulting.end(), newData.begin(), newData.end());
00191 }
00192
00193 template<typename Sample, typename TypedDataReader>
00194 void
00195 MultiTopicDataReader_T<Sample, TypedDataReader>::process_joins(
00196 std::map<TopicSet, SampleVec>& partialResults, SampleVec starting,
00197 const TopicSet& seen, const QueryPlan& qp)
00198 {
00199 using namespace std;
00200 const MetaStruct& resulting_meta = getResultingMeta();
00201 const OPENDDS_STRING this_topic = topicNameFor(qp.data_reader_);
00202 typedef multimap<OPENDDS_STRING, OPENDDS_STRING>::const_iterator iter_t;
00203 for (iter_t iter = qp.adjacent_joins_.begin();
00204 iter != qp.adjacent_joins_.end();) {
00205 const OPENDDS_STRING& other_topic = iter->first;
00206 iter_t range_end = qp.adjacent_joins_.upper_bound(other_topic);
00207 const QueryPlan& other_qp = query_plans_[other_topic];
00208 DDS::DataReader_ptr other_dr = other_qp.data_reader_;
00209 const MetaStruct& other_meta = metaStructFor(other_dr);
00210
00211 vector<OPENDDS_STRING> keys;
00212 for (; iter != range_end; ++iter) {
00213 keys.push_back(iter->second);
00214 }
00215
00216 typename std::map<TopicSet, SampleVec>::iterator found =
00217 find_if(partialResults.begin(), partialResults.end(),
00218 Contains(other_topic));
00219
00220 if (found == partialResults.end()) {
00221
00222 partialResults.erase(seen);
00223 TopicSet withJoin(seen);
00224 withJoin.insert(other_topic);
00225 SampleVec& join_result = partialResults[withJoin];
00226 for (size_t i = 0; i < starting.size(); ++i) {
00227 GenericData other_key(other_meta);
00228 for (size_t j = 0; j < keys.size(); ++j) {
00229 other_meta.assign(other_key.ptr_, keys[j].c_str(),
00230 &starting[i], keys[j].c_str(), resulting_meta);
00231 }
00232 join(join_result, starting[i], keys,
00233 other_key.ptr_, other_dr, other_meta);
00234 }
00235
00236 if (!join_result.empty() && !seen.count(other_topic)) {
00237
00238 process_joins(partialResults, join_result, withJoin, other_qp);
00239 }
00240
00241 } else if (!found->first.count(this_topic) ) {
00242
00243
00244 combine(starting, found->second, keys, found->first);
00245 TopicSet newKey(found->first);
00246 for (set<OPENDDS_STRING>::const_iterator i3 = found->first.begin();
00247 i3 != found->first.end(); ++i3) {
00248 newKey.insert(*i3);
00249 }
00250 partialResults.erase(found);
00251 partialResults[newKey] = starting;
00252
00253 }
00254 }
00255 }
00256
00257 template<typename Sample, typename TypedDataReader>
00258 void
00259 MultiTopicDataReader_T<Sample, TypedDataReader>::cross_join(
00260 std::map<TopicSet, SampleVec>& partialResults, const TopicSet& seen,
00261 const QueryPlan& qp)
00262 {
00263 using namespace std;
00264 const MetaStruct& other_meta = metaStructFor(qp.data_reader_);
00265 vector<OPENDDS_STRING> no_keys;
00266 for (typename std::map<TopicSet, SampleVec>::iterator iterPR =
00267 partialResults.begin(); iterPR != partialResults.end(); ++iterPR) {
00268 SampleVec resulting;
00269 for (typename SampleVec::iterator i = iterPR->second.begin();
00270 i != iterPR->second.end(); ++i) {
00271 join(resulting, *i, no_keys, NULL, qp.data_reader_, other_meta);
00272 }
00273 resulting.swap(iterPR->second);
00274 }
00275 TopicSet withJoin(seen);
00276 withJoin.insert(topicNameFor(qp.data_reader_));
00277 partialResults[withJoin].swap(partialResults[seen]);
00278 partialResults.erase(seen);
00279 process_joins(partialResults, partialResults[withJoin], withJoin, qp);
00280 }
00281
00282 template<typename Sample, typename TypedDataReader>
00283 void
00284 MultiTopicDataReader_T<Sample, TypedDataReader>::incoming_sample(void* sample,
00285 const DDS::SampleInfo& info, const char* topic, const MetaStruct& meta)
00286 {
00287 using namespace std;
00288 using namespace DDS;
00289 const QueryPlan& qp = query_plans_[topic];
00290
00291
00292 std::map<TopicSet, SampleVec> partialResults;
00293 TopicSet seen;
00294 seen.insert(topic);
00295 partialResults[seen].push_back(SampleWithInfo(topic, info));
00296 assign_fields(sample, partialResults[seen].back().sample_, qp, meta);
00297
00298 process_joins(partialResults, partialResults[seen], seen, qp);
00299
00300
00301 for (std::map<OPENDDS_STRING, QueryPlan>::iterator iter = query_plans_.begin();
00302 iter != query_plans_.end(); ++iter) {
00303 typename std::map<TopicSet, SampleVec>::iterator found =
00304 find_if(partialResults.begin(), partialResults.end(),
00305 Contains(iter->first));
00306 if (found == partialResults.end()) {
00307 cross_join(partialResults, seen, iter->second);
00308 }
00309 }
00310
00311 TypedDataReader* tdr = dynamic_cast<TypedDataReader*>(typed_reader_.in());
00312 for (typename std::map<TopicSet, SampleVec>::iterator iterPR =
00313 partialResults.begin(); iterPR != partialResults.end(); ++iterPR) {
00314 for (typename SampleVec::iterator i = iterPR->second.begin();
00315 i != iterPR->second.end(); ++i) {
00316 InstanceHandle_t ih = tdr->store_synthetic_data(i->sample_, i->view_);
00317 if (ih != HANDLE_NIL) {
00318 typedef std::map<OPENDDS_STRING, InstanceHandle_t>::iterator mapiter_t;
00319 for (mapiter_t iterMap = i->info_.begin(); iterMap != i->info_.end();
00320 ++iterMap) {
00321 query_plans_[iterMap->first].instances_.insert(
00322 make_pair(iterMap->second, ih));
00323 }
00324 }
00325 }
00326 }
00327 }
00328
00329
00330
00331
00332 template<typename Sample, typename TypedDataReader>
00333 DDS::ReturnCode_t
00334 MultiTopicDataReader_T<Sample, TypedDataReader>::read(SampleSeq& received_data,
00335 DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00336 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00337 DDS::InstanceStateMask instance_states)
00338 {
00339 return typed_reader_->read(received_data, info_seq, max_samples,
00340 sample_states, view_states, instance_states);
00341 }
00342
00343 template<typename Sample, typename TypedDataReader>
00344 DDS::ReturnCode_t
00345 MultiTopicDataReader_T<Sample, TypedDataReader>::take(SampleSeq& received_data,
00346 DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00347 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00348 DDS::InstanceStateMask instance_states)
00349 {
00350 return typed_reader_->take(received_data, info_seq, max_samples,
00351 sample_states, view_states, instance_states);
00352 }
00353
00354 template<typename Sample, typename TypedDataReader>
00355 DDS::ReturnCode_t
00356 MultiTopicDataReader_T<Sample, TypedDataReader>::read_w_condition(
00357 SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00358 CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
00359 {
00360 return typed_reader_->read_w_condition(data_values, sample_infos,
00361 max_samples, a_condition);
00362 }
00363
00364 template<typename Sample, typename TypedDataReader>
00365 DDS::ReturnCode_t
00366 MultiTopicDataReader_T<Sample, TypedDataReader>::take_w_condition(
00367 SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00368 CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
00369 {
00370 return typed_reader_->take_w_condition(data_values, sample_infos,
00371 max_samples, a_condition);
00372 }
00373
00374 template<typename Sample, typename TypedDataReader>
00375 DDS::ReturnCode_t
00376 MultiTopicDataReader_T<Sample, TypedDataReader>::read_next_sample(
00377 Sample& received_data, DDS::SampleInfo& sample_info)
00378 {
00379 return typed_reader_->read_next_sample(received_data, sample_info);
00380 }
00381
00382 template<typename Sample, typename TypedDataReader>
00383 DDS::ReturnCode_t
00384 MultiTopicDataReader_T<Sample, TypedDataReader>::take_next_sample(
00385 Sample& received_data, DDS::SampleInfo& sample_info)
00386 {
00387 return typed_reader_->take_next_sample(received_data, sample_info);
00388 }
00389
00390 template<typename Sample, typename TypedDataReader>
00391 DDS::ReturnCode_t
00392 MultiTopicDataReader_T<Sample, TypedDataReader>::read_instance(
00393 SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00394 CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
00395 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00396 DDS::InstanceStateMask instance_states)
00397 {
00398 return typed_reader_->read_instance(received_data, info_seq, max_samples,
00399 a_handle, sample_states, view_states, instance_states);
00400 }
00401
00402 template<typename Sample, typename TypedDataReader>
00403 DDS::ReturnCode_t
00404 MultiTopicDataReader_T<Sample, TypedDataReader>::take_instance(
00405 SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00406 CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
00407 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00408 DDS::InstanceStateMask instance_states)
00409 {
00410 return typed_reader_->take_instance(received_data, info_seq, max_samples,
00411 a_handle, sample_states, view_states, instance_states);
00412 }
00413
00414 template<typename Sample, typename TypedDataReader>
00415 DDS::ReturnCode_t
00416 MultiTopicDataReader_T<Sample, TypedDataReader>::read_next_instance(
00417 SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00418 CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
00419 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00420 DDS::InstanceStateMask instance_states)
00421 {
00422 return typed_reader_->read_next_instance(received_data, info_seq, max_samples,
00423 a_handle, sample_states, view_states, instance_states);
00424 }
00425
00426 template<typename Sample, typename TypedDataReader>
00427 DDS::ReturnCode_t
00428 MultiTopicDataReader_T<Sample, TypedDataReader>::take_next_instance(
00429 SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00430 CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
00431 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00432 DDS::InstanceStateMask instance_states)
00433 {
00434 return typed_reader_->take_next_instance(received_data, info_seq, max_samples,
00435 a_handle, sample_states, view_states, instance_states);
00436 }
00437
00438 template<typename Sample, typename TypedDataReader>
00439 DDS::ReturnCode_t
00440 MultiTopicDataReader_T<Sample, TypedDataReader>::read_next_instance_w_condition(
00441 SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00442 CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle,
00443 DDS::ReadCondition_ptr a_condition)
00444 {
00445 return typed_reader_->read_next_instance_w_condition(data_values,
00446 sample_infos, max_samples, previous_handle, a_condition);
00447 }
00448
00449 template<typename Sample, typename TypedDataReader>
00450 DDS::ReturnCode_t
00451 MultiTopicDataReader_T<Sample, TypedDataReader>::take_next_instance_w_condition(
00452 SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00453 CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle,
00454 DDS::ReadCondition_ptr a_condition)
00455 {
00456 return typed_reader_->take_next_instance_w_condition(data_values,
00457 sample_infos, max_samples, previous_handle, a_condition);
00458 }
00459
00460 template<typename Sample, typename TypedDataReader>
00461 DDS::ReturnCode_t
00462 MultiTopicDataReader_T<Sample, TypedDataReader>::return_loan(
00463 SampleSeq& received_data, DDS::SampleInfoSeq& info_seq)
00464 {
00465 return typed_reader_->return_loan(received_data, info_seq);
00466 }
00467
00468 template<typename Sample, typename TypedDataReader>
00469 DDS::ReturnCode_t
00470 MultiTopicDataReader_T<Sample, TypedDataReader>::get_key_value(
00471 Sample& key_holder, DDS::InstanceHandle_t handle)
00472 {
00473 return typed_reader_->get_key_value(key_holder, handle);
00474 }
00475
00476 template<typename Sample, typename TypedDataReader>
00477 DDS::InstanceHandle_t
00478 MultiTopicDataReader_T<Sample, TypedDataReader>::lookup_instance(
00479 const Sample& instance_data)
00480 {
00481 return typed_reader_->lookup_instance(instance_data);
00482 }
00483
00484 }
00485 }
00486
00487 #endif
00488 #endif