MultiTopicDataReaderBase.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #ifndef OPENDDS_NO_MULTI_TOPIC
00010 
00011 #include "MultiTopicDataReaderBase.h"
00012 #include "SubscriberImpl.h"
00013 #include "DomainParticipantImpl.h"
00014 #include "TypeSupportImpl.h"
00015 
00016 #include <stdexcept>
00017 
00018 namespace {
00019   struct MatchesIncomingName { // predicate for std::find_if()
00020     const OPENDDS_STRING& look_for_;
00021     explicit MatchesIncomingName(const OPENDDS_STRING& s) : look_for_(s) {}
00022     bool operator()(const OpenDDS::DCPS::MultiTopicImpl::SubjectFieldSpec& sfs)
00023       const {
00024       return sfs.incoming_name_ == look_for_;
00025     }
00026   };
00027 }
00028 
00029 
00030 namespace OpenDDS {
00031 namespace DCPS {
00032 
00033 void MultiTopicDataReaderBase::init(const DDS::DataReaderQos& dr_qos,
00034   DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask,
00035   SubscriberImpl* parent, MultiTopicImpl* multitopic)
00036 {
00037   using namespace std;
00038   DDS::DataReader_var dr = multitopic->get_type_support()->create_datareader();
00039   resulting_reader_ = DataReaderEx::_narrow(dr);
00040   DataReaderImpl* resulting_impl =
00041     dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00042 
00043   resulting_impl->raw_latency_buffer_size() = parent->raw_latency_buffer_size();
00044   resulting_impl->raw_latency_buffer_type() = parent->raw_latency_buffer_type();
00045 
00046   DDS::DomainParticipant_var participant = parent->get_participant();
00047   resulting_impl->init(multitopic, dr_qos, a_listener, mask,
00048     dynamic_cast<DomainParticipantImpl*>(participant.in()), parent,
00049     resulting_reader_);
00050 
00051   init_typed(resulting_reader_);
00052   listener_ = new Listener(this);
00053 
00054   std::map<OPENDDS_STRING, OPENDDS_STRING> fieldToTopic;
00055 
00056   // key: name of field that's a key for the 'join'
00057   // mapped: set of topicNames that have this key in common
00058   std::map<OPENDDS_STRING, set<OPENDDS_STRING> > joinKeys;
00059 
00060   const vector<OPENDDS_STRING>& selection = multitopic->get_selection();
00061   for (size_t i = 0; i < selection.size(); ++i) {
00062 
00063     const DDS::Duration_t no_wait = {0, 0};
00064     DDS::Topic_var t = participant->find_topic(selection[i].c_str(), no_wait);
00065     if (!t.in()) {
00066       throw runtime_error("Topic: " + selection[i] + " not found.");
00067     }
00068 
00069     DDS::DataReader_var incoming =
00070       parent->create_datareader(t, dr_qos, listener_, ALL_STATUS_MASK);
00071     if (!incoming.in()) {
00072       throw runtime_error("Could not create incoming DataReader "
00073         + selection[i]);
00074     }
00075 
00076     QueryPlan& qp = query_plans_[selection[i]];
00077     qp.data_reader_ = incoming;
00078     const MetaStruct& meta = metaStructFor(incoming);
00079 
00080     for (const char** names = meta.getFieldNames(); *names; ++names) {
00081       if (fieldToTopic.count(*names)) { // already seen this field name
00082         set<OPENDDS_STRING>& topics = joinKeys[*names];
00083         topics.insert(fieldToTopic[*names]);
00084         topics.insert(selection[i]);
00085       } else {
00086         fieldToTopic[*names] = selection[i];
00087       }
00088     }
00089   }
00090 
00091   const vector<SubjectFieldSpec>& aggregation = multitopic->get_aggregation();
00092   if (aggregation.size() == 0) { // "SELECT * FROM ..."
00093     const MetaStruct& meta = getResultingMeta();
00094     for (const char** names = meta.getFieldNames(); *names; ++names) {
00095       std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
00096         fieldToTopic.find(*names);
00097       if (found == fieldToTopic.end()) {
00098         if (DCPS_debug_level > 1) {
00099           ACE_DEBUG((LM_WARNING,
00100                      ACE_TEXT("(%P|%t) WARNING: ")
00101                      ACE_TEXT("MultiTopicDataReaderBase::init(), in SELECT * ")
00102                      ACE_TEXT("resulting field %C has no corresponding ")
00103                      ACE_TEXT("incoming field.\n"), *names));
00104         }
00105       } else {
00106         query_plans_[found->second].projection_.push_back(
00107           SubjectFieldSpec(*names));
00108       }
00109     }
00110   } else { // "SELECT A, B FROM ..."
00111     for (size_t i = 0; i < aggregation.size(); ++i) {
00112       std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
00113         fieldToTopic.find(aggregation[i].incoming_name_);
00114       if (found == fieldToTopic.end()) {
00115         throw std::runtime_error("Projected field " +
00116           aggregation[i].incoming_name_ + " has no incoming field.");
00117       } else {
00118         query_plans_[found->second].projection_.push_back(aggregation[i]);
00119       }
00120     }
00121   }
00122 
00123   typedef std::map<OPENDDS_STRING, set<OPENDDS_STRING> >::const_iterator iter_t;
00124   for (iter_t iter = joinKeys.begin(); iter != joinKeys.end(); ++iter) {
00125     const OPENDDS_STRING& field = iter->first;
00126     const set<OPENDDS_STRING>& topics = iter->second;
00127     for (set<OPENDDS_STRING>::const_iterator iter2 = topics.begin();
00128          iter2 != topics.end(); ++iter2) {
00129       const OPENDDS_STRING& topic = *iter2;
00130       QueryPlan& qp = query_plans_[topic];
00131       if (find_if(qp.projection_.begin(), qp.projection_.end(),
00132                   MatchesIncomingName(field)) == qp.projection_.end()) {
00133         qp.keys_projected_out_.push_back(field);
00134       }
00135       for (set<OPENDDS_STRING>::const_iterator iter3 = topics.begin();
00136            iter3 != topics.end(); ++iter3) {
00137         if (topic != *iter3) { // other topics
00138           qp.adjacent_joins_.insert(pair<const OPENDDS_STRING, OPENDDS_STRING>(*iter3, field));
00139         }
00140       }
00141     }
00142   }
00143 }
00144 
00145 OPENDDS_STRING MultiTopicDataReaderBase::topicNameFor(DDS::DataReader_ptr reader)
00146 {
00147   DDS::TopicDescription_var td = reader->get_topicdescription();
00148   CORBA::String_var topic = td->get_name();
00149   return topic.in();
00150 }
00151 
00152 const MetaStruct&
00153 MultiTopicDataReaderBase::metaStructFor(DDS::DataReader_ptr reader)
00154 {
00155   DDS::TopicDescription_var td = reader->get_topicdescription();
00156   TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(td.in());
00157   TypeSupportImpl* ts = dynamic_cast<TypeSupportImpl*>(tdi->get_type_support());
00158   return ts->getMetaStructForType();
00159 }
00160 
00161 void MultiTopicDataReaderBase::data_available(DDS::DataReader_ptr reader)
00162 {
00163   using namespace std;
00164   using namespace DDS;
00165 
00166   const OPENDDS_STRING topic = topicNameFor(reader);
00167   DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(reader);
00168   DataReaderImpl::GenericBundle gen;
00169   ReturnCode_t rc = dri->read_generic(gen, NOT_READ_SAMPLE_STATE,
00170                                       ANY_VIEW_STATE, ANY_INSTANCE_STATE,false);
00171   if (rc == RETCODE_NO_DATA) {
00172     return;
00173   } else if (rc != RETCODE_OK) {
00174     throw runtime_error("Incoming DataReader for " + topic +
00175       " could not be read, error #" + to_dds_string(rc));
00176   }
00177 
00178   const MetaStruct& meta = metaStructFor(reader);
00179   const QueryPlan& qp = query_plans_[topic];
00180   for (CORBA::ULong i = 0; i < gen.samples_.size(); ++i) {
00181     if (gen.info_[i].valid_data) {
00182       incoming_sample(gen.samples_[i], gen.info_[i], topic.c_str(), meta);
00183     } else if (gen.info_[i].instance_state != ALIVE_INSTANCE_STATE) {
00184       DataReaderImpl* resulting_impl =
00185         dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00186       set<pair<InstanceHandle_t, InstanceHandle_t> >::const_iterator
00187         iter = qp.instances_.begin();
00188       while (iter != qp.instances_.end() &&
00189              iter->first != gen.info_[i].instance_handle) ++iter;
00190       for (; iter != qp.instances_.end() &&
00191            iter->first == gen.info_[i].instance_handle; ++iter) {
00192         resulting_impl->set_instance_state(iter->second,
00193                                            gen.info_[i].instance_state);
00194       }
00195     }
00196   }
00197 }
00198 
00199 void MultiTopicDataReaderBase::Listener::on_requested_deadline_missed(
00200   DDS::DataReader_ptr, const DDS::RequestedDeadlineMissedStatus&)
00201 {
00202 }
00203 
00204 void MultiTopicDataReaderBase::Listener::on_requested_incompatible_qos(
00205   DDS::DataReader_ptr, const DDS::RequestedIncompatibleQosStatus&)
00206 {
00207 }
00208 
00209 void MultiTopicDataReaderBase::Listener::on_sample_rejected(
00210   DDS::DataReader_ptr, const DDS::SampleRejectedStatus&)
00211 {
00212 }
00213 
00214 void MultiTopicDataReaderBase::Listener::on_liveliness_changed(
00215   DDS::DataReader_ptr, const DDS::LivelinessChangedStatus&)
00216 {
00217 }
00218 
00219 void MultiTopicDataReaderBase::Listener::on_data_available(
00220   DDS::DataReader_ptr reader)
00221 {
00222   try {
00223     outer_->data_available(reader);
00224   } catch (std::exception& e) {
00225     if (DCPS_debug_level) {
00226       ACE_DEBUG((LM_ERROR, "(%P|%t) MultiTopicDataReaderBase::Listener::"
00227                  "on_data_available(): %C", e.what()));
00228     }
00229   }
00230 }
00231 
00232 void MultiTopicDataReaderBase::Listener::on_subscription_matched(
00233   DDS::DataReader_ptr, const DDS::SubscriptionMatchedStatus&)
00234 {
00235 }
00236 
00237 void MultiTopicDataReaderBase::Listener::on_sample_lost(DDS::DataReader_ptr,
00238   const DDS::SampleLostStatus&)
00239 {
00240 }
00241 
00242 void MultiTopicDataReaderBase::set_status_changed_flag(DDS::StatusKind status,
00243   bool flag)
00244 {
00245   dynamic_cast<DataReaderImpl*>(resulting_reader_.in())
00246     ->set_status_changed_flag(status, flag);
00247 }
00248 
00249 bool MultiTopicDataReaderBase::have_sample_states(
00250   DDS::SampleStateMask sample_states) const
00251 {
00252   return dynamic_cast<DataReaderImpl*>(resulting_reader_.in())
00253     ->have_sample_states(sample_states);
00254 }
00255 
00256 void MultiTopicDataReaderBase::cleanup()
00257 {
00258   DDS::Subscriber_var sub = resulting_reader_->get_subscriber();
00259   for (std::map<OPENDDS_STRING, QueryPlan>::iterator it = query_plans_.begin();
00260        it != query_plans_.end(); ++it) {
00261     sub->delete_datareader(it->second.data_reader_);
00262   }
00263   DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00264   SubscriberImpl* si = dynamic_cast<SubscriberImpl*>(sub.in());
00265   si->remove_from_datareader_set(dri);
00266   dri->cleanup();
00267 }
00268 
00269 DDS::InstanceHandle_t MultiTopicDataReaderBase::get_instance_handle()
00270 {
00271   return resulting_reader_->get_instance_handle();
00272 }
00273 
00274 DDS::ReturnCode_t MultiTopicDataReaderBase::enable()
00275 {
00276   return resulting_reader_->enable();
00277 }
00278 
00279 DDS::StatusCondition_ptr MultiTopicDataReaderBase::get_statuscondition()
00280 {
00281   return resulting_reader_->get_statuscondition();
00282 }
00283 
00284 DDS::StatusMask MultiTopicDataReaderBase::get_status_changes()
00285 {
00286   return resulting_reader_->get_status_changes();
00287 }
00288 
00289 DDS::ReadCondition_ptr MultiTopicDataReaderBase::create_readcondition(
00290   DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00291   DDS::InstanceStateMask instance_states)
00292 {
00293   return resulting_reader_->create_readcondition(sample_states, view_states,
00294     instance_states);
00295 }
00296 
00297 #ifndef OPENDDS_NO_QUERY_CONDITION
00298 DDS::QueryCondition_ptr MultiTopicDataReaderBase::create_querycondition(
00299   DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00300   DDS::InstanceStateMask instance_states, const char* query_expression,
00301   const DDS::StringSeq& query_parameters)
00302 {
00303   return resulting_reader_->create_querycondition(sample_states, view_states,
00304     instance_states, query_expression, query_parameters);
00305 }
00306 #endif
00307 
00308 DDS::ReturnCode_t MultiTopicDataReaderBase::delete_readcondition(
00309   DDS::ReadCondition_ptr a_condition)
00310 {
00311   return resulting_reader_->delete_readcondition(a_condition);
00312 }
00313 
00314 DDS::ReturnCode_t MultiTopicDataReaderBase::delete_contained_entities()
00315 {
00316   return resulting_reader_->delete_contained_entities();
00317 }
00318 
00319 DDS::ReturnCode_t MultiTopicDataReaderBase::set_qos(
00320   const DDS::DataReaderQos& qos)
00321 {
00322   return resulting_reader_->set_qos(qos);
00323 }
00324 
00325 DDS::ReturnCode_t MultiTopicDataReaderBase::get_qos(DDS::DataReaderQos& qos)
00326 {
00327   return resulting_reader_->get_qos(qos);
00328 }
00329 
00330 DDS::ReturnCode_t MultiTopicDataReaderBase::set_listener(
00331   DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
00332 {
00333   return resulting_reader_->set_listener(a_listener, mask);
00334 }
00335 
00336 DDS::DataReaderListener_ptr MultiTopicDataReaderBase::get_listener()
00337 {
00338   return resulting_reader_->get_listener();
00339 }
00340 
00341 DDS::TopicDescription_ptr MultiTopicDataReaderBase::get_topicdescription()
00342 {
00343   return resulting_reader_->get_topicdescription();
00344 }
00345 
00346 DDS::Subscriber_ptr MultiTopicDataReaderBase::get_subscriber()
00347 {
00348   return resulting_reader_->get_subscriber();
00349 }
00350 
00351 DDS::ReturnCode_t MultiTopicDataReaderBase::get_sample_rejected_status(
00352   DDS::SampleRejectedStatus& status)
00353 {
00354   return resulting_reader_->get_sample_rejected_status(status);
00355 }
00356 
00357 DDS::ReturnCode_t MultiTopicDataReaderBase::get_liveliness_changed_status(
00358   DDS::LivelinessChangedStatus& status)
00359 {
00360   return resulting_reader_->get_liveliness_changed_status(status);
00361 }
00362 
00363 DDS::ReturnCode_t MultiTopicDataReaderBase::get_requested_deadline_missed_status(
00364   DDS::RequestedDeadlineMissedStatus& status)
00365 {
00366   return resulting_reader_->get_requested_deadline_missed_status(status);
00367 }
00368 
00369 DDS::ReturnCode_t MultiTopicDataReaderBase::get_requested_incompatible_qos_status(
00370   DDS::RequestedIncompatibleQosStatus& status)
00371 {
00372   return resulting_reader_->get_requested_incompatible_qos_status(status);
00373 }
00374 
00375 DDS::ReturnCode_t MultiTopicDataReaderBase::get_subscription_matched_status(
00376   DDS::SubscriptionMatchedStatus& status)
00377 {
00378   return resulting_reader_->get_subscription_matched_status(status);
00379 }
00380 
00381 DDS::ReturnCode_t MultiTopicDataReaderBase::get_sample_lost_status(
00382   DDS::SampleLostStatus& status)
00383 {
00384   return resulting_reader_->get_sample_lost_status(status);
00385 }
00386 
00387 DDS::ReturnCode_t MultiTopicDataReaderBase::wait_for_historical_data(
00388   const DDS::Duration_t& max_wait)
00389 {
00390   return resulting_reader_->wait_for_historical_data(max_wait);
00391 }
00392 
00393 DDS::ReturnCode_t MultiTopicDataReaderBase::get_matched_publications(
00394   DDS::InstanceHandleSeq& publication_handles)
00395 {
00396   return resulting_reader_->get_matched_publications(publication_handles);
00397 }
00398 
00399 #ifndef DDS_HAS_MINIMUM_BIT
00400 DDS::ReturnCode_t MultiTopicDataReaderBase::get_matched_publication_data(
00401   DDS::PublicationBuiltinTopicData& publication_data,
00402   DDS::InstanceHandle_t publication_handle)
00403 {
00404   return resulting_reader_->get_matched_publication_data(publication_data,
00405     publication_handle);
00406 }
00407 #endif
00408 
00409 void MultiTopicDataReaderBase::get_latency_stats(LatencyStatisticsSeq& stats)
00410 {
00411   resulting_reader_->get_latency_stats(stats);
00412 }
00413 
00414 void MultiTopicDataReaderBase::reset_latency_stats()
00415 {
00416   resulting_reader_->reset_latency_stats();
00417 }
00418 
00419 CORBA::Boolean MultiTopicDataReaderBase::statistics_enabled()
00420 {
00421   return resulting_reader_->statistics_enabled();
00422 }
00423 
00424 void MultiTopicDataReaderBase::statistics_enabled(
00425   CORBA::Boolean statistics_enabled)
00426 {
00427   resulting_reader_->statistics_enabled(statistics_enabled);
00428 }
00429 
00430 }
00431 }
00432 
00433 #endif

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7