00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #ifndef OPENDDS_NO_MULTI_TOPIC
00010
00011 #include "MultiTopicDataReaderBase.h"
00012
00013 #include "DomainParticipantImpl.h"
00014 #include "Marked_Default_Qos.h"
00015 #include "SubscriberImpl.h"
00016 #include "TypeSupportImpl.h"
00017
00018 #include <stdexcept>
00019
00020 namespace {
00021 struct MatchesIncomingName {
00022 const OPENDDS_STRING& look_for_;
00023 explicit MatchesIncomingName(const OPENDDS_STRING& s) : look_for_(s) {}
00024 bool operator()(const OpenDDS::DCPS::MultiTopicImpl::SubjectFieldSpec& sfs)
00025 const {
00026 return sfs.incoming_name_ == look_for_;
00027 }
00028 };
00029
00030 class Listener
00031 : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
00032 public:
00033 explicit Listener(OpenDDS::DCPS::MultiTopicDataReaderBase* outer)
00034 : outer_(outer)
00035 {}
00036
00037 void on_requested_deadline_missed(DDS::DataReader_ptr ,
00038 const DDS::RequestedDeadlineMissedStatus& ){}
00039
00040 void on_requested_incompatible_qos(DDS::DataReader_ptr ,
00041 const DDS::RequestedIncompatibleQosStatus& ){}
00042
00043 void on_sample_rejected(DDS::DataReader_ptr ,
00044 const DDS::SampleRejectedStatus& ){}
00045
00046 void on_liveliness_changed(DDS::DataReader_ptr ,
00047 const DDS::LivelinessChangedStatus& ){}
00048
00049 void on_data_available(DDS::DataReader_ptr reader){
00050 try {
00051 outer_->data_available(reader);
00052 } catch (std::exception& e) {
00053 if (OpenDDS::DCPS::DCPS_debug_level) {
00054 ACE_ERROR((LM_ERROR, "(%P|%t) MultiTopicDataReaderBase::Listener::"
00055 "on_data_available(): %C", e.what()));
00056 }
00057 }
00058 }
00059
00060 void on_subscription_matched(DDS::DataReader_ptr ,
00061 const DDS::SubscriptionMatchedStatus& ){}
00062
00063 void on_sample_lost(DDS::DataReader_ptr ,
00064 const DDS::SampleLostStatus& ){}
00065
00066
00067 virtual void _add_ref (void){
00068 outer_->_add_ref();
00069 }
00070
00071
00072
00073 virtual void _remove_ref (void){
00074 outer_->_remove_ref();
00075 }
00076
00077
00078
00079 virtual CORBA::ULong _refcount_value (void) const{
00080 return outer_->_refcount_value();
00081 }
00082
00083 private:
00084 OpenDDS::DCPS::MultiTopicDataReaderBase* outer_;
00085 };
00086 }
00087
00088 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00089
00090 namespace OpenDDS {
00091 namespace DCPS {
00092
00093 void MultiTopicDataReaderBase::init(const DDS::DataReaderQos& dr_qos,
00094 DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask,
00095 SubscriberImpl* parent, MultiTopicImpl* multitopic)
00096 {
00097 using namespace std;
00098 DDS::DataReader_var dr = multitopic->get_type_support()->create_datareader();
00099 resulting_reader_ = DataReaderEx::_narrow(dr);
00100 DataReaderImpl* resulting_impl =
00101 dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00102
00103 if (!resulting_impl) {
00104 ACE_ERROR((LM_ERROR,
00105 ACE_TEXT("(%P|%t) ERROR: ")
00106 ACE_TEXT("MultiTopicDataReaderBase::init, ")
00107 ACE_TEXT("Failed to get DataReaderImpl.\n")));
00108 return;
00109 }
00110
00111 resulting_impl->raw_latency_buffer_size() = parent->raw_latency_buffer_size();
00112 resulting_impl->raw_latency_buffer_type() = parent->raw_latency_buffer_type();
00113
00114 DDS::DomainParticipant_var participant = parent->get_participant();
00115 DomainParticipantImpl* dpi = dynamic_cast<DomainParticipantImpl*>(participant.in());
00116 if (!dpi) {
00117 ACE_ERROR((LM_ERROR,
00118 ACE_TEXT("(%P|%t) ERROR: ")
00119 ACE_TEXT("MultiTopicDataReaderBase::init, ")
00120 ACE_TEXT("Failed to get DomainParticipantImpl.\n")));
00121 return;
00122 }
00123 resulting_impl->init(multitopic, dr_qos, a_listener, mask, dpi, parent);
00124
00125 init_typed(resulting_reader_);
00126
00127 std::map<OPENDDS_STRING, OPENDDS_STRING> fieldToTopic;
00128
00129
00130
00131 std::map<OPENDDS_STRING, set<OPENDDS_STRING> > joinKeys;
00132
00133 listener_.reset(new Listener(this));
00134
00135 const vector<OPENDDS_STRING>& selection = multitopic->get_selection();
00136 for (size_t i = 0; i < selection.size(); ++i) {
00137
00138 const DDS::Duration_t no_wait = {0, 0};
00139 DDS::Topic_var t = participant->find_topic(selection[i].c_str(), no_wait);
00140 if (!t.in()) {
00141 throw runtime_error("Topic: " + selection[i] + " not found.");
00142 }
00143
00144
00145 DDS::DataReader_var incoming =
00146 parent->create_datareader(t, DATAREADER_QOS_USE_TOPIC_QOS,
00147 listener_.get(), ALL_STATUS_MASK);
00148 if (!incoming.in()) {
00149 throw runtime_error("Could not create incoming DataReader "
00150 + selection[i]);
00151 }
00152
00153 QueryPlan& qp = query_plans_[selection[i]];
00154 qp.data_reader_ = incoming;
00155 try {
00156 const MetaStruct& meta = metaStructFor(incoming);
00157
00158 for (const char** names = meta.getFieldNames(); *names; ++names) {
00159 if (fieldToTopic.count(*names)) {
00160 set<OPENDDS_STRING>& topics = joinKeys[*names];
00161 topics.insert(fieldToTopic[*names]);
00162 topics.insert(selection[i]);
00163 } else {
00164 fieldToTopic[*names] = selection[i];
00165 }
00166 }
00167 } catch (const std::runtime_error& e) {
00168 ACE_ERROR((LM_ERROR,
00169 ACE_TEXT("(%P|%t) MultiTopicDataReaderBase::init: %C"), e.what()));
00170 throw std::runtime_error("Failed to obtain metastruct for incoming.");
00171 }
00172 }
00173
00174 const vector<SubjectFieldSpec>& aggregation = multitopic->get_aggregation();
00175 if (aggregation.size() == 0) {
00176 const MetaStruct& meta = getResultingMeta();
00177 for (const char** names = meta.getFieldNames(); *names; ++names) {
00178 std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
00179 fieldToTopic.find(*names);
00180 if (found == fieldToTopic.end()) {
00181 if (DCPS_debug_level > 1) {
00182 ACE_DEBUG((LM_WARNING,
00183 ACE_TEXT("(%P|%t) WARNING: ")
00184 ACE_TEXT("MultiTopicDataReaderBase::init(), in SELECT * ")
00185 ACE_TEXT("resulting field %C has no corresponding ")
00186 ACE_TEXT("incoming field.\n"), *names));
00187 }
00188 } else {
00189 query_plans_[found->second].projection_.push_back(
00190 SubjectFieldSpec(*names));
00191 }
00192 }
00193 } else {
00194 for (size_t i = 0; i < aggregation.size(); ++i) {
00195 std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
00196 fieldToTopic.find(aggregation[i].incoming_name_);
00197 if (found == fieldToTopic.end()) {
00198 throw std::runtime_error("Projected field " +
00199 aggregation[i].incoming_name_ + " has no incoming field.");
00200 } else {
00201 query_plans_[found->second].projection_.push_back(aggregation[i]);
00202 }
00203 }
00204 }
00205
00206 typedef std::map<OPENDDS_STRING, set<OPENDDS_STRING> >::const_iterator iter_t;
00207 for (iter_t iter = joinKeys.begin(); iter != joinKeys.end(); ++iter) {
00208 const OPENDDS_STRING& field = iter->first;
00209 const set<OPENDDS_STRING>& topics = iter->second;
00210 for (set<OPENDDS_STRING>::const_iterator iter2 = topics.begin();
00211 iter2 != topics.end(); ++iter2) {
00212 const OPENDDS_STRING& topic = *iter2;
00213 QueryPlan& qp = query_plans_[topic];
00214 if (find_if(qp.projection_.begin(), qp.projection_.end(),
00215 MatchesIncomingName(field)) == qp.projection_.end()) {
00216 qp.keys_projected_out_.push_back(field);
00217 }
00218 for (set<OPENDDS_STRING>::const_iterator iter3 = topics.begin();
00219 iter3 != topics.end(); ++iter3) {
00220 if (topic != *iter3) {
00221 qp.adjacent_joins_.insert(pair<const OPENDDS_STRING, OPENDDS_STRING>(*iter3, field));
00222 }
00223 }
00224 }
00225 }
00226 }
00227
00228 OPENDDS_STRING MultiTopicDataReaderBase::topicNameFor(DDS::DataReader_ptr reader)
00229 {
00230 DDS::TopicDescription_var td = reader->get_topicdescription();
00231 CORBA::String_var topic = td->get_name();
00232 return topic.in();
00233 }
00234
00235 const MetaStruct&
00236 MultiTopicDataReaderBase::metaStructFor(DDS::DataReader_ptr reader)
00237 {
00238 DDS::TopicDescription_var td = reader->get_topicdescription();
00239 TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(td.in());
00240 if (tdi) {
00241 TypeSupportImpl* ts = dynamic_cast<TypeSupportImpl*>(tdi->get_type_support());
00242 if (ts) {
00243 return ts->getMetaStructForType();
00244 }
00245 }
00246 throw std::runtime_error("Failed to obtain type support for incoming DataReader");
00247 }
00248
00249 void MultiTopicDataReaderBase::data_available(DDS::DataReader_ptr reader)
00250 {
00251 using namespace std;
00252 using namespace DDS;
00253
00254 const OPENDDS_STRING topic = topicNameFor(reader);
00255 DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(reader);
00256 if (!dri) {
00257 throw runtime_error("Incoming DataReader for " + topic +
00258 " could not be cast to DataReaderImpl.");
00259 }
00260 DataReaderImpl::GenericBundle gen;
00261 ReturnCode_t rc = dri->read_generic(gen, NOT_READ_SAMPLE_STATE,
00262 ANY_VIEW_STATE, ANY_INSTANCE_STATE,false);
00263 if (rc == RETCODE_NO_DATA) {
00264 return;
00265 } else if (rc != RETCODE_OK) {
00266 throw runtime_error("Incoming DataReader for " + topic +
00267 " could not be read, error #" + to_dds_string(rc));
00268 }
00269 try {
00270 const MetaStruct& meta = metaStructFor(reader);
00271 const QueryPlan& qp = query_plans_[topic];
00272 for (CORBA::ULong i = 0; i < gen.samples_.size(); ++i) {
00273 if (gen.info_[i].valid_data) {
00274 incoming_sample(gen.samples_[i], gen.info_[i], topic.c_str(), meta);
00275 } else if (gen.info_[i].instance_state != ALIVE_INSTANCE_STATE) {
00276 DataReaderImpl* resulting_impl =
00277 dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00278
00279 if (resulting_impl) {
00280 set<pair<InstanceHandle_t, InstanceHandle_t> >::const_iterator
00281 iter = qp.instances_.begin();
00282 while (iter != qp.instances_.end() &&
00283 iter->first != gen.info_[i].instance_handle) ++iter;
00284 for (; iter != qp.instances_.end() &&
00285 iter->first == gen.info_[i].instance_handle; ++iter) {
00286 resulting_impl->set_instance_state(iter->second,
00287 gen.info_[i].instance_state);
00288 }
00289 } else {
00290 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) MultiTopicDataReaderBase::data_available:")
00291 ACE_TEXT(" failed to obtain DataReaderImpl.")));
00292 }
00293 }
00294 }
00295 } catch (const std::runtime_error& e) {
00296 if (OpenDDS::DCPS::DCPS_debug_level) {
00297 ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) MultiTopicDataReaderBase::data_available: %C"), e.what()));
00298 }
00299 }
00300 }
00301
00302 void MultiTopicDataReaderBase::set_status_changed_flag(DDS::StatusKind status,
00303 bool flag)
00304 {
00305 DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00306 if (dri) {
00307 dri->set_status_changed_flag(status, flag);
00308 }
00309 }
00310
00311 bool MultiTopicDataReaderBase::have_sample_states(
00312 DDS::SampleStateMask sample_states) const
00313 {
00314 DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00315 if (dri) {
00316 return dri->have_sample_states(sample_states);
00317 } else {
00318 return false;
00319 }
00320 }
00321
00322 void MultiTopicDataReaderBase::cleanup()
00323 {
00324 DDS::Subscriber_var sub = resulting_reader_->get_subscriber();
00325 for (std::map<OPENDDS_STRING, QueryPlan>::iterator it = query_plans_.begin();
00326 it != query_plans_.end(); ++it) {
00327 sub->delete_datareader(it->second.data_reader_);
00328 }
00329 DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00330 SubscriberImpl* si = dynamic_cast<SubscriberImpl*>(sub.in());
00331 if (dri) {
00332 if (si) {
00333 si->remove_from_datareader_set(dri);
00334 }
00335 dri->cleanup();
00336 }
00337 }
00338
00339 DDS::InstanceHandle_t MultiTopicDataReaderBase::get_instance_handle()
00340 {
00341 return resulting_reader_->get_instance_handle();
00342 }
00343
00344 DDS::ReturnCode_t MultiTopicDataReaderBase::enable()
00345 {
00346 return resulting_reader_->enable();
00347 }
00348
00349 DDS::StatusCondition_ptr MultiTopicDataReaderBase::get_statuscondition()
00350 {
00351 return resulting_reader_->get_statuscondition();
00352 }
00353
00354 DDS::StatusMask MultiTopicDataReaderBase::get_status_changes()
00355 {
00356 return resulting_reader_->get_status_changes();
00357 }
00358
00359 DDS::ReadCondition_ptr MultiTopicDataReaderBase::create_readcondition(
00360 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00361 DDS::InstanceStateMask instance_states)
00362 {
00363 return resulting_reader_->create_readcondition(sample_states, view_states,
00364 instance_states);
00365 }
00366
00367 #ifndef OPENDDS_NO_QUERY_CONDITION
00368 DDS::QueryCondition_ptr MultiTopicDataReaderBase::create_querycondition(
00369 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00370 DDS::InstanceStateMask instance_states, const char* query_expression,
00371 const DDS::StringSeq& query_parameters)
00372 {
00373 return resulting_reader_->create_querycondition(sample_states, view_states,
00374 instance_states, query_expression, query_parameters);
00375 }
00376 #endif
00377
00378 DDS::ReturnCode_t MultiTopicDataReaderBase::delete_readcondition(
00379 DDS::ReadCondition_ptr a_condition)
00380 {
00381 return resulting_reader_->delete_readcondition(a_condition);
00382 }
00383
00384 DDS::ReturnCode_t MultiTopicDataReaderBase::delete_contained_entities()
00385 {
00386 return resulting_reader_->delete_contained_entities();
00387 }
00388
00389 DDS::ReturnCode_t MultiTopicDataReaderBase::set_qos(
00390 const DDS::DataReaderQos& qos)
00391 {
00392 return resulting_reader_->set_qos(qos);
00393 }
00394
00395 DDS::ReturnCode_t MultiTopicDataReaderBase::get_qos(DDS::DataReaderQos& qos)
00396 {
00397 return resulting_reader_->get_qos(qos);
00398 }
00399
00400 DDS::ReturnCode_t MultiTopicDataReaderBase::set_listener(
00401 DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
00402 {
00403 return resulting_reader_->set_listener(a_listener, mask);
00404 }
00405
00406 DDS::DataReaderListener_ptr MultiTopicDataReaderBase::get_listener()
00407 {
00408 return resulting_reader_->get_listener();
00409 }
00410
00411 DDS::TopicDescription_ptr MultiTopicDataReaderBase::get_topicdescription()
00412 {
00413 return resulting_reader_->get_topicdescription();
00414 }
00415
00416 DDS::Subscriber_ptr MultiTopicDataReaderBase::get_subscriber()
00417 {
00418 return resulting_reader_->get_subscriber();
00419 }
00420
00421 DDS::ReturnCode_t MultiTopicDataReaderBase::get_sample_rejected_status(
00422 DDS::SampleRejectedStatus& status)
00423 {
00424 return resulting_reader_->get_sample_rejected_status(status);
00425 }
00426
00427 DDS::ReturnCode_t MultiTopicDataReaderBase::get_liveliness_changed_status(
00428 DDS::LivelinessChangedStatus& status)
00429 {
00430 return resulting_reader_->get_liveliness_changed_status(status);
00431 }
00432
00433 DDS::ReturnCode_t MultiTopicDataReaderBase::get_requested_deadline_missed_status(
00434 DDS::RequestedDeadlineMissedStatus& status)
00435 {
00436 return resulting_reader_->get_requested_deadline_missed_status(status);
00437 }
00438
00439 DDS::ReturnCode_t MultiTopicDataReaderBase::get_requested_incompatible_qos_status(
00440 DDS::RequestedIncompatibleQosStatus& status)
00441 {
00442 return resulting_reader_->get_requested_incompatible_qos_status(status);
00443 }
00444
00445 DDS::ReturnCode_t MultiTopicDataReaderBase::get_subscription_matched_status(
00446 DDS::SubscriptionMatchedStatus& status)
00447 {
00448 return resulting_reader_->get_subscription_matched_status(status);
00449 }
00450
00451 DDS::ReturnCode_t MultiTopicDataReaderBase::get_sample_lost_status(
00452 DDS::SampleLostStatus& status)
00453 {
00454 return resulting_reader_->get_sample_lost_status(status);
00455 }
00456
00457 DDS::ReturnCode_t MultiTopicDataReaderBase::wait_for_historical_data(
00458 const DDS::Duration_t& max_wait)
00459 {
00460 return resulting_reader_->wait_for_historical_data(max_wait);
00461 }
00462
00463 DDS::ReturnCode_t MultiTopicDataReaderBase::get_matched_publications(
00464 DDS::InstanceHandleSeq& publication_handles)
00465 {
00466 return resulting_reader_->get_matched_publications(publication_handles);
00467 }
00468
00469 #ifndef DDS_HAS_MINIMUM_BIT
00470 DDS::ReturnCode_t MultiTopicDataReaderBase::get_matched_publication_data(
00471 DDS::PublicationBuiltinTopicData& publication_data,
00472 DDS::InstanceHandle_t publication_handle)
00473 {
00474 return resulting_reader_->get_matched_publication_data(publication_data,
00475 publication_handle);
00476 }
00477 #endif
00478
00479 void MultiTopicDataReaderBase::get_latency_stats(LatencyStatisticsSeq& stats)
00480 {
00481 resulting_reader_->get_latency_stats(stats);
00482 }
00483
00484 void MultiTopicDataReaderBase::reset_latency_stats()
00485 {
00486 resulting_reader_->reset_latency_stats();
00487 }
00488
00489 CORBA::Boolean MultiTopicDataReaderBase::statistics_enabled()
00490 {
00491 return resulting_reader_->statistics_enabled();
00492 }
00493
00494 void MultiTopicDataReaderBase::statistics_enabled(
00495 CORBA::Boolean statistics_enabled)
00496 {
00497 resulting_reader_->statistics_enabled(statistics_enabled);
00498 }
00499
00500 }
00501 }
00502
00503 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00504
00505 #endif