00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADER_T_CPP
00009 #define OPENDDS_DCPS_MULTITOPICDATAREADER_T_CPP
00010
00011 #ifndef OPENDDS_NO_MULTI_TOPIC
00012
00013 #include <stdexcept>
00014 #include <sstream>
00015
00016
00017 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 namespace OpenDDS {
00020 namespace DCPS {
00021
00022 template<typename Sample, typename TypedDataReader>
00023 void
00024 MultiTopicDataReader_T<Sample, TypedDataReader>::init_typed(DataReaderEx* dr)
00025 {
00026 typed_reader_ = TypedDataReader::Interface::_narrow(dr);
00027 }
00028
00029 template<typename Sample, typename TypedDataReader>
00030 const MetaStruct&
00031 MultiTopicDataReader_T<Sample, TypedDataReader>::getResultingMeta()
00032 {
00033 return getMetaStruct<Sample>();
00034 }
00035
00036 template<typename Sample, typename TypedDataReader>
00037 void
00038 MultiTopicDataReader_T<Sample, TypedDataReader>::assign_fields(void* incoming,
00039 Sample& resulting, const MultiTopicDataReaderBase::QueryPlan& qp,
00040 const MetaStruct& meta)
00041 {
00042 using namespace std;
00043 const vector<SubjectFieldSpec>& proj = qp.projection_;
00044 const MetaStruct& resulting_meta = getResultingMeta();
00045 typedef vector<SubjectFieldSpec>::const_iterator iter_t;
00046 for (iter_t iter = proj.begin(); iter != proj.end(); ++iter) {
00047 const SubjectFieldSpec& sfs = *iter;
00048 resulting_meta.assign(&resulting, sfs.resulting_name_.c_str(),
00049 incoming, sfs.incoming_name_.c_str(), meta);
00050 }
00051
00052 const vector<OPENDDS_STRING>& proj_out = qp.keys_projected_out_;
00053 for (vector<OPENDDS_STRING>::const_iterator iter = proj_out.begin();
00054 iter != proj_out.end(); ++iter) {
00055 resulting_meta.assign(&resulting, iter->c_str(),
00056 incoming, iter->c_str(), meta);
00057 }
00058 }
00059
00060 template<typename Sample, typename TypedDataReader>
00061 void
00062 MultiTopicDataReader_T<Sample, TypedDataReader>::assign_resulting_fields(
00063 Sample& target, const Sample& source, const TopicSet& other_topics)
00064 {
00065 using namespace std;
00066 const MetaStruct& meta = getResultingMeta();
00067 for (TopicSet::const_iterator iterTopic = other_topics.begin();
00068 iterTopic != other_topics.end(); ++iterTopic) {
00069 const QueryPlan& qp = query_plans_[*iterTopic];
00070 const vector<SubjectFieldSpec>& proj = qp.projection_;
00071 typedef vector<SubjectFieldSpec>::const_iterator iter_t;
00072 for (iter_t iter = proj.begin(); iter != proj.end(); ++iter) {
00073 const SubjectFieldSpec& sfs = *iter;
00074 meta.assign(&target, sfs.resulting_name_.c_str(),
00075 &source, sfs.resulting_name_.c_str(), meta);
00076 }
00077 }
00078 }
00079
00080 template<typename Sample, typename TypedDataReader>
00081 void
00082 MultiTopicDataReader_T<Sample, TypedDataReader>::join(
00083 SampleVec& resulting, const SampleWithInfo& prototype,
00084 const std::vector<OPENDDS_STRING>& key_names, const void* key_data,
00085 DDS::DataReader_ptr other_dr, const MetaStruct& other_meta)
00086 {
00087 using namespace DDS;
00088 DataReaderImpl* other_dri = dynamic_cast<DataReaderImpl*>(other_dr);
00089
00090 if (!other_dri) {
00091 ACE_ERROR((LM_ERROR,
00092 ACE_TEXT("(%P|%t) ERROR: ")
00093 ACE_TEXT("MultiTopicDataReader_T::join, ")
00094 ACE_TEXT("Failed to get DataReaderImpl.\n")));
00095 return;
00096 }
00097
00098 TopicDescription_var other_td = other_dri->get_topicdescription();
00099 CORBA::String_var other_topic = other_td->get_name();
00100 const QueryPlan& other_qp = query_plans_[other_topic.in()];
00101 const size_t n_keys = key_names.size();
00102
00103 if (n_keys > 0 && other_meta.numDcpsKeys() == n_keys) {
00104 InstanceHandle_t ih = other_dri->lookup_instance_generic(key_data);
00105 if (ih != HANDLE_NIL) {
00106 GenericData other_data(other_meta, false);
00107 SampleInfo info;
00108 ReturnCode_t ret = other_dri->read_instance_generic(other_data.ptr_,
00109 info, ih, READ_SAMPLE_STATE, ANY_VIEW_STATE, ALIVE_INSTANCE_STATE);
00110 if (ret != RETCODE_OK && ret != RETCODE_NO_DATA) {
00111 std::ostringstream rc_ss;
00112 rc_ss << ret;
00113 throw std::runtime_error("In join(), incoming DataReader for " +
00114 OPENDDS_STRING(other_topic) + " read_instance_generic, error #" +
00115 rc_ss.str());
00116 } else if (ret == DDS::RETCODE_OK) {
00117 resulting.push_back(prototype);
00118 resulting.back().combine(SampleWithInfo(other_topic.in(), info));
00119 assign_fields(other_data.ptr_, resulting.back().sample_,
00120 other_qp, other_meta);
00121 }
00122 }
00123 } else {
00124 SampleVec new_resulting;
00125 ReturnCode_t ret = RETCODE_OK;
00126 for (InstanceHandle_t ih = HANDLE_NIL; ret != RETCODE_NO_DATA;) {
00127 GenericData other_data(other_meta, false);
00128 SampleInfo info;
00129 ret = other_dri->read_next_instance_generic(other_data.ptr_, info, ih,
00130 READ_SAMPLE_STATE, ANY_VIEW_STATE, ALIVE_INSTANCE_STATE);
00131 if (ret != RETCODE_OK && ret != RETCODE_NO_DATA) {
00132 std::ostringstream rc_ss;
00133 rc_ss << ret;
00134 throw std::runtime_error("In join(), incoming DataReader for " +
00135 OPENDDS_STRING(other_topic) + " read_next_instance_generic, error #" +
00136 rc_ss.str());
00137 } else if (ret == RETCODE_NO_DATA) {
00138 break;
00139 }
00140 ih = info.instance_handle;
00141
00142 bool match = true;
00143 for (size_t i = 0; match && i < key_names.size(); ++i) {
00144 if (!other_meta.compare(key_data, other_data.ptr_,
00145 key_names[i].c_str())) {
00146 match = false;
00147 }
00148 }
00149
00150 if (match) {
00151 resulting.push_back(prototype);
00152 resulting.back().combine(SampleWithInfo(other_topic.in(), info));
00153 assign_fields(other_data.ptr_, resulting.back().sample_,
00154 other_qp, other_meta);
00155 }
00156 }
00157 }
00158 }
00159
00160 template<typename Sample, typename TypedDataReader>
00161 void
00162 MultiTopicDataReader_T<Sample, TypedDataReader>::combine(
00163 SampleVec& resulting, const SampleVec& other,
00164 const std::vector<OPENDDS_STRING>& key_names, const TopicSet& other_topics)
00165 {
00166 const MetaStruct& meta = getResultingMeta();
00167 SampleVec newData;
00168 for (typename SampleVec::iterator iterRes = resulting.begin();
00169 iterRes != resulting.end(); ) {
00170 bool foundOneMatch = false;
00171 for (typename SampleVec::const_iterator iterOther = other.begin();
00172 iterOther != other.end(); ++iterOther) {
00173 bool match = true;
00174 for (size_t i = 0; match && i < key_names.size(); ++i) {
00175 if (!meta.compare(&*iterRes, &*iterOther, key_names[i].c_str())) {
00176 match = false;
00177 }
00178 }
00179 if (!match) {
00180 continue;
00181 }
00182 if (foundOneMatch) {
00183 newData.push_back(*iterRes);
00184 newData.back().combine(*iterOther);
00185 assign_resulting_fields(newData.back().sample_,
00186 iterOther->sample_, other_topics);
00187 } else {
00188 foundOneMatch = true;
00189 iterRes->combine(*iterOther);
00190 assign_resulting_fields(iterRes->sample_,
00191 iterOther->sample_, other_topics);
00192 }
00193 }
00194 if (foundOneMatch) {
00195 ++iterRes;
00196 } else {
00197
00198 iterRes = resulting.erase(iterRes);
00199 }
00200 }
00201 resulting.insert(resulting.end(), newData.begin(), newData.end());
00202 }
00203
00204 template<typename Sample, typename TypedDataReader>
00205 void
00206 MultiTopicDataReader_T<Sample, TypedDataReader>::process_joins(
00207 std::map<TopicSet, SampleVec>& partialResults, SampleVec starting,
00208 const TopicSet& seen, const QueryPlan& qp)
00209 {
00210 using namespace std;
00211 const MetaStruct& resulting_meta = getResultingMeta();
00212 const OPENDDS_STRING this_topic = topicNameFor(qp.data_reader_);
00213 typedef multimap<OPENDDS_STRING, OPENDDS_STRING>::const_iterator iter_t;
00214 for (iter_t iter = qp.adjacent_joins_.begin();
00215 iter != qp.adjacent_joins_.end();) {
00216 const OPENDDS_STRING& other_topic = iter->first;
00217 iter_t range_end = qp.adjacent_joins_.upper_bound(other_topic);
00218 const QueryPlan& other_qp = query_plans_[other_topic];
00219 DDS::DataReader_ptr other_dr = other_qp.data_reader_;
00220
00221 try {
00222 const MetaStruct& other_meta = metaStructFor(other_dr);
00223
00224 vector<OPENDDS_STRING> keys;
00225 for (; iter != range_end; ++iter) {
00226 keys.push_back(iter->second);
00227 }
00228
00229 typename std::map<TopicSet, SampleVec>::iterator found =
00230 find_if(partialResults.begin(), partialResults.end(),
00231 Contains(other_topic));
00232
00233 if (found == partialResults.end()) {
00234
00235 partialResults.erase(seen);
00236 TopicSet withJoin(seen);
00237 withJoin.insert(other_topic);
00238 SampleVec& join_result = partialResults[withJoin];
00239 for (size_t i = 0; i < starting.size(); ++i) {
00240 GenericData other_key(other_meta);
00241 for (size_t j = 0; j < keys.size(); ++j) {
00242 other_meta.assign(other_key.ptr_, keys[j].c_str(),
00243 &starting[i], keys[j].c_str(), resulting_meta);
00244 }
00245 join(join_result, starting[i], keys,
00246 other_key.ptr_, other_dr, other_meta);
00247 }
00248
00249 if (!join_result.empty() && !seen.count(other_topic)) {
00250
00251 process_joins(partialResults, join_result, withJoin, other_qp);
00252 }
00253
00254 } else if (!found->first.count(this_topic) ) {
00255
00256
00257 combine(starting, found->second, keys, found->first);
00258 TopicSet newKey(found->first);
00259 for (set<OPENDDS_STRING>::const_iterator i3 = found->first.begin();
00260 i3 != found->first.end(); ++i3) {
00261 newKey.insert(*i3);
00262 }
00263 partialResults.erase(found);
00264 partialResults[newKey] = starting;
00265
00266 }
00267 } catch (const std::runtime_error& e) {
00268 ACE_ERROR((LM_ERROR, "(%P|%t) MultiTopicDataReader_T::process_joins: "
00269 "%C", e.what()));
00270 }
00271 }
00272 }
00273
00274 template<typename Sample, typename TypedDataReader>
00275 void
00276 MultiTopicDataReader_T<Sample, TypedDataReader>::cross_join(
00277 std::map<TopicSet, SampleVec>& partialResults, const TopicSet& seen,
00278 const QueryPlan& qp)
00279 {
00280 using namespace std;
00281 try {
00282 const MetaStruct& other_meta = metaStructFor(qp.data_reader_);
00283 vector<OPENDDS_STRING> no_keys;
00284 for (typename std::map<TopicSet, SampleVec>::iterator iterPR =
00285 partialResults.begin(); iterPR != partialResults.end(); ++iterPR) {
00286 SampleVec resulting;
00287 for (typename SampleVec::iterator i = iterPR->second.begin();
00288 i != iterPR->second.end(); ++i) {
00289 join(resulting, *i, no_keys, NULL, qp.data_reader_, other_meta);
00290 }
00291 resulting.swap(iterPR->second);
00292 }
00293 TopicSet withJoin(seen);
00294 withJoin.insert(topicNameFor(qp.data_reader_));
00295 partialResults[withJoin].swap(partialResults[seen]);
00296 partialResults.erase(seen);
00297 process_joins(partialResults, partialResults[withJoin], withJoin, qp);
00298 } catch (const std::runtime_error& e) {
00299 if (OpenDDS::DCPS::DCPS_debug_level) {
00300 ACE_ERROR((LM_ERROR, "(%P|%t) MultiTopicDataReader_T::cross_join: %C", e.what()));
00301 }
00302 }
00303 }
00304
00305 template<typename Sample, typename TypedDataReader>
00306 void
00307 MultiTopicDataReader_T<Sample, TypedDataReader>::incoming_sample(void* sample,
00308 const DDS::SampleInfo& info, const char* topic, const MetaStruct& meta)
00309 {
00310 using namespace std;
00311 using namespace DDS;
00312 const QueryPlan& qp = query_plans_[topic];
00313
00314
00315 std::map<TopicSet, SampleVec> partialResults;
00316 TopicSet seen;
00317 seen.insert(topic);
00318 partialResults[seen].push_back(SampleWithInfo(topic, info));
00319 assign_fields(sample, partialResults[seen].back().sample_, qp, meta);
00320
00321 process_joins(partialResults, partialResults[seen], seen, qp);
00322
00323
00324 for (std::map<OPENDDS_STRING, QueryPlan>::iterator iter = query_plans_.begin();
00325 iter != query_plans_.end(); ++iter) {
00326 typename std::map<TopicSet, SampleVec>::iterator found =
00327 find_if(partialResults.begin(), partialResults.end(),
00328 Contains(iter->first));
00329 if (found == partialResults.end()) {
00330 cross_join(partialResults, seen, iter->second);
00331 }
00332 }
00333
00334 TypedDataReader* tdr = dynamic_cast<TypedDataReader*>(typed_reader_.in());
00335
00336 if (!tdr) {
00337 ACE_ERROR((LM_ERROR,
00338 ACE_TEXT("(%P|%t) ERROR: ")
00339 ACE_TEXT("MultiTopicDataReader_T::incoming_sample, ")
00340 ACE_TEXT("Failed to get TypedDataReader.\n")));
00341 return;
00342 }
00343
00344 for (typename std::map<TopicSet, SampleVec>::iterator iterPR =
00345 partialResults.begin(); iterPR != partialResults.end(); ++iterPR) {
00346 for (typename SampleVec::iterator i = iterPR->second.begin();
00347 i != iterPR->second.end(); ++i) {
00348 InstanceHandle_t ih = tdr->store_synthetic_data(i->sample_, i->view_);
00349 if (ih != HANDLE_NIL) {
00350 typedef std::map<OPENDDS_STRING, InstanceHandle_t>::iterator mapiter_t;
00351 for (mapiter_t iterMap = i->info_.begin(); iterMap != i->info_.end();
00352 ++iterMap) {
00353 query_plans_[iterMap->first].instances_.insert(
00354 make_pair(iterMap->second, ih));
00355 }
00356 }
00357 }
00358 }
00359 }
00360
00361
00362
00363
00364 template<typename Sample, typename TypedDataReader>
00365 DDS::ReturnCode_t
00366 MultiTopicDataReader_T<Sample, TypedDataReader>::read(SampleSeq& received_data,
00367 DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00368 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00369 DDS::InstanceStateMask instance_states)
00370 {
00371 return typed_reader_->read(received_data, info_seq, max_samples,
00372 sample_states, view_states, instance_states);
00373 }
00374
00375 template<typename Sample, typename TypedDataReader>
00376 DDS::ReturnCode_t
00377 MultiTopicDataReader_T<Sample, TypedDataReader>::take(SampleSeq& received_data,
00378 DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00379 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00380 DDS::InstanceStateMask instance_states)
00381 {
00382 return typed_reader_->take(received_data, info_seq, max_samples,
00383 sample_states, view_states, instance_states);
00384 }
00385
00386 template<typename Sample, typename TypedDataReader>
00387 DDS::ReturnCode_t
00388 MultiTopicDataReader_T<Sample, TypedDataReader>::read_w_condition(
00389 SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00390 CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
00391 {
00392 return typed_reader_->read_w_condition(data_values, sample_infos,
00393 max_samples, a_condition);
00394 }
00395
00396 template<typename Sample, typename TypedDataReader>
00397 DDS::ReturnCode_t
00398 MultiTopicDataReader_T<Sample, TypedDataReader>::take_w_condition(
00399 SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00400 CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
00401 {
00402 return typed_reader_->take_w_condition(data_values, sample_infos,
00403 max_samples, a_condition);
00404 }
00405
00406 template<typename Sample, typename TypedDataReader>
00407 DDS::ReturnCode_t
00408 MultiTopicDataReader_T<Sample, TypedDataReader>::read_next_sample(
00409 Sample& received_data, DDS::SampleInfo& sample_info)
00410 {
00411 return typed_reader_->read_next_sample(received_data, sample_info);
00412 }
00413
00414 template<typename Sample, typename TypedDataReader>
00415 DDS::ReturnCode_t
00416 MultiTopicDataReader_T<Sample, TypedDataReader>::take_next_sample(
00417 Sample& received_data, DDS::SampleInfo& sample_info)
00418 {
00419 return typed_reader_->take_next_sample(received_data, sample_info);
00420 }
00421
00422 template<typename Sample, typename TypedDataReader>
00423 DDS::ReturnCode_t
00424 MultiTopicDataReader_T<Sample, TypedDataReader>::read_instance(
00425 SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00426 CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
00427 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00428 DDS::InstanceStateMask instance_states)
00429 {
00430 return typed_reader_->read_instance(received_data, info_seq, max_samples,
00431 a_handle, sample_states, view_states, instance_states);
00432 }
00433
00434 template<typename Sample, typename TypedDataReader>
00435 DDS::ReturnCode_t
00436 MultiTopicDataReader_T<Sample, TypedDataReader>::take_instance(
00437 SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00438 CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
00439 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00440 DDS::InstanceStateMask instance_states)
00441 {
00442 return typed_reader_->take_instance(received_data, info_seq, max_samples,
00443 a_handle, sample_states, view_states, instance_states);
00444 }
00445
00446 template<typename Sample, typename TypedDataReader>
00447 DDS::ReturnCode_t
00448 MultiTopicDataReader_T<Sample, TypedDataReader>::read_instance_w_condition(
00449 SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00450 CORBA::Long max_samples, DDS::InstanceHandle_t handle,
00451 DDS::ReadCondition_ptr a_condition)
00452 {
00453 return typed_reader_->read_instance_w_condition(data_values,
00454 sample_infos, max_samples, handle, a_condition);
00455 }
00456
00457 template<typename Sample, typename TypedDataReader>
00458 DDS::ReturnCode_t
00459 MultiTopicDataReader_T<Sample, TypedDataReader>::take_instance_w_condition(
00460 SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00461 CORBA::Long max_samples, DDS::InstanceHandle_t handle,
00462 DDS::ReadCondition_ptr a_condition)
00463 {
00464 return typed_reader_->take_instance_w_condition(data_values,
00465 sample_infos, max_samples, handle, a_condition);
00466 }
00467
00468 template<typename Sample, typename TypedDataReader>
00469 DDS::ReturnCode_t
00470 MultiTopicDataReader_T<Sample, TypedDataReader>::read_next_instance(
00471 SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00472 CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
00473 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00474 DDS::InstanceStateMask instance_states)
00475 {
00476 return typed_reader_->read_next_instance(received_data, info_seq, max_samples,
00477 a_handle, sample_states, view_states, instance_states);
00478 }
00479
00480 template<typename Sample, typename TypedDataReader>
00481 DDS::ReturnCode_t
00482 MultiTopicDataReader_T<Sample, TypedDataReader>::take_next_instance(
00483 SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00484 CORBA::Long max_samples, DDS::InstanceHandle_t a_handle,
00485 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00486 DDS::InstanceStateMask instance_states)
00487 {
00488 return typed_reader_->take_next_instance(received_data, info_seq, max_samples,
00489 a_handle, sample_states, view_states, instance_states);
00490 }
00491
00492 template<typename Sample, typename TypedDataReader>
00493 DDS::ReturnCode_t
00494 MultiTopicDataReader_T<Sample, TypedDataReader>::read_next_instance_w_condition(
00495 SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00496 CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle,
00497 DDS::ReadCondition_ptr a_condition)
00498 {
00499 return typed_reader_->read_next_instance_w_condition(data_values,
00500 sample_infos, max_samples, previous_handle, a_condition);
00501 }
00502
00503 template<typename Sample, typename TypedDataReader>
00504 DDS::ReturnCode_t
00505 MultiTopicDataReader_T<Sample, TypedDataReader>::take_next_instance_w_condition(
00506 SampleSeq& data_values, DDS::SampleInfoSeq& sample_infos,
00507 CORBA::Long max_samples, DDS::InstanceHandle_t previous_handle,
00508 DDS::ReadCondition_ptr a_condition)
00509 {
00510 return typed_reader_->take_next_instance_w_condition(data_values,
00511 sample_infos, max_samples, previous_handle, a_condition);
00512 }
00513
00514 template<typename Sample, typename TypedDataReader>
00515 DDS::ReturnCode_t
00516 MultiTopicDataReader_T<Sample, TypedDataReader>::return_loan(
00517 SampleSeq& received_data, DDS::SampleInfoSeq& info_seq)
00518 {
00519 return typed_reader_->return_loan(received_data, info_seq);
00520 }
00521
00522 template<typename Sample, typename TypedDataReader>
00523 DDS::ReturnCode_t
00524 MultiTopicDataReader_T<Sample, TypedDataReader>::get_key_value(
00525 Sample& key_holder, DDS::InstanceHandle_t handle)
00526 {
00527 return typed_reader_->get_key_value(key_holder, handle);
00528 }
00529
00530 template<typename Sample, typename TypedDataReader>
00531 DDS::InstanceHandle_t
00532 MultiTopicDataReader_T<Sample, TypedDataReader>::lookup_instance(
00533 const Sample& instance_data)
00534 {
00535 return typed_reader_->lookup_instance(instance_data);
00536 }
00537
00538 }
00539 }
00540
00541 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00542
00543 #endif
00544 #endif