MultiTopicDataReader_T.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADER_T_H
00009 #define OPENDDS_DCPS_MULTITOPICDATAREADER_T_H
00010 
00011 #ifndef OPENDDS_NO_MULTI_TOPIC
00012 
00013 #include "dds/DCPS/MultiTopicDataReaderBase.h"
00014 
00015 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00016 #pragma once
00017 #endif /* ACE_LACKS_PRAGMA_ONCE */
00018 
00019 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00020 
00021 namespace OpenDDS {
00022 namespace DCPS {
00023 
00024 template<typename Sample, typename TypedDataReader>
00025 class MultiTopicDataReader_T
00026   : public virtual LocalObject<typename TypedDataReader::Interface>
00027   , public virtual MultiTopicDataReaderBase {
00028 public:
00029   typedef TAO::DCPS::ZeroCopyDataSeq<Sample> SampleSeq;
00030 
00031   MultiTopicDataReader_T() {}
00032 
00033   void init_typed(DataReaderEx* dr);
00034   const MetaStruct& getResultingMeta();
00035   void incoming_sample(void* sample, const DDS::SampleInfo& info,
00036                        const char* topic, const MetaStruct& meta);
00037 
00038   DDS::ReturnCode_t read(SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00039     CORBA::Long max_samples, DDS::SampleStateMask sample_states,
00040     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00041 
00042   DDS::ReturnCode_t take(SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00043     CORBA::Long max_samples, DDS::SampleStateMask sample_states,
00044     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00045 
00046   DDS::ReturnCode_t read_w_condition(SampleSeq& data_values,
00047     DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00048     DDS::ReadCondition_ptr a_condition);
00049 
00050   DDS::ReturnCode_t take_w_condition(SampleSeq& data_values,
00051     DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00052     DDS::ReadCondition_ptr a_condition);
00053 
00054   DDS::ReturnCode_t read_next_sample(Sample& received_data,
00055     DDS::SampleInfo& sample_info);
00056 
00057   DDS::ReturnCode_t take_next_sample(Sample& received_data,
00058     DDS::SampleInfo& sample_info);
00059 
00060   DDS::ReturnCode_t read_instance(SampleSeq& received_data,
00061     DDS::SampleInfoSeq & info_seq, CORBA::Long max_samples,
00062     DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00063     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00064 
00065   DDS::ReturnCode_t take_instance(SampleSeq& received_data,
00066     DDS::SampleInfoSeq & info_seq, CORBA::Long max_samples,
00067     DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00068     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00069 
00070   DDS::ReturnCode_t read_instance_w_condition(SampleSeq& data_values,
00071     DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00072     DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition);
00073 
00074   DDS::ReturnCode_t take_instance_w_condition(SampleSeq& data_values,
00075     DDS::SampleInfoSeq & sample_infos, CORBA::Long max_samples,
00076     DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition);
00077 
00078   DDS::ReturnCode_t read_next_instance(SampleSeq& received_data,
00079     DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00080     DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00081     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00082 
00083   DDS::ReturnCode_t take_next_instance(SampleSeq& received_data,
00084     DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00085     DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00086     DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00087 
00088   DDS::ReturnCode_t read_next_instance_w_condition(SampleSeq& data_values,
00089     DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00090     DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition);
00091 
00092   DDS::ReturnCode_t take_next_instance_w_condition(SampleSeq& data_values,
00093     DDS::SampleInfoSeq & sample_infos, CORBA::Long max_samples,
00094     DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition);
00095 
00096   DDS::ReturnCode_t return_loan(SampleSeq& received_data,
00097     DDS::SampleInfoSeq& info_seq);
00098 
00099   DDS::ReturnCode_t get_key_value(Sample& key_holder,
00100     DDS::InstanceHandle_t handle);
00101 
00102   DDS::InstanceHandle_t lookup_instance(const Sample& instance_data);
00103 
00104 private:
00105 
00106   struct SampleWithInfo {
00107     SampleWithInfo(const OPENDDS_STRING& topic, const DDS::SampleInfo& sampinfo)
00108       : sample_(),
00109         view_(sampinfo.view_state) {
00110       info_[topic] = sampinfo.instance_handle;
00111     }
00112     void combine(const SampleWithInfo& other) {
00113       info_.insert(other.info_.begin(), other.info_.end());
00114       if (other.view_ == DDS::NEW_VIEW_STATE) view_ = DDS::NEW_VIEW_STATE;
00115     }
00116     Sample sample_;
00117     DDS::ViewStateKind view_;
00118     OPENDDS_MAP(OPENDDS_STRING/*topicName*/, DDS::InstanceHandle_t) info_;
00119   };
00120 
00121   typedef std::vector<SampleWithInfo> SampleVec;
00122   typedef std::set<OPENDDS_STRING> TopicSet;
00123 
00124   // Given a QueryPlan that describes how to treat 'incoming' data from a
00125   // certain topic (with MetaStruct 'meta'), assign its relevant fields to
00126   // the corresponding fields of 'resulting'.
00127   void assign_fields(void* incoming, Sample& resulting, const QueryPlan& qp,
00128                      const MetaStruct& meta);
00129 
00130   // Process all joins (recursively) in the QueryPlan 'qp'.
00131   void process_joins(OPENDDS_MAP(TopicSet, SampleVec)& partialResults,
00132                      SampleVec starting, const TopicSet& seen,
00133                      const QueryPlan& qp);
00134 
00135   // Starting with a 'prototype' sample, fill a 'resulting' vector with all
00136   // data from 'other_dr' (with MetaStruct 'other_meta') such that all key
00137   // fields named in 'key_names' match the values in 'key_data'.  The struct
00138   // pointed-to by 'key_data' is of the type used by the 'other_dr'.
00139   void join(SampleVec& resulting, const SampleWithInfo& prototype,
00140             const std::vector<OPENDDS_STRING>& key_names, const void* key_data,
00141             DDS::DataReader_ptr other_dr, const MetaStruct& other_meta);
00142 
00143   // When no common keys are found, natural join devolves to a cross-join where
00144   // each instance in the joined-to-topic (qp) is combined with the results so
00145   // far (partialResults).
00146   void cross_join(OPENDDS_MAP(TopicSet, SampleVec)& partialResults,
00147                   const TopicSet& seen, const QueryPlan& qp);
00148 
00149   // Combine two vectors of data, 'resulting' and 'other', with the results of
00150   // the combination going into 'resulting'.  Use the keys in 'key_names' to
00151   // determine which elements to combine, and the topic names in 'other_topics'
00152   // to determine which fields from 'other' should be assigned to 'resulting'.
00153   void combine(SampleVec& resulting, const SampleVec& other,
00154                const std::vector<OPENDDS_STRING>& key_names,
00155                const TopicSet& other_topics);
00156 
00157   // Helper for combine(), similar to assign_fields but instead of coming from
00158   // a differently-typed struct in a void*, the data comes from an existing
00159   // Sample, 'source'.  Each field projeted from any of the topics in
00160   // 'other_topics' is copied from 'source' to 'target'.
00161   void assign_resulting_fields(Sample& target, const Sample& source,
00162                                const TopicSet& other_topics);
00163 
00164   struct GenericData {
00165     explicit GenericData(const MetaStruct& meta, bool doAlloc = true)
00166       : meta_(meta), ptr_(doAlloc ? meta.allocate() : NULL) {}
00167     ~GenericData() { meta_.deallocate(ptr_); }
00168     const MetaStruct& meta_;
00169     void* ptr_;
00170   };
00171 
00172   struct Contains { // predicate for std::find_if()
00173     const OPENDDS_STRING& look_for_;
00174     explicit Contains(const OPENDDS_STRING& s) : look_for_(s) {}
00175     bool operator()(const std::pair<const std::set<OPENDDS_STRING>, SampleVec>& e)
00176       const {
00177       return e.first.count(look_for_);
00178     }
00179   };
00180 
00181   typename TypedDataReader::Interface::_var_type typed_reader_;
00182 };
00183 
00184 }
00185 }
00186 
00187 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00188 
00189 #ifdef ACE_TEMPLATES_REQUIRE_SOURCE
00190 #include "dds/DCPS/MultiTopicDataReader_T.cpp"
00191 #endif
00192 
00193 #endif
00194 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1