00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADERBASE_H
00009 #define OPENDDS_DCPS_MULTITOPICDATAREADERBASE_H
00010
00011 #ifndef OPENDDS_NO_MULTI_TOPIC
00012
00013 #include "dds/DdsDcpsSubscriptionC.h"
00014 #include "dds/DCPS/ZeroCopySeq_T.h"
00015 #include "dds/DCPS/MultiTopicImpl.h"
00016 #include "dds/DCPS/PoolAllocator.h"
00017
00018 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00019 #pragma once
00020 #endif
00021
00022 namespace OpenDDS {
00023 namespace DCPS {
00024
00025 class OpenDDS_Dcps_Export MultiTopicDataReaderBase
00026 : public virtual LocalObject<DataReaderEx> {
00027 public:
00028 MultiTopicDataReaderBase() {}
00029
00030 void init(const DDS::DataReaderQos& dr_qos,
00031 DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask,
00032 SubscriberImpl* parent, MultiTopicImpl* multitopic);
00033
00034 void data_available(DDS::DataReader_ptr reader);
00035
00036
00037
00038 void set_status_changed_flag(DDS::StatusKind status, bool flag);
00039 bool have_sample_states(DDS::SampleStateMask sample_states) const;
00040 void cleanup();
00041
00042
00043
00044 DDS::InstanceHandle_t get_instance_handle();
00045
00046 DDS::ReturnCode_t enable();
00047
00048 DDS::StatusCondition_ptr get_statuscondition();
00049
00050 DDS::StatusMask get_status_changes();
00051
00052
00053
00054 DDS::ReadCondition_ptr create_readcondition(
00055 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00056 DDS::InstanceStateMask instance_states);
00057
00058 #ifndef OPENDDS_NO_QUERY_CONDITION
00059 DDS::QueryCondition_ptr create_querycondition(
00060 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00061 DDS::InstanceStateMask instance_states, const char* query_expression,
00062 const DDS::StringSeq& query_parameters);
00063 #endif
00064
00065 DDS::ReturnCode_t delete_readcondition(DDS::ReadCondition_ptr a_condition);
00066
00067 DDS::ReturnCode_t delete_contained_entities();
00068
00069 DDS::ReturnCode_t set_qos(const DDS::DataReaderQos& qos);
00070
00071 DDS::ReturnCode_t get_qos(DDS::DataReaderQos& qos);
00072
00073 DDS::ReturnCode_t set_listener(DDS::DataReaderListener_ptr a_listener,
00074 DDS::StatusMask mask);
00075
00076 DDS::DataReaderListener_ptr get_listener();
00077
00078 DDS::TopicDescription_ptr get_topicdescription();
00079
00080 DDS::Subscriber_ptr get_subscriber();
00081
00082 DDS::ReturnCode_t get_sample_rejected_status(
00083 DDS::SampleRejectedStatus& status);
00084
00085 DDS::ReturnCode_t get_liveliness_changed_status(
00086 DDS::LivelinessChangedStatus& status);
00087
00088 DDS::ReturnCode_t get_requested_deadline_missed_status(
00089 DDS::RequestedDeadlineMissedStatus& status);
00090
00091 DDS::ReturnCode_t get_requested_incompatible_qos_status(
00092 DDS::RequestedIncompatibleQosStatus& status);
00093
00094 DDS::ReturnCode_t get_subscription_matched_status(
00095 DDS::SubscriptionMatchedStatus& status);
00096
00097 DDS::ReturnCode_t get_sample_lost_status(DDS::SampleLostStatus& status);
00098
00099 DDS::ReturnCode_t wait_for_historical_data(const DDS::Duration_t& max_wait);
00100
00101 DDS::ReturnCode_t get_matched_publications(
00102 DDS::InstanceHandleSeq& publication_handles);
00103
00104 #ifndef DDS_HAS_MINIMUM_BIT
00105 DDS::ReturnCode_t get_matched_publication_data(
00106 DDS::PublicationBuiltinTopicData& publication_data,
00107 DDS::InstanceHandle_t publication_handle);
00108 #endif
00109
00110
00111
00112 void get_latency_stats(LatencyStatisticsSeq& stats);
00113
00114 void reset_latency_stats();
00115
00116 CORBA::Boolean statistics_enabled();
00117
00118 void statistics_enabled(CORBA::Boolean statistics_enabled);
00119
00120 private:
00121 virtual void init_typed(DataReaderEx* dr) = 0;
00122 virtual const MetaStruct& getResultingMeta() = 0;
00123 virtual void incoming_sample(void* sample, const DDS::SampleInfo& info,
00124 const char* topic, const MetaStruct& meta) = 0;
00125
00126 class Listener
00127 : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
00128 public:
00129 explicit Listener(MultiTopicDataReaderBase* outer)
00130 : outer_(outer)
00131 {}
00132
00133 void on_requested_deadline_missed(DDS::DataReader_ptr reader,
00134 const DDS::RequestedDeadlineMissedStatus& status);
00135
00136 void on_requested_incompatible_qos(DDS::DataReader_ptr reader,
00137 const DDS::RequestedIncompatibleQosStatus& status);
00138
00139 void on_sample_rejected(DDS::DataReader_ptr reader,
00140 const DDS::SampleRejectedStatus& status);
00141
00142 void on_liveliness_changed(DDS::DataReader_ptr reader,
00143 const DDS::LivelinessChangedStatus& status);
00144
00145 void on_data_available(DDS::DataReader_ptr reader);
00146
00147 void on_subscription_matched(DDS::DataReader_ptr reader,
00148 const DDS::SubscriptionMatchedStatus& status);
00149
00150 void on_sample_lost(DDS::DataReader_ptr reader,
00151 const DDS::SampleLostStatus& status);
00152
00153 private:
00154 MultiTopicDataReaderBase* outer_;
00155 };
00156
00157 DDS::DataReaderListener_var listener_;
00158 DataReaderEx_var resulting_reader_;
00159
00160 protected:
00161
00162 OPENDDS_STRING topicNameFor(DDS::DataReader_ptr dr);
00163 const MetaStruct& metaStructFor(DDS::DataReader_ptr dr);
00164
00165 typedef MultiTopicImpl::SubjectFieldSpec SubjectFieldSpec;
00166
00167 struct QueryPlan {
00168 DDS::DataReader_var data_reader_;
00169 std::vector<SubjectFieldSpec> projection_;
00170 std::vector<OPENDDS_STRING> keys_projected_out_;
00171 std::multimap<OPENDDS_STRING, OPENDDS_STRING> adjacent_joins_;
00172 std::set<std::pair<DDS::InstanceHandle_t ,
00173 DDS::InstanceHandle_t > > instances_;
00174 };
00175
00176
00177 OPENDDS_MAP(OPENDDS_STRING, QueryPlan) query_plans_;
00178
00179 OPENDDS_DELETED_COPY_CTOR_ASSIGN(MultiTopicDataReaderBase)
00180 };
00181
00182 }
00183 }
00184
00185 #endif
00186 #endif