00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADER_T_H
00009 #define OPENDDS_DCPS_MULTITOPICDATAREADER_T_H
00010
00011 #ifndef OPENDDS_NO_MULTI_TOPIC
00012
00013 #include "dds/DCPS/MultiTopicDataReaderBase.h"
00014
00015 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00016 #pragma once
00017 #endif
00018
00019 namespace OpenDDS {
00020 namespace DCPS {
00021
00022 template<typename Sample, typename TypedDataReader>
00023 class MultiTopicDataReader_T
00024 : public virtual LocalObject<typename TypedDataReader::Interface>
00025 , public virtual MultiTopicDataReaderBase {
00026 public:
00027 typedef TAO::DCPS::ZeroCopyDataSeq<Sample> SampleSeq;
00028
00029 MultiTopicDataReader_T() {}
00030
00031 void init_typed(DataReaderEx* dr);
00032 const MetaStruct& getResultingMeta();
00033 void incoming_sample(void* sample, const DDS::SampleInfo& info,
00034 const char* topic, const MetaStruct& meta);
00035
00036 DDS::ReturnCode_t read(SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00037 CORBA::Long max_samples, DDS::SampleStateMask sample_states,
00038 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00039
00040 DDS::ReturnCode_t take(SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00041 CORBA::Long max_samples, DDS::SampleStateMask sample_states,
00042 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00043
00044 DDS::ReturnCode_t read_w_condition(SampleSeq& data_values,
00045 DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00046 DDS::ReadCondition_ptr a_condition);
00047
00048 DDS::ReturnCode_t take_w_condition(SampleSeq& data_values,
00049 DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00050 DDS::ReadCondition_ptr a_condition);
00051
00052 DDS::ReturnCode_t read_next_sample(Sample& received_data,
00053 DDS::SampleInfo& sample_info);
00054
00055 DDS::ReturnCode_t take_next_sample(Sample& received_data,
00056 DDS::SampleInfo& sample_info);
00057
00058 DDS::ReturnCode_t read_instance(SampleSeq& received_data,
00059 DDS::SampleInfoSeq & info_seq, CORBA::Long max_samples,
00060 DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00061 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00062
00063 DDS::ReturnCode_t take_instance(SampleSeq& received_data,
00064 DDS::SampleInfoSeq & info_seq, CORBA::Long max_samples,
00065 DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00066 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00067
00068 DDS::ReturnCode_t read_next_instance(SampleSeq& received_data,
00069 DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00070 DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00071 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00072
00073 DDS::ReturnCode_t take_next_instance(SampleSeq& received_data,
00074 DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00075 DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00076 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00077
00078 DDS::ReturnCode_t read_next_instance_w_condition(SampleSeq& data_values,
00079 DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00080 DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition);
00081
00082 DDS::ReturnCode_t take_next_instance_w_condition(SampleSeq& data_values,
00083 DDS::SampleInfoSeq & sample_infos, CORBA::Long max_samples,
00084 DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition);
00085
00086 DDS::ReturnCode_t return_loan(SampleSeq& received_data,
00087 DDS::SampleInfoSeq& info_seq);
00088
00089 DDS::ReturnCode_t get_key_value(Sample& key_holder,
00090 DDS::InstanceHandle_t handle);
00091
00092 DDS::InstanceHandle_t lookup_instance(const Sample& instance_data);
00093
00094 private:
00095
00096 struct SampleWithInfo {
00097 SampleWithInfo(const OPENDDS_STRING& topic, const DDS::SampleInfo& sampinfo)
00098 : sample_(),
00099 view_(sampinfo.view_state) {
00100 info_[topic] = sampinfo.instance_handle;
00101 }
00102 void combine(const SampleWithInfo& other) {
00103 info_.insert(other.info_.begin(), other.info_.end());
00104 if (other.view_ == DDS::NEW_VIEW_STATE) view_ = DDS::NEW_VIEW_STATE;
00105 }
00106 Sample sample_;
00107 DDS::ViewStateKind view_;
00108 OPENDDS_MAP(OPENDDS_STRING, DDS::InstanceHandle_t) info_;
00109 };
00110
00111 typedef std::vector<SampleWithInfo> SampleVec;
00112 typedef std::set<OPENDDS_STRING> TopicSet;
00113
00114
00115
00116
00117 void assign_fields(void* incoming, Sample& resulting, const QueryPlan& qp,
00118 const MetaStruct& meta);
00119
00120
00121 void process_joins(OPENDDS_MAP(TopicSet, SampleVec)& partialResults,
00122 SampleVec starting, const TopicSet& seen,
00123 const QueryPlan& qp);
00124
00125
00126
00127
00128
00129 void join(SampleVec& resulting, const SampleWithInfo& prototype,
00130 const std::vector<OPENDDS_STRING>& key_names, const void* key_data,
00131 DDS::DataReader_ptr other_dr, const MetaStruct& other_meta);
00132
00133
00134
00135
00136 void cross_join(OPENDDS_MAP(TopicSet, SampleVec)& partialResults,
00137 const TopicSet& seen, const QueryPlan& qp);
00138
00139
00140
00141
00142
00143 void combine(SampleVec& resulting, const SampleVec& other,
00144 const std::vector<OPENDDS_STRING>& key_names,
00145 const TopicSet& other_topics);
00146
00147
00148
00149
00150
00151 void assign_resulting_fields(Sample& target, const Sample& source,
00152 const TopicSet& other_topics);
00153
00154 struct GenericData {
00155 explicit GenericData(const MetaStruct& meta, bool doAlloc = true)
00156 : meta_(meta), ptr_(doAlloc ? meta.allocate() : NULL) {}
00157 ~GenericData() { meta_.deallocate(ptr_); }
00158 const MetaStruct& meta_;
00159 void* ptr_;
00160 };
00161
00162 struct Contains {
00163 const OPENDDS_STRING& look_for_;
00164 explicit Contains(const OPENDDS_STRING& s) : look_for_(s) {}
00165 bool operator()(const std::pair<const std::set<OPENDDS_STRING>, SampleVec>& e)
00166 const {
00167 return e.first.count(look_for_);
00168 }
00169 };
00170
00171 typename TypedDataReader::Interface::_var_type typed_reader_;
00172 };
00173
00174
00175 }
00176 }
00177
00178 #ifdef ACE_TEMPLATES_REQUIRE_SOURCE
00179 #include "dds/DCPS/MultiTopicDataReader_T.cpp"
00180 #endif
00181
00182 #endif
00183 #endif