MultiTopicDataReaderBase.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #ifndef OPENDDS_NO_MULTI_TOPIC
00010 
00011 #include "MultiTopicDataReaderBase.h"
00012 
00013 #include "DomainParticipantImpl.h"
00014 #include "Marked_Default_Qos.h"
00015 #include "SubscriberImpl.h"
00016 #include "TypeSupportImpl.h"
00017 
00018 #include <stdexcept>
00019 
00020 namespace {
00021   struct MatchesIncomingName { // predicate for std::find_if()
00022     const OPENDDS_STRING& look_for_;
00023     explicit MatchesIncomingName(const OPENDDS_STRING& s) : look_for_(s) {}
00024     bool operator()(const OpenDDS::DCPS::MultiTopicImpl::SubjectFieldSpec& sfs)
00025       const {
00026       return sfs.incoming_name_ == look_for_;
00027     }
00028   };
00029 
00030   class Listener
00031     : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
00032   public:
00033     explicit Listener(OpenDDS::DCPS::MultiTopicDataReaderBase* outer)
00034       : outer_(outer)
00035     {}
00036 
00037     void on_requested_deadline_missed(DDS::DataReader_ptr /*reader*/,
00038       const DDS::RequestedDeadlineMissedStatus& /*status*/){}
00039 
00040     void on_requested_incompatible_qos(DDS::DataReader_ptr /*reader*/,
00041       const DDS::RequestedIncompatibleQosStatus& /*status*/){}
00042 
00043     void on_sample_rejected(DDS::DataReader_ptr /*reader*/,
00044       const DDS::SampleRejectedStatus& /*status*/){}
00045 
00046     void on_liveliness_changed(DDS::DataReader_ptr /*reader*/,
00047       const DDS::LivelinessChangedStatus& /*status*/){}
00048 
00049     void on_data_available(DDS::DataReader_ptr reader){
00050       try {
00051         outer_->data_available(reader);
00052       } catch (std::exception& e) {
00053         if (OpenDDS::DCPS::DCPS_debug_level) {
00054           ACE_ERROR((LM_ERROR, "(%P|%t) MultiTopicDataReaderBase::Listener::"
00055                      "on_data_available(): %C", e.what()));
00056         }
00057       }
00058     }
00059 
00060     void on_subscription_matched(DDS::DataReader_ptr /*reader*/,
00061       const DDS::SubscriptionMatchedStatus& /*status*/){}
00062 
00063     void on_sample_lost(DDS::DataReader_ptr /*reader*/,
00064       const DDS::SampleLostStatus& /*status*/){}
00065 
00066     /// Increment the reference count.
00067     virtual void _add_ref (void){
00068       outer_->_add_ref();
00069     }
00070 
00071 
00072     /// Decrement the reference count.
00073     virtual void _remove_ref (void){
00074       outer_->_remove_ref();
00075     }
00076 
00077 
00078     /// Get the refcount
00079     virtual CORBA::ULong _refcount_value (void) const{
00080       return outer_->_refcount_value();
00081     }
00082 
00083   private:
00084     OpenDDS::DCPS::MultiTopicDataReaderBase* outer_;
00085   };
00086 }
00087 
00088 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00089 
00090 namespace OpenDDS {
00091 namespace DCPS {
00092 
00093 void MultiTopicDataReaderBase::init(const DDS::DataReaderQos& dr_qos,
00094   DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask,
00095   SubscriberImpl* parent, MultiTopicImpl* multitopic)
00096 {
00097   using namespace std;
00098   DDS::DataReader_var dr = multitopic->get_type_support()->create_datareader();
00099   resulting_reader_ = DataReaderEx::_narrow(dr);
00100   DataReaderImpl* resulting_impl =
00101     dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00102 
00103   if (!resulting_impl) {
00104     ACE_ERROR((LM_ERROR,
00105       ACE_TEXT("(%P|%t) ERROR: ")
00106       ACE_TEXT("MultiTopicDataReaderBase::init, ")
00107       ACE_TEXT("Failed to get DataReaderImpl.\n")));
00108     return;
00109   }
00110 
00111   resulting_impl->raw_latency_buffer_size() = parent->raw_latency_buffer_size();
00112   resulting_impl->raw_latency_buffer_type() = parent->raw_latency_buffer_type();
00113 
00114   DDS::DomainParticipant_var participant = parent->get_participant();
00115   DomainParticipantImpl* dpi = dynamic_cast<DomainParticipantImpl*>(participant.in());
00116   if (!dpi) {
00117     ACE_ERROR((LM_ERROR,
00118       ACE_TEXT("(%P|%t) ERROR: ")
00119       ACE_TEXT("MultiTopicDataReaderBase::init, ")
00120       ACE_TEXT("Failed to get DomainParticipantImpl.\n")));
00121     return;
00122   }
00123   resulting_impl->init(multitopic, dr_qos, a_listener, mask, dpi, parent);
00124 
00125   init_typed(resulting_reader_);
00126 
00127   std::map<OPENDDS_STRING, OPENDDS_STRING> fieldToTopic;
00128 
00129   // key: name of field that's a key for the 'join'
00130   // mapped: set of topicNames that have this key in common
00131   std::map<OPENDDS_STRING, set<OPENDDS_STRING> > joinKeys;
00132 
00133   listener_.reset(new Listener(this));
00134 
00135   const vector<OPENDDS_STRING>& selection = multitopic->get_selection();
00136   for (size_t i = 0; i < selection.size(); ++i) {
00137 
00138     const DDS::Duration_t no_wait = {0, 0};
00139     DDS::Topic_var t = participant->find_topic(selection[i].c_str(), no_wait);
00140     if (!t.in()) {
00141       throw runtime_error("Topic: " + selection[i] + " not found.");
00142     }
00143 
00144 
00145     DDS::DataReader_var incoming =
00146       parent->create_datareader(t, DATAREADER_QOS_USE_TOPIC_QOS,
00147                                 listener_.get(), ALL_STATUS_MASK);
00148     if (!incoming.in()) {
00149       throw runtime_error("Could not create incoming DataReader "
00150         + selection[i]);
00151     }
00152 
00153     QueryPlan& qp = query_plans_[selection[i]];
00154     qp.data_reader_ = incoming;
00155     try {
00156       const MetaStruct& meta = metaStructFor(incoming);
00157 
00158       for (const char** names = meta.getFieldNames(); *names; ++names) {
00159         if (fieldToTopic.count(*names)) { // already seen this field name
00160           set<OPENDDS_STRING>& topics = joinKeys[*names];
00161           topics.insert(fieldToTopic[*names]);
00162           topics.insert(selection[i]);
00163         } else {
00164           fieldToTopic[*names] = selection[i];
00165         }
00166       }
00167     } catch (const std::runtime_error& e) {
00168         ACE_ERROR((LM_ERROR,
00169           ACE_TEXT("(%P|%t) MultiTopicDataReaderBase::init: %C"), e.what()));
00170         throw std::runtime_error("Failed to obtain metastruct for incoming.");
00171     }
00172   }
00173 
00174   const vector<SubjectFieldSpec>& aggregation = multitopic->get_aggregation();
00175   if (aggregation.size() == 0) { // "SELECT * FROM ..."
00176     const MetaStruct& meta = getResultingMeta();
00177     for (const char** names = meta.getFieldNames(); *names; ++names) {
00178       std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
00179         fieldToTopic.find(*names);
00180       if (found == fieldToTopic.end()) {
00181         if (DCPS_debug_level > 1) {
00182           ACE_DEBUG((LM_WARNING,
00183                      ACE_TEXT("(%P|%t) WARNING: ")
00184                      ACE_TEXT("MultiTopicDataReaderBase::init(), in SELECT * ")
00185                      ACE_TEXT("resulting field %C has no corresponding ")
00186                      ACE_TEXT("incoming field.\n"), *names));
00187         }
00188       } else {
00189         query_plans_[found->second].projection_.push_back(
00190           SubjectFieldSpec(*names));
00191       }
00192     }
00193   } else { // "SELECT A, B FROM ..."
00194     for (size_t i = 0; i < aggregation.size(); ++i) {
00195       std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
00196         fieldToTopic.find(aggregation[i].incoming_name_);
00197       if (found == fieldToTopic.end()) {
00198         throw std::runtime_error("Projected field " +
00199           aggregation[i].incoming_name_ + " has no incoming field.");
00200       } else {
00201         query_plans_[found->second].projection_.push_back(aggregation[i]);
00202       }
00203     }
00204   }
00205 
00206   typedef std::map<OPENDDS_STRING, set<OPENDDS_STRING> >::const_iterator iter_t;
00207   for (iter_t iter = joinKeys.begin(); iter != joinKeys.end(); ++iter) {
00208     const OPENDDS_STRING& field = iter->first;
00209     const set<OPENDDS_STRING>& topics = iter->second;
00210     for (set<OPENDDS_STRING>::const_iterator iter2 = topics.begin();
00211          iter2 != topics.end(); ++iter2) {
00212       const OPENDDS_STRING& topic = *iter2;
00213       QueryPlan& qp = query_plans_[topic];
00214       if (find_if(qp.projection_.begin(), qp.projection_.end(),
00215                   MatchesIncomingName(field)) == qp.projection_.end()) {
00216         qp.keys_projected_out_.push_back(field);
00217       }
00218       for (set<OPENDDS_STRING>::const_iterator iter3 = topics.begin();
00219            iter3 != topics.end(); ++iter3) {
00220         if (topic != *iter3) { // other topics
00221           qp.adjacent_joins_.insert(pair<const OPENDDS_STRING, OPENDDS_STRING>(*iter3, field));
00222         }
00223       }
00224     }
00225   }
00226 }
00227 
00228 OPENDDS_STRING MultiTopicDataReaderBase::topicNameFor(DDS::DataReader_ptr reader)
00229 {
00230   DDS::TopicDescription_var td = reader->get_topicdescription();
00231   CORBA::String_var topic = td->get_name();
00232   return topic.in();
00233 }
00234 
00235 const MetaStruct&
00236 MultiTopicDataReaderBase::metaStructFor(DDS::DataReader_ptr reader)
00237 {
00238   DDS::TopicDescription_var td = reader->get_topicdescription();
00239   TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(td.in());
00240   if (tdi) {
00241     TypeSupportImpl* ts = dynamic_cast<TypeSupportImpl*>(tdi->get_type_support());
00242     if (ts) {
00243       return ts->getMetaStructForType();
00244     }
00245   }
00246   throw std::runtime_error("Failed to obtain type support for incoming DataReader");
00247 }
00248 
00249 void MultiTopicDataReaderBase::data_available(DDS::DataReader_ptr reader)
00250 {
00251   using namespace std;
00252   using namespace DDS;
00253 
00254   const OPENDDS_STRING topic = topicNameFor(reader);
00255   DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(reader);
00256   if (!dri) {
00257     throw runtime_error("Incoming DataReader for " + topic +
00258       " could not be cast to DataReaderImpl.");
00259   }
00260   DataReaderImpl::GenericBundle gen;
00261   ReturnCode_t rc = dri->read_generic(gen, NOT_READ_SAMPLE_STATE,
00262                                       ANY_VIEW_STATE, ANY_INSTANCE_STATE,false);
00263   if (rc == RETCODE_NO_DATA) {
00264     return;
00265   } else if (rc != RETCODE_OK) {
00266     throw runtime_error("Incoming DataReader for " + topic +
00267       " could not be read, error #" + to_dds_string(rc));
00268   }
00269   try {
00270     const MetaStruct& meta = metaStructFor(reader);
00271     const QueryPlan& qp = query_plans_[topic];
00272     for (CORBA::ULong i = 0; i < gen.samples_.size(); ++i) {
00273       if (gen.info_[i].valid_data) {
00274         incoming_sample(gen.samples_[i], gen.info_[i], topic.c_str(), meta);
00275       } else if (gen.info_[i].instance_state != ALIVE_INSTANCE_STATE) {
00276         DataReaderImpl* resulting_impl =
00277           dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00278 
00279         if (resulting_impl) {
00280           set<pair<InstanceHandle_t, InstanceHandle_t> >::const_iterator
00281             iter = qp.instances_.begin();
00282           while (iter != qp.instances_.end() &&
00283             iter->first != gen.info_[i].instance_handle) ++iter;
00284           for (; iter != qp.instances_.end() &&
00285             iter->first == gen.info_[i].instance_handle; ++iter) {
00286             resulting_impl->set_instance_state(iter->second,
00287               gen.info_[i].instance_state);
00288           }
00289         } else {
00290           ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) MultiTopicDataReaderBase::data_available:")
00291             ACE_TEXT(" failed to obtain DataReaderImpl.")));
00292         }
00293       }
00294     }
00295   } catch (const std::runtime_error& e) {
00296     if (OpenDDS::DCPS::DCPS_debug_level) {
00297       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) MultiTopicDataReaderBase::data_available: %C"), e.what()));
00298     }
00299   }
00300 }
00301 
00302 void MultiTopicDataReaderBase::set_status_changed_flag(DDS::StatusKind status,
00303   bool flag)
00304 {
00305   DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00306   if (dri) {
00307     dri->set_status_changed_flag(status, flag);
00308   }
00309 }
00310 
00311 bool MultiTopicDataReaderBase::have_sample_states(
00312   DDS::SampleStateMask sample_states) const
00313 {
00314   DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00315   if (dri) {
00316     return dri->have_sample_states(sample_states);
00317   } else {
00318     return false;
00319   }
00320 }
00321 
00322 void MultiTopicDataReaderBase::cleanup()
00323 {
00324   DDS::Subscriber_var sub = resulting_reader_->get_subscriber();
00325   for (std::map<OPENDDS_STRING, QueryPlan>::iterator it = query_plans_.begin();
00326        it != query_plans_.end(); ++it) {
00327     sub->delete_datareader(it->second.data_reader_);
00328   }
00329   DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00330   SubscriberImpl* si = dynamic_cast<SubscriberImpl*>(sub.in());
00331   if (dri) {
00332     if (si) {
00333       si->remove_from_datareader_set(dri);
00334     }
00335     dri->cleanup();
00336   }
00337 }
00338 
00339 DDS::InstanceHandle_t MultiTopicDataReaderBase::get_instance_handle()
00340 {
00341   return resulting_reader_->get_instance_handle();
00342 }
00343 
00344 DDS::ReturnCode_t MultiTopicDataReaderBase::enable()
00345 {
00346   return resulting_reader_->enable();
00347 }
00348 
00349 DDS::StatusCondition_ptr MultiTopicDataReaderBase::get_statuscondition()
00350 {
00351   return resulting_reader_->get_statuscondition();
00352 }
00353 
00354 DDS::StatusMask MultiTopicDataReaderBase::get_status_changes()
00355 {
00356   return resulting_reader_->get_status_changes();
00357 }
00358 
00359 DDS::ReadCondition_ptr MultiTopicDataReaderBase::create_readcondition(
00360   DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00361   DDS::InstanceStateMask instance_states)
00362 {
00363   return resulting_reader_->create_readcondition(sample_states, view_states,
00364     instance_states);
00365 }
00366 
00367 #ifndef OPENDDS_NO_QUERY_CONDITION
00368 DDS::QueryCondition_ptr MultiTopicDataReaderBase::create_querycondition(
00369   DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00370   DDS::InstanceStateMask instance_states, const char* query_expression,
00371   const DDS::StringSeq& query_parameters)
00372 {
00373   return resulting_reader_->create_querycondition(sample_states, view_states,
00374     instance_states, query_expression, query_parameters);
00375 }
00376 #endif
00377 
00378 DDS::ReturnCode_t MultiTopicDataReaderBase::delete_readcondition(
00379   DDS::ReadCondition_ptr a_condition)
00380 {
00381   return resulting_reader_->delete_readcondition(a_condition);
00382 }
00383 
00384 DDS::ReturnCode_t MultiTopicDataReaderBase::delete_contained_entities()
00385 {
00386   return resulting_reader_->delete_contained_entities();
00387 }
00388 
00389 DDS::ReturnCode_t MultiTopicDataReaderBase::set_qos(
00390   const DDS::DataReaderQos& qos)
00391 {
00392   return resulting_reader_->set_qos(qos);
00393 }
00394 
00395 DDS::ReturnCode_t MultiTopicDataReaderBase::get_qos(DDS::DataReaderQos& qos)
00396 {
00397   return resulting_reader_->get_qos(qos);
00398 }
00399 
00400 DDS::ReturnCode_t MultiTopicDataReaderBase::set_listener(
00401   DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
00402 {
00403   return resulting_reader_->set_listener(a_listener, mask);
00404 }
00405 
00406 DDS::DataReaderListener_ptr MultiTopicDataReaderBase::get_listener()
00407 {
00408   return resulting_reader_->get_listener();
00409 }
00410 
00411 DDS::TopicDescription_ptr MultiTopicDataReaderBase::get_topicdescription()
00412 {
00413   return resulting_reader_->get_topicdescription();
00414 }
00415 
00416 DDS::Subscriber_ptr MultiTopicDataReaderBase::get_subscriber()
00417 {
00418   return resulting_reader_->get_subscriber();
00419 }
00420 
00421 DDS::ReturnCode_t MultiTopicDataReaderBase::get_sample_rejected_status(
00422   DDS::SampleRejectedStatus& status)
00423 {
00424   return resulting_reader_->get_sample_rejected_status(status);
00425 }
00426 
00427 DDS::ReturnCode_t MultiTopicDataReaderBase::get_liveliness_changed_status(
00428   DDS::LivelinessChangedStatus& status)
00429 {
00430   return resulting_reader_->get_liveliness_changed_status(status);
00431 }
00432 
00433 DDS::ReturnCode_t MultiTopicDataReaderBase::get_requested_deadline_missed_status(
00434   DDS::RequestedDeadlineMissedStatus& status)
00435 {
00436   return resulting_reader_->get_requested_deadline_missed_status(status);
00437 }
00438 
00439 DDS::ReturnCode_t MultiTopicDataReaderBase::get_requested_incompatible_qos_status(
00440   DDS::RequestedIncompatibleQosStatus& status)
00441 {
00442   return resulting_reader_->get_requested_incompatible_qos_status(status);
00443 }
00444 
00445 DDS::ReturnCode_t MultiTopicDataReaderBase::get_subscription_matched_status(
00446   DDS::SubscriptionMatchedStatus& status)
00447 {
00448   return resulting_reader_->get_subscription_matched_status(status);
00449 }
00450 
00451 DDS::ReturnCode_t MultiTopicDataReaderBase::get_sample_lost_status(
00452   DDS::SampleLostStatus& status)
00453 {
00454   return resulting_reader_->get_sample_lost_status(status);
00455 }
00456 
00457 DDS::ReturnCode_t MultiTopicDataReaderBase::wait_for_historical_data(
00458   const DDS::Duration_t& max_wait)
00459 {
00460   return resulting_reader_->wait_for_historical_data(max_wait);
00461 }
00462 
00463 DDS::ReturnCode_t MultiTopicDataReaderBase::get_matched_publications(
00464   DDS::InstanceHandleSeq& publication_handles)
00465 {
00466   return resulting_reader_->get_matched_publications(publication_handles);
00467 }
00468 
00469 #ifndef DDS_HAS_MINIMUM_BIT
00470 DDS::ReturnCode_t MultiTopicDataReaderBase::get_matched_publication_data(
00471   DDS::PublicationBuiltinTopicData& publication_data,
00472   DDS::InstanceHandle_t publication_handle)
00473 {
00474   return resulting_reader_->get_matched_publication_data(publication_data,
00475     publication_handle);
00476 }
00477 #endif
00478 
00479 void MultiTopicDataReaderBase::get_latency_stats(LatencyStatisticsSeq& stats)
00480 {
00481   resulting_reader_->get_latency_stats(stats);
00482 }
00483 
00484 void MultiTopicDataReaderBase::reset_latency_stats()
00485 {
00486   resulting_reader_->reset_latency_stats();
00487 }
00488 
00489 CORBA::Boolean MultiTopicDataReaderBase::statistics_enabled()
00490 {
00491   return resulting_reader_->statistics_enabled();
00492 }
00493 
00494 void MultiTopicDataReaderBase::statistics_enabled(
00495   CORBA::Boolean statistics_enabled)
00496 {
00497   resulting_reader_->statistics_enabled(statistics_enabled);
00498 }
00499 
00500 }
00501 }
00502 
00503 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00504 
00505 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1