00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #ifndef OPENDDS_NO_MULTI_TOPIC
00010
00011 #include "MultiTopicDataReaderBase.h"
00012 #include "SubscriberImpl.h"
00013 #include "DomainParticipantImpl.h"
00014 #include "TypeSupportImpl.h"
00015
00016 #include <stdexcept>
00017
00018 namespace {
00019 struct MatchesIncomingName {
00020 const OPENDDS_STRING& look_for_;
00021 explicit MatchesIncomingName(const OPENDDS_STRING& s) : look_for_(s) {}
00022 bool operator()(const OpenDDS::DCPS::MultiTopicImpl::SubjectFieldSpec& sfs)
00023 const {
00024 return sfs.incoming_name_ == look_for_;
00025 }
00026 };
00027 }
00028
00029
00030 namespace OpenDDS {
00031 namespace DCPS {
00032
00033 void MultiTopicDataReaderBase::init(const DDS::DataReaderQos& dr_qos,
00034 DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask,
00035 SubscriberImpl* parent, MultiTopicImpl* multitopic)
00036 {
00037 using namespace std;
00038 DDS::DataReader_var dr = multitopic->get_type_support()->create_datareader();
00039 resulting_reader_ = DataReaderEx::_narrow(dr);
00040 DataReaderImpl* resulting_impl =
00041 dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00042
00043 resulting_impl->raw_latency_buffer_size() = parent->raw_latency_buffer_size();
00044 resulting_impl->raw_latency_buffer_type() = parent->raw_latency_buffer_type();
00045
00046 DDS::DomainParticipant_var participant = parent->get_participant();
00047 resulting_impl->init(multitopic, dr_qos, a_listener, mask,
00048 dynamic_cast<DomainParticipantImpl*>(participant.in()), parent,
00049 resulting_reader_);
00050
00051 init_typed(resulting_reader_);
00052 listener_ = new Listener(this);
00053
00054 std::map<OPENDDS_STRING, OPENDDS_STRING> fieldToTopic;
00055
00056
00057
00058 std::map<OPENDDS_STRING, set<OPENDDS_STRING> > joinKeys;
00059
00060 const vector<OPENDDS_STRING>& selection = multitopic->get_selection();
00061 for (size_t i = 0; i < selection.size(); ++i) {
00062
00063 const DDS::Duration_t no_wait = {0, 0};
00064 DDS::Topic_var t = participant->find_topic(selection[i].c_str(), no_wait);
00065 if (!t.in()) {
00066 throw runtime_error("Topic: " + selection[i] + " not found.");
00067 }
00068
00069 DDS::DataReader_var incoming =
00070 parent->create_datareader(t, dr_qos, listener_, ALL_STATUS_MASK);
00071 if (!incoming.in()) {
00072 throw runtime_error("Could not create incoming DataReader "
00073 + selection[i]);
00074 }
00075
00076 QueryPlan& qp = query_plans_[selection[i]];
00077 qp.data_reader_ = incoming;
00078 const MetaStruct& meta = metaStructFor(incoming);
00079
00080 for (const char** names = meta.getFieldNames(); *names; ++names) {
00081 if (fieldToTopic.count(*names)) {
00082 set<OPENDDS_STRING>& topics = joinKeys[*names];
00083 topics.insert(fieldToTopic[*names]);
00084 topics.insert(selection[i]);
00085 } else {
00086 fieldToTopic[*names] = selection[i];
00087 }
00088 }
00089 }
00090
00091 const vector<SubjectFieldSpec>& aggregation = multitopic->get_aggregation();
00092 if (aggregation.size() == 0) {
00093 const MetaStruct& meta = getResultingMeta();
00094 for (const char** names = meta.getFieldNames(); *names; ++names) {
00095 std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
00096 fieldToTopic.find(*names);
00097 if (found == fieldToTopic.end()) {
00098 if (DCPS_debug_level > 1) {
00099 ACE_DEBUG((LM_WARNING,
00100 ACE_TEXT("(%P|%t) WARNING: ")
00101 ACE_TEXT("MultiTopicDataReaderBase::init(), in SELECT * ")
00102 ACE_TEXT("resulting field %C has no corresponding ")
00103 ACE_TEXT("incoming field.\n"), *names));
00104 }
00105 } else {
00106 query_plans_[found->second].projection_.push_back(
00107 SubjectFieldSpec(*names));
00108 }
00109 }
00110 } else {
00111 for (size_t i = 0; i < aggregation.size(); ++i) {
00112 std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
00113 fieldToTopic.find(aggregation[i].incoming_name_);
00114 if (found == fieldToTopic.end()) {
00115 throw std::runtime_error("Projected field " +
00116 aggregation[i].incoming_name_ + " has no incoming field.");
00117 } else {
00118 query_plans_[found->second].projection_.push_back(aggregation[i]);
00119 }
00120 }
00121 }
00122
00123 typedef std::map<OPENDDS_STRING, set<OPENDDS_STRING> >::const_iterator iter_t;
00124 for (iter_t iter = joinKeys.begin(); iter != joinKeys.end(); ++iter) {
00125 const OPENDDS_STRING& field = iter->first;
00126 const set<OPENDDS_STRING>& topics = iter->second;
00127 for (set<OPENDDS_STRING>::const_iterator iter2 = topics.begin();
00128 iter2 != topics.end(); ++iter2) {
00129 const OPENDDS_STRING& topic = *iter2;
00130 QueryPlan& qp = query_plans_[topic];
00131 if (find_if(qp.projection_.begin(), qp.projection_.end(),
00132 MatchesIncomingName(field)) == qp.projection_.end()) {
00133 qp.keys_projected_out_.push_back(field);
00134 }
00135 for (set<OPENDDS_STRING>::const_iterator iter3 = topics.begin();
00136 iter3 != topics.end(); ++iter3) {
00137 if (topic != *iter3) {
00138 qp.adjacent_joins_.insert(pair<const OPENDDS_STRING, OPENDDS_STRING>(*iter3, field));
00139 }
00140 }
00141 }
00142 }
00143 }
00144
00145 OPENDDS_STRING MultiTopicDataReaderBase::topicNameFor(DDS::DataReader_ptr reader)
00146 {
00147 DDS::TopicDescription_var td = reader->get_topicdescription();
00148 CORBA::String_var topic = td->get_name();
00149 return topic.in();
00150 }
00151
00152 const MetaStruct&
00153 MultiTopicDataReaderBase::metaStructFor(DDS::DataReader_ptr reader)
00154 {
00155 DDS::TopicDescription_var td = reader->get_topicdescription();
00156 TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(td.in());
00157 TypeSupportImpl* ts = dynamic_cast<TypeSupportImpl*>(tdi->get_type_support());
00158 return ts->getMetaStructForType();
00159 }
00160
00161 void MultiTopicDataReaderBase::data_available(DDS::DataReader_ptr reader)
00162 {
00163 using namespace std;
00164 using namespace DDS;
00165
00166 const OPENDDS_STRING topic = topicNameFor(reader);
00167 DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(reader);
00168 DataReaderImpl::GenericBundle gen;
00169 ReturnCode_t rc = dri->read_generic(gen, NOT_READ_SAMPLE_STATE,
00170 ANY_VIEW_STATE, ANY_INSTANCE_STATE,false);
00171 if (rc == RETCODE_NO_DATA) {
00172 return;
00173 } else if (rc != RETCODE_OK) {
00174 throw runtime_error("Incoming DataReader for " + topic +
00175 " could not be read, error #" + to_dds_string(rc));
00176 }
00177
00178 const MetaStruct& meta = metaStructFor(reader);
00179 const QueryPlan& qp = query_plans_[topic];
00180 for (CORBA::ULong i = 0; i < gen.samples_.size(); ++i) {
00181 if (gen.info_[i].valid_data) {
00182 incoming_sample(gen.samples_[i], gen.info_[i], topic.c_str(), meta);
00183 } else if (gen.info_[i].instance_state != ALIVE_INSTANCE_STATE) {
00184 DataReaderImpl* resulting_impl =
00185 dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00186 set<pair<InstanceHandle_t, InstanceHandle_t> >::const_iterator
00187 iter = qp.instances_.begin();
00188 while (iter != qp.instances_.end() &&
00189 iter->first != gen.info_[i].instance_handle) ++iter;
00190 for (; iter != qp.instances_.end() &&
00191 iter->first == gen.info_[i].instance_handle; ++iter) {
00192 resulting_impl->set_instance_state(iter->second,
00193 gen.info_[i].instance_state);
00194 }
00195 }
00196 }
00197 }
00198
00199 void MultiTopicDataReaderBase::Listener::on_requested_deadline_missed(
00200 DDS::DataReader_ptr, const DDS::RequestedDeadlineMissedStatus&)
00201 {
00202 }
00203
00204 void MultiTopicDataReaderBase::Listener::on_requested_incompatible_qos(
00205 DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus&)
00206 {
00207 }
00208
00209 void MultiTopicDataReaderBase::Listener::on_sample_rejected(
00210 DDS::DataReader_ptr, const DDS::SampleRejectedStatus&)
00211 {
00212 }
00213
00214 void MultiTopicDataReaderBase::Listener::on_liveliness_changed(
00215 DDS::DataReader_ptr, const DDS::LivelinessChangedStatus&)
00216 {
00217 }
00218
00219 void MultiTopicDataReaderBase::Listener::on_data_available(
00220 DDS::DataReader_ptr reader)
00221 {
00222 try {
00223 outer_->data_available(reader);
00224 } catch (std::exception& e) {
00225 if (DCPS_debug_level) {
00226 ACE_DEBUG((LM_ERROR, "(%P|%t) MultiTopicDataReaderBase::Listener::"
00227 "on_data_available(): %C", e.what()));
00228 }
00229 }
00230 }
00231
00232 void MultiTopicDataReaderBase::Listener::on_subscription_matched(
00233 DDS::DataReader_ptr, const DDS::SubscriptionMatchedStatus&)
00234 {
00235 }
00236
00237 void MultiTopicDataReaderBase::Listener::on_sample_lost(DDS::DataReader_ptr,
00238 const DDS::SampleLostStatus&)
00239 {
00240 }
00241
00242 void MultiTopicDataReaderBase::set_status_changed_flag(DDS::StatusKind status,
00243 bool flag)
00244 {
00245 dynamic_cast<DataReaderImpl*>(resulting_reader_.in())
00246 ->set_status_changed_flag(status, flag);
00247 }
00248
00249 bool MultiTopicDataReaderBase::have_sample_states(
00250 DDS::SampleStateMask sample_states) const
00251 {
00252 return dynamic_cast<DataReaderImpl*>(resulting_reader_.in())
00253 ->have_sample_states(sample_states);
00254 }
00255
00256 void MultiTopicDataReaderBase::cleanup()
00257 {
00258 DDS::Subscriber_var sub = resulting_reader_->get_subscriber();
00259 for (std::map<OPENDDS_STRING, QueryPlan>::iterator it = query_plans_.begin();
00260 it != query_plans_.end(); ++it) {
00261 sub->delete_datareader(it->second.data_reader_);
00262 }
00263 DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00264 SubscriberImpl* si = dynamic_cast<SubscriberImpl*>(sub.in());
00265 si->remove_from_datareader_set(dri);
00266 dri->cleanup();
00267 }
00268
00269 DDS::InstanceHandle_t MultiTopicDataReaderBase::get_instance_handle()
00270 {
00271 return resulting_reader_->get_instance_handle();
00272 }
00273
00274 DDS::ReturnCode_t MultiTopicDataReaderBase::enable()
00275 {
00276 return resulting_reader_->enable();
00277 }
00278
00279 DDS::StatusCondition_ptr MultiTopicDataReaderBase::get_statuscondition()
00280 {
00281 return resulting_reader_->get_statuscondition();
00282 }
00283
00284 DDS::StatusMask MultiTopicDataReaderBase::get_status_changes()
00285 {
00286 return resulting_reader_->get_status_changes();
00287 }
00288
00289 DDS::ReadCondition_ptr MultiTopicDataReaderBase::create_readcondition(
00290 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00291 DDS::InstanceStateMask instance_states)
00292 {
00293 return resulting_reader_->create_readcondition(sample_states, view_states,
00294 instance_states);
00295 }
00296
00297 #ifndef OPENDDS_NO_QUERY_CONDITION
00298 DDS::QueryCondition_ptr MultiTopicDataReaderBase::create_querycondition(
00299 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00300 DDS::InstanceStateMask instance_states, const char* query_expression,
00301 const DDS::StringSeq& query_parameters)
00302 {
00303 return resulting_reader_->create_querycondition(sample_states, view_states,
00304 instance_states, query_expression, query_parameters);
00305 }
00306 #endif
00307
00308 DDS::ReturnCode_t MultiTopicDataReaderBase::delete_readcondition(
00309 DDS::ReadCondition_ptr a_condition)
00310 {
00311 return resulting_reader_->delete_readcondition(a_condition);
00312 }
00313
00314 DDS::ReturnCode_t MultiTopicDataReaderBase::delete_contained_entities()
00315 {
00316 return resulting_reader_->delete_contained_entities();
00317 }
00318
00319 DDS::ReturnCode_t MultiTopicDataReaderBase::set_qos(
00320 const DDS::DataReaderQos& qos)
00321 {
00322 return resulting_reader_->set_qos(qos);
00323 }
00324
00325 DDS::ReturnCode_t MultiTopicDataReaderBase::get_qos(DDS::DataReaderQos& qos)
00326 {
00327 return resulting_reader_->get_qos(qos);
00328 }
00329
00330 DDS::ReturnCode_t MultiTopicDataReaderBase::set_listener(
00331 DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
00332 {
00333 return resulting_reader_->set_listener(a_listener, mask);
00334 }
00335
00336 DDS::DataReaderListener_ptr MultiTopicDataReaderBase::get_listener()
00337 {
00338 return resulting_reader_->get_listener();
00339 }
00340
00341 DDS::TopicDescription_ptr MultiTopicDataReaderBase::get_topicdescription()
00342 {
00343 return resulting_reader_->get_topicdescription();
00344 }
00345
00346 DDS::Subscriber_ptr MultiTopicDataReaderBase::get_subscriber()
00347 {
00348 return resulting_reader_->get_subscriber();
00349 }
00350
00351 DDS::ReturnCode_t MultiTopicDataReaderBase::get_sample_rejected_status(
00352 DDS::SampleRejectedStatus& status)
00353 {
00354 return resulting_reader_->get_sample_rejected_status(status);
00355 }
00356
00357 DDS::ReturnCode_t MultiTopicDataReaderBase::get_liveliness_changed_status(
00358 DDS::LivelinessChangedStatus& status)
00359 {
00360 return resulting_reader_->get_liveliness_changed_status(status);
00361 }
00362
00363 DDS::ReturnCode_t MultiTopicDataReaderBase::get_requested_deadline_missed_status(
00364 DDS::RequestedDeadlineMissedStatus& status)
00365 {
00366 return resulting_reader_->get_requested_deadline_missed_status(status);
00367 }
00368
00369 DDS::ReturnCode_t MultiTopicDataReaderBase::get_requested_incompatible_qos_status(
00370 DDS::RequestedIncompatibleQosStatus& status)
00371 {
00372 return resulting_reader_->get_requested_incompatible_qos_status(status);
00373 }
00374
00375 DDS::ReturnCode_t MultiTopicDataReaderBase::get_subscription_matched_status(
00376 DDS::SubscriptionMatchedStatus& status)
00377 {
00378 return resulting_reader_->get_subscription_matched_status(status);
00379 }
00380
00381 DDS::ReturnCode_t MultiTopicDataReaderBase::get_sample_lost_status(
00382 DDS::SampleLostStatus& status)
00383 {
00384 return resulting_reader_->get_sample_lost_status(status);
00385 }
00386
00387 DDS::ReturnCode_t MultiTopicDataReaderBase::wait_for_historical_data(
00388 const DDS::Duration_t& max_wait)
00389 {
00390 return resulting_reader_->wait_for_historical_data(max_wait);
00391 }
00392
00393 DDS::ReturnCode_t MultiTopicDataReaderBase::get_matched_publications(
00394 DDS::InstanceHandleSeq& publication_handles)
00395 {
00396 return resulting_reader_->get_matched_publications(publication_handles);
00397 }
00398
00399 #ifndef DDS_HAS_MINIMUM_BIT
00400 DDS::ReturnCode_t MultiTopicDataReaderBase::get_matched_publication_data(
00401 DDS::PublicationBuiltinTopicData& publication_data,
00402 DDS::InstanceHandle_t publication_handle)
00403 {
00404 return resulting_reader_->get_matched_publication_data(publication_data,
00405 publication_handle);
00406 }
00407 #endif
00408
00409 void MultiTopicDataReaderBase::get_latency_stats(LatencyStatisticsSeq& stats)
00410 {
00411 resulting_reader_->get_latency_stats(stats);
00412 }
00413
00414 void MultiTopicDataReaderBase::reset_latency_stats()
00415 {
00416 resulting_reader_->reset_latency_stats();
00417 }
00418
00419 CORBA::Boolean MultiTopicDataReaderBase::statistics_enabled()
00420 {
00421 return resulting_reader_->statistics_enabled();
00422 }
00423
00424 void MultiTopicDataReaderBase::statistics_enabled(
00425 CORBA::Boolean statistics_enabled)
00426 {
00427 resulting_reader_->statistics_enabled(statistics_enabled);
00428 }
00429
00430 }
00431 }
00432
00433 #endif