00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADER_T_H
00009 #define OPENDDS_DCPS_MULTITOPICDATAREADER_T_H
00010
00011 #ifndef OPENDDS_NO_MULTI_TOPIC
00012
00013 #include "dds/DCPS/MultiTopicDataReaderBase.h"
00014
00015 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00016 #pragma once
00017 #endif
00018
00019 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00020
00021 namespace OpenDDS {
00022 namespace DCPS {
00023
00024 template<typename Sample, typename TypedDataReader>
00025 class MultiTopicDataReader_T
00026 : public virtual LocalObject<typename TypedDataReader::Interface>
00027 , public virtual MultiTopicDataReaderBase {
00028 public:
00029 typedef TAO::DCPS::ZeroCopyDataSeq<Sample> SampleSeq;
00030
00031 MultiTopicDataReader_T() {}
00032
00033 void init_typed(DataReaderEx* dr);
00034 const MetaStruct& getResultingMeta();
00035 void incoming_sample(void* sample, const DDS::SampleInfo& info,
00036 const char* topic, const MetaStruct& meta);
00037
00038 DDS::ReturnCode_t read(SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00039 CORBA::Long max_samples, DDS::SampleStateMask sample_states,
00040 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00041
00042 DDS::ReturnCode_t take(SampleSeq& received_data, DDS::SampleInfoSeq& info_seq,
00043 CORBA::Long max_samples, DDS::SampleStateMask sample_states,
00044 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00045
00046 DDS::ReturnCode_t read_w_condition(SampleSeq& data_values,
00047 DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00048 DDS::ReadCondition_ptr a_condition);
00049
00050 DDS::ReturnCode_t take_w_condition(SampleSeq& data_values,
00051 DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00052 DDS::ReadCondition_ptr a_condition);
00053
00054 DDS::ReturnCode_t read_next_sample(Sample& received_data,
00055 DDS::SampleInfo& sample_info);
00056
00057 DDS::ReturnCode_t take_next_sample(Sample& received_data,
00058 DDS::SampleInfo& sample_info);
00059
00060 DDS::ReturnCode_t read_instance(SampleSeq& received_data,
00061 DDS::SampleInfoSeq & info_seq, CORBA::Long max_samples,
00062 DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00063 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00064
00065 DDS::ReturnCode_t take_instance(SampleSeq& received_data,
00066 DDS::SampleInfoSeq & info_seq, CORBA::Long max_samples,
00067 DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00068 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00069
00070 DDS::ReturnCode_t read_instance_w_condition(SampleSeq& data_values,
00071 DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00072 DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition);
00073
00074 DDS::ReturnCode_t take_instance_w_condition(SampleSeq& data_values,
00075 DDS::SampleInfoSeq & sample_infos, CORBA::Long max_samples,
00076 DDS::InstanceHandle_t handle, DDS::ReadCondition_ptr a_condition);
00077
00078 DDS::ReturnCode_t read_next_instance(SampleSeq& received_data,
00079 DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00080 DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00081 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00082
00083 DDS::ReturnCode_t take_next_instance(SampleSeq& received_data,
00084 DDS::SampleInfoSeq& info_seq, CORBA::Long max_samples,
00085 DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states,
00086 DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states);
00087
00088 DDS::ReturnCode_t read_next_instance_w_condition(SampleSeq& data_values,
00089 DDS::SampleInfoSeq& sample_infos, CORBA::Long max_samples,
00090 DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition);
00091
00092 DDS::ReturnCode_t take_next_instance_w_condition(SampleSeq& data_values,
00093 DDS::SampleInfoSeq & sample_infos, CORBA::Long max_samples,
00094 DDS::InstanceHandle_t previous_handle, DDS::ReadCondition_ptr a_condition);
00095
00096 DDS::ReturnCode_t return_loan(SampleSeq& received_data,
00097 DDS::SampleInfoSeq& info_seq);
00098
00099 DDS::ReturnCode_t get_key_value(Sample& key_holder,
00100 DDS::InstanceHandle_t handle);
00101
00102 DDS::InstanceHandle_t lookup_instance(const Sample& instance_data);
00103
00104 private:
00105
00106 struct SampleWithInfo {
00107 SampleWithInfo(const OPENDDS_STRING& topic, const DDS::SampleInfo& sampinfo)
00108 : sample_(),
00109 view_(sampinfo.view_state) {
00110 info_[topic] = sampinfo.instance_handle;
00111 }
00112 void combine(const SampleWithInfo& other) {
00113 info_.insert(other.info_.begin(), other.info_.end());
00114 if (other.view_ == DDS::NEW_VIEW_STATE) view_ = DDS::NEW_VIEW_STATE;
00115 }
00116 Sample sample_;
00117 DDS::ViewStateKind view_;
00118 OPENDDS_MAP(OPENDDS_STRING, DDS::InstanceHandle_t) info_;
00119 };
00120
00121 typedef std::vector<SampleWithInfo> SampleVec;
00122 typedef std::set<OPENDDS_STRING> TopicSet;
00123
00124
00125
00126
00127 void assign_fields(void* incoming, Sample& resulting, const QueryPlan& qp,
00128 const MetaStruct& meta);
00129
00130
00131 void process_joins(OPENDDS_MAP(TopicSet, SampleVec)& partialResults,
00132 SampleVec starting, const TopicSet& seen,
00133 const QueryPlan& qp);
00134
00135
00136
00137
00138
00139 void join(SampleVec& resulting, const SampleWithInfo& prototype,
00140 const std::vector<OPENDDS_STRING>& key_names, const void* key_data,
00141 DDS::DataReader_ptr other_dr, const MetaStruct& other_meta);
00142
00143
00144
00145
00146 void cross_join(OPENDDS_MAP(TopicSet, SampleVec)& partialResults,
00147 const TopicSet& seen, const QueryPlan& qp);
00148
00149
00150
00151
00152
00153 void combine(SampleVec& resulting, const SampleVec& other,
00154 const std::vector<OPENDDS_STRING>& key_names,
00155 const TopicSet& other_topics);
00156
00157
00158
00159
00160
00161 void assign_resulting_fields(Sample& target, const Sample& source,
00162 const TopicSet& other_topics);
00163
00164 struct GenericData {
00165 explicit GenericData(const MetaStruct& meta, bool doAlloc = true)
00166 : meta_(meta), ptr_(doAlloc ? meta.allocate() : NULL) {}
00167 ~GenericData() { meta_.deallocate(ptr_); }
00168 const MetaStruct& meta_;
00169 void* ptr_;
00170 };
00171
00172 struct Contains {
00173 const OPENDDS_STRING& look_for_;
00174 explicit Contains(const OPENDDS_STRING& s) : look_for_(s) {}
00175 bool operator()(const std::pair<const std::set<OPENDDS_STRING>, SampleVec>& e)
00176 const {
00177 return e.first.count(look_for_);
00178 }
00179 };
00180
00181 typename TypedDataReader::Interface::_var_type typed_reader_;
00182 };
00183
00184 }
00185 }
00186
00187 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00188
00189 #ifdef ACE_TEMPLATES_REQUIRE_SOURCE
00190 #include "dds/DCPS/MultiTopicDataReader_T.cpp"
00191 #endif
00192
00193 #endif
00194 #endif