MultiTopicDataReader_T.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADER_T_H
00009 #define OPENDDS_DCPS_MULTITOPICDATAREADER_T_H
00010 
00011 #ifndef OPENDDS_NO_MULTI_TOPIC
00012 
00013 #include "dds/DCPS/MultiTopicDataReaderBase.h"
00014 
00015 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00016 #pragma once
00017 #endif /* ACE_LACKS_PRAGMA_ONCE */
00018 
00019 namespace OpenDDS {
00020 namespace DCPS {
00021 
00022 template<typename Sample, typename TypedDataReader>
00023 class MultiTopicDataReader_T
00024   : public virtual LocalObject<typename TypedDataReader::Interface>
00025   , public virtual MultiTopicDataReaderBase {
00026 public:
00027   typedef TAO::DCPS::ZeroCopyDataSeq<Sample> SampleSeq;
00028 
00029   MultiTopicDataReader_T() {}
00030 
00031   void init_typed(DataReaderEx* dr);
00032   const MetaStruct& getResultingMeta();
00033   void incoming_sample(void* sample, const DDS::SampleInfo& info,
00034                        const char* topic, const MetaStruct& meta);
00035 
00036   DDS::ReturnCode_t read(SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00037     CORBA::Long max_samples, DDS::SampleStateMask sample_states,
00038     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00039 
00040   DDS::ReturnCode_t take(SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00041     CORBA::Long max_samples, DDS::SampleStateMask sample_states,
00042     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00043 
00044   DDS::ReturnCode_t read_w_condition(SampleSeq& data_values,
00045     DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00046     DDS::ReadCondition_ptr a_condition);
00047 
00048   DDS::ReturnCode_t take_w_condition(SampleSeq& data_values,
00049     DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00050     DDS::ReadCondition_ptr a_condition);
00051 
00052   DDS::ReturnCode_t read_next_sample(Sample& received_data,
00053     DDS::SampleInfo& sample_info);
00054 
00055   DDS::ReturnCode_t take_next_sample(Sample& received_data,
00056     DDS::SampleInfo& sample_info);
00057 
00058   DDS::ReturnCode_t read_instance(SampleSeq& received_data,
00059     DDS::SampleInfoSeq & info_seq, CORBA::Long max_samples,
00060     DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00061     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00062 
00063   DDS::ReturnCode_t take_instance(SampleSeq& received_data,
00064     DDS::SampleInfoSeq & info_seq, CORBA::Long max_samples,
00065     DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00066     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00067 
00068   DDS::ReturnCode_t read_next_instance(SampleSeq& received_data,
00069     DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00070     DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00071     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00072 
00073   DDS::ReturnCode_t take_next_instance(SampleSeq& received_data,
00074     DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00075     DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00076     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00077 
00078   DDS::ReturnCode_t read_next_instance_w_condition(SampleSeq& data_values,
00079     DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00080     DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition);
00081 
00082   DDS::ReturnCode_t take_next_instance_w_condition(SampleSeq& data_values,
00083     DDS::SampleInfoSeq & sample_infos, CORBA::Long max_samples,
00084     DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition);
00085 
00086   DDS::ReturnCode_t return_loan(SampleSeq& received_data,
00087     DDS::SampleInfoSeq& info_seq);
00088 
00089   DDS::ReturnCode_t get_key_value(Sample& key_holder,
00090     DDS::InstanceHandle_t handle);
00091 
00092   DDS::InstanceHandle_t lookup_instance(const Sample& instance_data);
00093 
00094 private:
00095 
00096   struct SampleWithInfo {
00097     SampleWithInfo(const OPENDDS_STRING& topic, const DDS::SampleInfo& sampinfo)
00098       : sample_(),
00099         view_(sampinfo.view_state) {
00100       info_[topic] = sampinfo.instance_handle;
00101     }
00102     void combine(const SampleWithInfo& other) {
00103       info_.insert(other.info_.begin(), other.info_.end());
00104       if (other.view_ == DDS::NEW_VIEW_STATE) view_ = DDS::NEW_VIEW_STATE;
00105     }
00106     Sample sample_;
00107     DDS::ViewStateKind view_;
00108     OPENDDS_MAP(OPENDDS_STRING/*topicName*/, DDS::InstanceHandle_t) info_;
00109   };
00110 
00111   typedef std::vector<SampleWithInfo> SampleVec;
00112   typedef std::set<OPENDDS_STRING> TopicSet;
00113 
00114   // Given a QueryPlan that describes how to treat 'incoming' data from a
00115   // certain topic (with MetaStruct 'meta'), assign its relevant fields to
00116   // the corresponding fields of 'resulting'.
00117   void assign_fields(void* incoming, Sample& resulting, const QueryPlan& qp,
00118                      const MetaStruct& meta);
00119 
00120   // Process all joins (recursively) in the QueryPlan 'qp'.
00121   void process_joins(OPENDDS_MAP(TopicSet, SampleVec)& partialResults,
00122                      SampleVec starting, const TopicSet& seen,
00123                      const QueryPlan& qp);
00124 
00125   // Starting with a 'prototype' sample, fill a 'resulting' vector with all
00126   // data from 'other_dr' (with MetaStruct 'other_meta') such that all key
00127   // fields named in 'key_names' match the values in 'key_data'.  The struct
00128   // pointed-to by 'key_data' is of the type used by the 'other_dr'.
00129   void join(SampleVec& resulting, const SampleWithInfo& prototype,
00130             const std::vector<OPENDDS_STRING>& key_names, const void* key_data,
00131             DDS::DataReader_ptr other_dr, const MetaStruct& other_meta);
00132 
00133   // When no common keys are found, natural join devolves to a cross-join where
00134   // each instance in the joined-to-topic (qp) is combined with the results so
00135   // far (partialResults).
00136   void cross_join(OPENDDS_MAP(TopicSet, SampleVec)& partialResults,
00137                   const TopicSet& seen, const QueryPlan& qp);
00138 
00139   // Combine two vectors of data, 'resulting' and 'other', with the results of
00140   // the combination going into 'resulting'.  Use the keys in 'key_names' to
00141   // determine which elements to combine, and the topic names in 'other_topics'
00142   // to determine which fields from 'other' should be assigned to 'resulting'.
00143   void combine(SampleVec& resulting, const SampleVec& other,
00144                const std::vector<OPENDDS_STRING>& key_names,
00145                const TopicSet& other_topics);
00146 
00147   // Helper for combine(), similar to assign_fields but instead of coming from
00148   // a differently-typed struct in a void*, the data comes from an existing
00149   // Sample, 'source'.  Each field projeted from any of the topics in
00150   // 'other_topics' is copied from 'source' to 'target'.
00151   void assign_resulting_fields(Sample& target, const Sample& source,
00152                                const TopicSet& other_topics);
00153 
00154   struct GenericData {
00155     explicit GenericData(const MetaStruct& meta, bool doAlloc = true)
00156       : meta_(meta), ptr_(doAlloc ? meta.allocate() : NULL) {}
00157     ~GenericData() { meta_.deallocate(ptr_); }
00158     const MetaStruct& meta_;
00159     void* ptr_;
00160   };
00161 
00162   struct Contains { // predicate for std::find_if()
00163     const OPENDDS_STRING& look_for_;
00164     explicit Contains(const OPENDDS_STRING& s) : look_for_(s) {}
00165     bool operator()(const std::pair<const std::set<OPENDDS_STRING>, SampleVec>& e)
00166       const {
00167       return e.first.count(look_for_);
00168     }
00169   };
00170 
00171   typename TypedDataReader::Interface::_var_type typed_reader_;
00172 };
00173 
00174 
00175 }
00176 }
00177 
00178 #ifdef ACE_TEMPLATES_REQUIRE_SOURCE
00179 #include "dds/DCPS/MultiTopicDataReader_T.cpp"
00180 #endif
00181 
00182 #endif
00183 #endif

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7