OpenDDS::DCPS::MultiTopicDataReaderBase Class Reference

#include <MultiTopicDataReaderBase.h>

Inheritance diagram for OpenDDS::DCPS::MultiTopicDataReaderBase:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::MultiTopicDataReaderBase:
Collaboration graph
[legend]

List of all members.

Classes

struct  QueryPlan

Public Member Functions

 MultiTopicDataReaderBase ()
void init (const DDS::DataReaderQos &dr_qos, DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask, SubscriberImpl *parent, MultiTopicImpl *multitopic)
void data_available (DDS::DataReader_ptr reader)
void set_status_changed_flag (DDS::StatusKind status, bool flag)
bool have_sample_states (DDS::SampleStateMask sample_states) const
void cleanup ()
DDS::InstanceHandle_t get_instance_handle ()
DDS::ReturnCode_t enable ()
DDS::StatusCondition_ptr get_statuscondition ()
DDS::StatusMask get_status_changes ()
DDS::ReadCondition_ptr create_readcondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
DDS::QueryCondition_ptr create_querycondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const char *query_expression, const DDS::StringSeq &query_parameters)
DDS::ReturnCode_t delete_readcondition (DDS::ReadCondition_ptr a_condition)
DDS::ReturnCode_t delete_contained_entities ()
DDS::ReturnCode_t set_qos (const DDS::DataReaderQos &qos)
DDS::ReturnCode_t get_qos (DDS::DataReaderQos &qos)
DDS::ReturnCode_t set_listener (DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
DDS::DataReaderListener_ptr get_listener ()
DDS::TopicDescription_ptr get_topicdescription ()
DDS::Subscriber_ptr get_subscriber ()
DDS::ReturnCode_t get_sample_rejected_status (DDS::SampleRejectedStatus &status)
DDS::ReturnCode_t get_liveliness_changed_status (DDS::LivelinessChangedStatus &status)
DDS::ReturnCode_t get_requested_deadline_missed_status (DDS::RequestedDeadlineMissedStatus &status)
DDS::ReturnCode_t get_requested_incompatible_qos_status (DDS::RequestedIncompatibleQosStatus &status)
DDS::ReturnCode_t get_subscription_matched_status (DDS::SubscriptionMatchedStatus &status)
DDS::ReturnCode_t get_sample_lost_status (DDS::SampleLostStatus &status)
DDS::ReturnCode_t wait_for_historical_data (const DDS::Duration_t &max_wait)
DDS::ReturnCode_t get_matched_publications (DDS::InstanceHandleSeq &publication_handles)
DDS::ReturnCode_t get_matched_publication_data (DDS::PublicationBuiltinTopicData &publication_data, DDS::InstanceHandle_t publication_handle)
void get_latency_stats (LatencyStatisticsSeq &stats)
void reset_latency_stats ()
 Clear any intermediate statistical values.
CORBA::Boolean statistics_enabled ()
void statistics_enabled (CORBA::Boolean statistics_enabled)

Protected Types

typedef
MultiTopicImpl::SubjectFieldSpec 
SubjectFieldSpec

Protected Member Functions

OPENDDS_STRING topicNameFor (DDS::DataReader_ptr dr)
const MetaStruct & metaStructFor (DDS::DataReader_ptr dr)
 OPENDDS_MAP (OPENDDS_STRING, QueryPlan) query_plans_

Private Member Functions

virtual void init_typed (DataReaderEx *dr)=0
virtual const MetaStruct & getResultingMeta ()=0
virtual void incoming_sample (void *sample, const DDS::SampleInfo &info, const char *topic, const MetaStruct &meta)=0

Private Attributes

unique_ptr
< OpenDDS::DCPS::LocalObject
< DDS::DataReaderListener > > 
listener_
DataReaderEx_var resulting_reader_

Detailed Description

Definition at line 30 of file MultiTopicDataReaderBase.h.


Member Typedef Documentation

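typedef MultiTopicImpl::SubjectFieldSpec OpenDDS::DCPS::MultiTopicDataReaderBase::SubjectFieldSpec [protected]

Definition at line 139 of file MultiTopicDataReaderBase.h.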


Constructor & Destructor Documentation

OpenDDS::DCPS::MultiTopicDataReaderBase::MultiTopicDataReaderBase (  )  [inline]

Definition at line 33 of file MultiTopicDataReaderBase.h.

00033 {}


Member Function Documentation

void OpenDDS::DCPS::MultiTopicDataReaderBase::cleanup ( void   ) 

Definition at line 322 of file MultiTopicDataReaderBase.cpp.

References OpenDDS::DCPS::DataReaderImpl::cleanup(), OpenDDS::DCPS::SubscriberImpl::remove_from_datareader_set(), and resulting_reader_.

00323 {
00324   DDS::Subscriber_var sub = resulting_reader_->get_subscriber();
00325   for (std::map<OPENDDS_STRING, QueryPlan>::iterator it = query_plans_.begin();
00326        it != query_plans_.end(); ++it) {
00327     sub->delete_datareader(it->second.data_reader_);
00328   }
00329   DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00330   SubscriberImpl* si = dynamic_cast<SubscriberImpl*>(sub.in());
00331   if (dri) {
00332     if (si) {
00333       si->remove_from_datareader_set(dri);
00334     }
00335     dri->cleanup();
00336   }
00337 }

Here is the call graph for this function:

DDS::QueryCondition_ptr OpenDDS::DCPS::MultiTopicDataReaderBase::create_querycondition ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
const char *  query_expression,
const DDS::StringSeq &  query_parameters 
)

Definition at line 368 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00372 {
00373   return resulting_reader_->create_querycondition(sample_states, view_states,
00374     instance_states, query_expression, query_parameters);
00375 }

DDS::ReadCondition_ptr OpenDDS::DCPS::MultiTopicDataReaderBase::create_readcondition ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)

Definition at line 359 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00362 {
00363   return resulting_reader_->create_readcondition(sample_states, view_states,
00364     instance_states);
00365 }

void OpenDDS::DCPS::MultiTopicDataReaderBase::data_available ( DDS::DataReader_ptr  reader  ) 

Definition at line 249 of file MultiTopicDataReaderBase.cpp.

References ACE_TEXT(), DDS::ALIVE_INSTANCE_STATE, DDS::ANY_INSTANCE_STATE, DDS::ANY_VIEW_STATE, OpenDDS::DCPS::DCPS_debug_level, incoming_sample(), OpenDDS::DCPS::DataReaderImpl::GenericBundle::info_, OpenDDS::DCPS::MultiTopicDataReaderBase::QueryPlan::instances_, LM_ERROR, metaStructFor(), DDS::NOT_READ_SAMPLE_STATE, OPENDDS_STRING, OpenDDS::DCPS::DataReaderImpl::read_generic(), resulting_reader_, DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, OpenDDS::DCPS::DataReaderImpl::GenericBundle::samples_, OpenDDS::DCPS::DataReaderImpl::set_instance_state(), OpenDDS::DCPS::to_dds_string(), and topicNameFor().

00250 {
00251   using namespace std;
00252   using namespace DDS;
00253 
00254   const OPENDDS_STRING topic = topicNameFor(reader);
00255   DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(reader);
00256   if (!dri) {
00257     throw runtime_error("Incoming DataReader for " + topic +
00258       " could not be cast to DataReaderImpl.");
00259   }
00260   DataReaderImpl::GenericBundle gen;
00261   ReturnCode_t rc = dri->read_generic(gen, NOT_READ_SAMPLE_STATE,
00262                                       ANY_VIEW_STATE, ANY_INSTANCE_STATE,false);
00263   if (rc == RETCODE_NO_DATA) {
00264     return;
00265   } else if (rc != RETCODE_OK) {
00266     throw runtime_error("Incoming DataReader for " + topic +
00267       " could not be read, error #" + to_dds_string(rc));
00268   }
00269   try {
00270     const MetaStruct& meta = metaStructFor(reader);
00271     const QueryPlan& qp = query_plans_[topic];
00272     for (CORBA::ULong i = 0; i < gen.samples_.size(); ++i) {
00273       if (gen.info_[i].valid_data) {
00274         incoming_sample(gen.samples_[i], gen.info_[i], topic.c_str(), meta);
00275       } else if (gen.info_[i].instance_state != ALIVE_INSTANCE_STATE) {
00276         DataReaderImpl* resulting_impl =
00277           dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00278 
00279         if (resulting_impl) {
00280           set<pair<InstanceHandle_t, InstanceHandle_t> >::const_iterator
00281             iter = qp.instances_.begin();
00282           while (iter != qp.instances_.end() &&
00283             iter->first != gen.info_[i].instance_handle) ++iter;
00284           for (; iter != qp.instances_.end() &&
00285             iter->first == gen.info_[i].instance_handle; ++iter) {
00286             resulting_impl->set_instance_state(iter->second,
00287               gen.info_[i].instance_state);
00288           }
00289         } else {
00290           ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) MultiTopicDataReaderBase::data_available:")
00291             ACE_TEXT(" failed to obtain DataReaderImpl.")));
00292         }
00293       }
00294     }
00295   } catch (const std::runtime_error& e) {
00296     if (OpenDDS::DCPS::DCPS_debug_level) {
00297       ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) MultiTopicDataReaderBase::data_available: %C"), e.what()));
00298     }
00299   }
00300 }

Here is the call graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::delete_contained_entities (  ) 

Implements DDS::DataReader.

Definition at line 384 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00385 {
00386   return resulting_reader_->delete_contained_entities();
00387 }

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::delete_readcondition ( DDS::ReadCondition_ptr  a_condition  ) 

Definition at line 378 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00380 {
00381   return resulting_reader_->delete_readcondition(a_condition);
00382 }

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::enable (  ) 

Implements DDS::Entity.

Definition at line 344 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00345 {
00346   return resulting_reader_->enable();
00347 }

DDS::InstanceHandle_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_instance_handle (  ) 

Implements DDS::Entity.

Definition at line 339 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00340 {
00341   return resulting_reader_->get_instance_handle();
00342 }

void OpenDDS::DCPS::MultiTopicDataReaderBase::get_latency_stats ( LatencyStatisticsSeq &  stats  ) 

Definition at line 479 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00480 {
00481   resulting_reader_->get_latency_stats(stats);
00482 }

DDS::DataReaderListener_ptr OpenDDS::DCPS::MultiTopicDataReaderBase::get_listener (  ) 

Implements DDS::DataReader.

Definition at line 406 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

Referenced by OpenDDS::DCPS::SubscriberImpl::notify_datareaders().

00407 {
00408   return resulting_reader_->get_listener();
00409 }

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_liveliness_changed_status ( DDS::LivelinessChangedStatus &  status  ) 

Definition at line 427 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00429 {
00430   return resulting_reader_->get_liveliness_changed_status(status);
00431 }

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_matched_publication_data ( DDS::PublicationBuiltinTopicData &  publication_data,
DDS::InstanceHandle_t  publication_handle 
)

Definition at line 470 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00473 {
00474   return resulting_reader_->get_matched_publication_data(publication_data,
00475     publication_handle);
00476 }

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_matched_publications ( DDS::InstanceHandleSeq &  publication_handles  ) 

Definition at line 463 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00465 {
00466   return resulting_reader_->get_matched_publications(publication_handles);
00467 }

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_qos ( DDS::DataReaderQos &  qos  ) 

Definition at line 395 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00396 {
00397   return resulting_reader_->get_qos(qos);
00398 }

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_requested_deadline_missed_status ( DDS::RequestedDeadlineMissedStatus &  status  ) 

Definition at line 433 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00435 {
00436   return resulting_reader_->get_requested_deadline_missed_status(status);
00437 }

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_requested_incompatible_qos_status ( DDS::RequestedIncompatibleQosStatus &  status  ) 

Definition at line 439 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00441 {
00442   return resulting_reader_->get_requested_incompatible_qos_status(status);
00443 }

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_sample_lost_status ( DDS::SampleLostStatus &  status  ) 

Definition at line 451 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00453 {
00454   return resulting_reader_->get_sample_lost_status(status);
00455 }

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_sample_rejected_status ( DDS::SampleRejectedStatus &  status  ) 

Definition at line 421 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00423 {
00424   return resulting_reader_->get_sample_rejected_status(status);
00425 }

DDS::StatusMask OpenDDS::DCPS::MultiTopicDataReaderBase::get_status_changes (  ) 

Implements DDS::Entity.

Definition at line 354 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00355 {
00356   return resulting_reader_->get_status_changes();
00357 }

DDS::StatusCondition_ptr OpenDDS::DCPS::MultiTopicDataReaderBase::get_statuscondition (  ) 

Implements DDS::Entity.

Definition at line 349 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00350 {
00351   return resulting_reader_->get_statuscondition();
00352 }

DDS::Subscriber_ptr OpenDDS::DCPS::MultiTopicDataReaderBase::get_subscriber (  ) 

Implements DDS::DataReader.

Definition at line 416 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00417 {
00418   return resulting_reader_->get_subscriber();
00419 }

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_subscription_matched_status ( DDS::SubscriptionMatchedStatus &  status  ) 

Definition at line 445 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00447 {
00448   return resulting_reader_->get_subscription_matched_status(status);
00449 }

DDS::TopicDescription_ptr OpenDDS::DCPS::MultiTopicDataReaderBase::get_topicdescription (  ) 

Implements DDS::DataReader.

Definition at line 411 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00412 {
00413   return resulting_reader_->get_topicdescription();
00414 }

virtual const MetaStruct& OpenDDS::DCPS::MultiTopicDataReaderBase::getResultingMeta (  )  [private, pure virtual]

Implemented in OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >.

Referenced by init().

Here is the caller graph for this function:

bool OpenDDS::DCPS::MultiTopicDataReaderBase::have_sample_states ( DDS::SampleStateMask  sample_states  )  const

Definition at line 311 of file MultiTopicDataReaderBase.cpp.

References OpenDDS::DCPS::DataReaderImpl::have_sample_states(), and resulting_reader_.

Referenced by OpenDDS::DCPS::SubscriberImpl::notify_datareaders().

00313 {
00314   DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00315   if (dri) {
00316     return dri->have_sample_states(sample_states);
00317   } else {
00318     return false;
00319   }
00320 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual void OpenDDS::DCPS::MultiTopicDataReaderBase::incoming_sample ( void *  sample,
const DDS::SampleInfo &  info,
const char *  topic,
const MetaStruct &  meta 
) [private, pure virtual]

Implemented in OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >.

Referenced by data_available().

Here is the caller graph for this function:

void OpenDDS::DCPS::MultiTopicDataReaderBase::init ( const DDS::DataReaderQos &  dr_qos,
DDS::DataReaderListener_ptr  a_listener,
DDS::StatusMask  mask,
SubscriberImpl *  parent,
MultiTopicImpl *  multitopic 
)

Definition at line 93 of file MultiTopicDataReaderBase.cpp.

References OpenDDS::DCPS::LocalObject< DataReaderEx >::_narrow(), ACE_TEXT(), OpenDDS::DCPS::MultiTopicDataReaderBase::QueryPlan::adjacent_joins_, OpenDDS::DCPS::ALL_STATUS_MASK, OpenDDS::DCPS::SubscriberImpl::create_datareader(), OpenDDS::DCPS::MultiTopicDataReaderBase::QueryPlan::data_reader_, DATAREADER_QOS_USE_TOPIC_QOS, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), OpenDDS::DCPS::MultiTopicImpl::get_aggregation(), OpenDDS::DCPS::SubscriberImpl::get_participant(), OpenDDS::DCPS::MultiTopicImpl::get_selection(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), OpenDDS::DCPS::MetaStruct::getFieldNames(), getResultingMeta(), OpenDDS::DCPS::DataReaderImpl::init(), init_typed(), OpenDDS::DCPS::MultiTopicDataReaderBase::QueryPlan::keys_projected_out_, listener_, LM_ERROR, LM_WARNING, metaStructFor(), OPENDDS_STRING, OpenDDS::DCPS::MultiTopicDataReaderBase::QueryPlan::projection_, OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_size(), OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size(), OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_type(), OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), and resulting_reader_.

Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader().

00096 {
00097   using namespace std;
00098   DDS::DataReader_var dr = multitopic->get_type_support()->create_datareader();
00099   resulting_reader_ = DataReaderEx::_narrow(dr);
00100   DataReaderImpl* resulting_impl =
00101     dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00102 
00103   if (!resulting_impl) {
00104     ACE_ERROR((LM_ERROR,
00105       ACE_TEXT("(%P|%t) ERROR: ")
00106       ACE_TEXT("MultiTopicDataReaderBase::init, ")
00107       ACE_TEXT("Failed to get DataReaderImpl.\n")));
00108     return;
00109   }
00110 
00111   resulting_impl->raw_latency_buffer_size() = parent->raw_latency_buffer_size();
00112   resulting_impl->raw_latency_buffer_type() = parent->raw_latency_buffer_type();
00113 
00114   DDS::DomainParticipant_var participant = parent->get_participant();
00115   DomainParticipantImpl* dpi = dynamic_cast<DomainParticipantImpl*>(participant.in());
00116   if (!dpi) {
00117     ACE_ERROR((LM_ERROR,
00118       ACE_TEXT("(%P|%t) ERROR: ")
00119       ACE_TEXT("MultiTopicDataReaderBase::init, ")
00120       ACE_TEXT("Failed to get DomainParticipantImpl.\n")));
00121     return;
00122   }
00123   resulting_impl->init(multitopic, dr_qos, a_listener, mask, dpi, parent);
00124 
00125   init_typed(resulting_reader_);
00126 
00127   std::map<OPENDDS_STRING, OPENDDS_STRING> fieldToTopic;
00128 
00129   // key: name of field that's a key for the 'join'
00130   // mapped: set of topicNames that have this key in common
00131   std::map<OPENDDS_STRING, set<OPENDDS_STRING> > joinKeys;
00132 
00133   listener_.reset(new Listener(this));
00134 
00135   const vector<OPENDDS_STRING>& selection = multitopic->get_selection();
00136   for (size_t i = 0; i < selection.size(); ++i) {
00137 
00138     const DDS::Duration_t no_wait = {0, 0};
00139     DDS::Topic_var t = participant->find_topic(selection[i].c_str(), no_wait);
00140     if (!t.in()) {
00141       throw runtime_error("Topic: " + selection[i] + " not found.");
00142     }
00143 
00144 
00145     DDS::DataReader_var incoming =
00146       parent->create_datareader(t, DATAREADER_QOS_USE_TOPIC_QOS,
00147                                 listener_.get(), ALL_STATUS_MASK);
00148     if (!incoming.in()) {
00149       throw runtime_error("Could not create incoming DataReader "
00150         + selection[i]);
00151     }
00152 
00153     QueryPlan& qp = query_plans_[selection[i]];
00154     qp.data_reader_ = incoming;
00155     try {
00156       const MetaStruct& meta = metaStructFor(incoming);
00157 
00158       for (const char** names = meta.getFieldNames(); *names; ++names) {
00159         if (fieldToTopic.count(*names)) { // already seen this field name
00160           set<OPENDDS_STRING>& topics = joinKeys[*names];
00161           topics.insert(fieldToTopic[*names]);
00162           topics.insert(selection[i]);
00163         } else {
00164           fieldToTopic[*names] = selection[i];
00165         }
00166       }
00167     } catch (const std::runtime_error& e) {
00168         ACE_ERROR((LM_ERROR,
00169           ACE_TEXT("(%P|%t) MultiTopicDataReaderBase::init: %C"), e.what()));
00170         throw std::runtime_error("Failed to obtain metastruct for incoming.");
00171     }
00172   }
00173 
00174   const vector<SubjectFieldSpec>& aggregation = multitopic->get_aggregation();
00175   if (aggregation.size() == 0) { // "SELECT * FROM ..."
00176     const MetaStruct& meta = getResultingMeta();
00177     for (const char** names = meta.getFieldNames(); *names; ++names) {
00178       std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
00179         fieldToTopic.find(*names);
00180       if (found == fieldToTopic.end()) {
00181         if (DCPS_debug_level > 1) {
00182           ACE_DEBUG((LM_WARNING,
00183                      ACE_TEXT("(%P|%t) WARNING: ")
00184                      ACE_TEXT("MultiTopicDataReaderBase::init(), in SELECT * ")
00185                      ACE_TEXT("resulting field %C has no corresponding ")
00186                      ACE_TEXT("incoming field.\n"), *names));
00187         }
00188       } else {
00189         query_plans_[found->second].projection_.push_back(
00190           SubjectFieldSpec(*names));
00191       }
00192     }
00193   } else { // "SELECT A, B FROM ..."
00194     for (size_t i = 0; i < aggregation.size(); ++i) {
00195       std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
00196         fieldToTopic.find(aggregation[i].incoming_name_);
00197       if (found == fieldToTopic.end()) {
00198         throw std::runtime_error("Projected field " +
00199           aggregation[i].incoming_name_ + " has no incoming field.");
00200       } else {
00201         query_plans_[found->second].projection_.push_back(aggregation[i]);
00202       }
00203     }
00204   }
00205 
00206   typedef std::map<OPENDDS_STRING, set<OPENDDS_STRING> >::const_iterator iter_t;
00207   for (iter_t iter = joinKeys.begin(); iter != joinKeys.end(); ++iter) {
00208     const OPENDDS_STRING& field = iter->first;
00209     const set<OPENDDS_STRING>& topics = iter->second;
00210     for (set<OPENDDS_STRING>::const_iterator iter2 = topics.begin();
00211          iter2 != topics.end(); ++iter2) {
00212       const OPENDDS_STRING& topic = *iter2;
00213       QueryPlan& qp = query_plans_[topic];
00214       if (find_if(qp.projection_.begin(), qp.projection_.end(),
00215                   MatchesIncomingName(field)) == qp.projection_.end()) {
00216         qp.keys_projected_out_.push_back(field);
00217       }
00218       for (set<OPENDDS_STRING>::const_iterator iter3 = topics.begin();
00219            iter3 != topics.end(); ++iter3) {
00220         if (topic != *iter3) { // other topics
00221           qp.adjacent_joins_.insert(pair<const OPENDDS_STRING, OPENDDS_STRING>(*iter3, field));
00222         }
00223       }
00224     }
00225   }
00226 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual void OpenDDS::DCPS::MultiTopicDataReaderBase::init_typed ( DataReaderEx *  dr  )  [private, pure virtual]

Implemented in OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >.

Referenced by init().

Here is the caller graph for this function:

const MetaStruct & OpenDDS::DCPS::MultiTopicDataReaderBase::metaStructFor ( DDS::DataReader_ptr  dr  )  [protected]

Definition at line 236 of file MultiTopicDataReaderBase.cpp.

References OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), and OpenDDS::DCPS::TypeSupportImpl::getMetaStructForType().

Referenced by data_available(), and init().

00237 {
00238   DDS::TopicDescription_var td = reader->get_topicdescription();
00239   TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(td.in());
00240   if (tdi) {
00241     TypeSupportImpl* ts = dynamic_cast<TypeSupportImpl*>(tdi->get_type_support());
00242     if (ts) {
00243       return ts->getMetaStructForType();
00244     }
00245   }
00246   throw std::runtime_error("Failed to obtain type support for incoming DataReader");
00247 }

Here is the call graph for this function:

Here is the caller graph for this function:

OpenDDS::DCPS::MultiTopicDataReaderBase::OPENDDS_MAP ( OPENDDS_STRING  ,
QueryPlan   
) [protected]

void OpenDDS::DCPS::MultiTopicDataReaderBase::reset_latency_stats (  ) 

Clear any intermediate statistical values.

Implements OpenDDS::DCPS::DataReaderEx.

Definition at line 484 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00485 {
00486   resulting_reader_->reset_latency_stats();
00487 }

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::set_listener ( DDS::DataReaderListener_ptr  a_listener,
DDS::StatusMask  mask 
)

Definition at line 400 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00402 {
00403   return resulting_reader_->set_listener(a_listener, mask);
00404 }

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::set_qos ( const DDS::DataReaderQos &  qos  ) 

Definition at line 389 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00391 {
00392   return resulting_reader_->set_qos(qos);
00393 }

void OpenDDS::DCPS::MultiTopicDataReaderBase::set_status_changed_flag ( DDS::StatusKind  status,
bool  flag 
)

Definition at line 302 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_, and OpenDDS::DCPS::EntityImpl::set_status_changed_flag().

Referenced by OpenDDS::DCPS::SubscriberImpl::notify_datareaders().

00304 {
00305   DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
00306   if (dri) {
00307     dri->set_status_changed_flag(status, flag);
00308   }
00309 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::MultiTopicDataReaderBase::statistics_enabled ( CORBA::Boolean  statistics_enabled  ) 

Definition at line 494 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00496 {
00497   resulting_reader_->statistics_enabled(statistics_enabled);
00498 }

CORBA::Boolean OpenDDS::DCPS::MultiTopicDataReaderBase::statistics_enabled (  ) 

Definition at line 489 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00490 {
00491   return resulting_reader_->statistics_enabled();
00492 }

OPENDDS_STRING OpenDDS::DCPS::MultiTopicDataReaderBase::topicNameFor ( DDS::DataReader_ptr  dr  )  [protected]

Definition at line 228 of file MultiTopicDataReaderBase.cpp.

Referenced by data_available().

00229 {
00230   DDS::TopicDescription_var td = reader->get_topicdescription();
00231   CORBA::String_var topic = td->get_name();
00232   return topic.in();
00233 }

Here is the caller graph for this function:

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::wait_for_historical_data ( const DDS::Duration_t &  max_wait  ) 

Definition at line 457 of file MultiTopicDataReaderBase.cpp.

References resulting_reader_.

00459 {
00460   return resulting_reader_->wait_for_historical_data(max_wait);
00461 }


Member Data Documentation

Definition at line 131 of file MultiTopicDataReaderBase.h.

Referenced by init().


The documentation for this class was generated from the following files:

  * MultiTopicDataReaderBase.h
  * MultiTopicDataReaderBase.cpp
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1