MultiTopicDataReaderBase.h
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADERBASE_H
00009 #define OPENDDS_DCPS_MULTITOPICDATAREADERBASE_H
00010
00011 #ifndef OPENDDS_NO_MULTI_TOPIC
00012
00013 #include "dds/DdsDcpsSubscriptionExtC.h"
00014 #include "dds/DCPS/ZeroCopySeq_T.h"
00015 #include "dds/DCPS/MultiTopicImpl.h"
00016 #include "dds/DCPS/PoolAllocator.h"
00017 #include "dds/DCPS/unique_ptr.h"
00018
00019 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00020 #pragma once
00021 #endif
00022
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024
00025 namespace OpenDDS {
00026 namespace DCPS {
00027
00028 class SubscriberImpl;
00029
00030 class OpenDDS_Dcps_Export MultiTopicDataReaderBase
00031 : public virtual LocalObject<DataReaderEx> {
00032 public:
00033 MultiTopicDataReaderBase() {}
00034
00035 void init(const DDS::DataReaderQos& dr_qos,
00036 DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask,
00037 SubscriberImpl* parent, MultiTopicImpl* multitopic);
00038
00039 void data_available(DDS::DataReader_ptr reader);
00040
00041
00042
00043 void set_status_changed_flag(DDS::StatusKind status, bool flag);
00044 bool have_sample_states(DDS::SampleStateMask sample_states) const;
00045 void cleanup();
00046
00047
00048
00049 DDS::InstanceHandle_t get_instance_handle();
00050
00051 DDS::ReturnCode_t enable();
00052
00053 DDS::StatusCondition_ptr get_statuscondition();
00054
00055 DDS::StatusMask get_status_changes();
00056
00057
00058
00059 DDS::ReadCondition_ptr create_readcondition(
00060 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00061 DDS::InstanceStateMask instance_states);
00062
00063 #ifndef OPENDDS_NO_QUERY_CONDITION
00064 DDS::QueryCondition_ptr create_querycondition(
00065 DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00066 DDS::InstanceStateMask instance_states, const char* query_expression,
00067 const DDS::StringSeq& query_parameters);
00068 #endif
00069
00070 DDS::ReturnCode_t delete_readcondition(DDS::ReadCondition_ptr a_condition);
00071
00072 DDS::ReturnCode_t delete_contained_entities();
00073
00074 DDS::ReturnCode_t set_qos(const DDS::DataReaderQos& qos);
00075
00076 DDS::ReturnCode_t get_qos(DDS::DataReaderQos& qos);
00077
00078 DDS::ReturnCode_t set_listener(DDS::DataReaderListener_ptr a_listener,
00079 DDS::StatusMask mask);
00080
00081 DDS::DataReaderListener_ptr get_listener();
00082
00083 DDS::TopicDescription_ptr get_topicdescription();
00084
00085 DDS::Subscriber_ptr get_subscriber();
00086
00087 DDS::ReturnCode_t get_sample_rejected_status(
00088 DDS::SampleRejectedStatus& status);
00089
00090 DDS::ReturnCode_t get_liveliness_changed_status(
00091 DDS::LivelinessChangedStatus& status);
00092
00093 DDS::ReturnCode_t get_requested_deadline_missed_status(
00094 DDS::RequestedDeadlineMissedStatus& status);
00095
00096 DDS::ReturnCode_t get_requested_incompatible_qos_status(
00097 DDS::RequestedIncompatibleQosStatus& status);
00098
00099 DDS::ReturnCode_t get_subscription_matched_status(
00100 DDS::SubscriptionMatchedStatus& status);
00101
00102 DDS::ReturnCode_t get_sample_lost_status(DDS::SampleLostStatus& status);
00103
00104 DDS::ReturnCode_t wait_for_historical_data(const DDS::Duration_t& max_wait);
00105
00106 DDS::ReturnCode_t get_matched_publications(
00107 DDS::InstanceHandleSeq& publication_handles);
00108
00109 #ifndef DDS_HAS_MINIMUM_BIT
00110 DDS::ReturnCode_t get_matched_publication_data(
00111 DDS::PublicationBuiltinTopicData& publication_data,
00112 DDS::InstanceHandle_t publication_handle);
00113 #endif
00114
00115
00116
00117 void get_latency_stats(LatencyStatisticsSeq& stats);
00118
00119 void reset_latency_stats();
00120
00121 CORBA::Boolean statistics_enabled();
00122
00123 void statistics_enabled(CORBA::Boolean statistics_enabled);
00124
00125 private:
00126 virtual void init_typed(DataReaderEx* dr) = 0;
00127 virtual const MetaStruct& getResultingMeta() = 0;
00128 virtual void incoming_sample(void* sample, const DDS::SampleInfo& info,
00129 const char* topic, const MetaStruct& meta) = 0;
00130
00131 unique_ptr<OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> > listener_;
00132 DataReaderEx_var resulting_reader_;
00133
00134 protected:
00135
00136 OPENDDS_STRING topicNameFor(DDS::DataReader_ptr dr);
00137 const MetaStruct& metaStructFor(DDS::DataReader_ptr dr);
00138
00139 typedef MultiTopicImpl::SubjectFieldSpec SubjectFieldSpec;
00140
00141 struct QueryPlan {
00142 DDS::DataReader_var data_reader_;
00143 std::vector<SubjectFieldSpec> projection_;
00144 std::vector<OPENDDS_STRING> keys_projected_out_;
00145 std::multimap<OPENDDS_STRING, OPENDDS_STRING> adjacent_joins_;
00146 std::set<std::pair<DDS::InstanceHandle_t ,
00147 DDS::InstanceHandle_t > > instances_;
00148 };
00149
00150
00151 OPENDDS_MAP(OPENDDS_STRING, QueryPlan) query_plans_;
00152
00153 OPENDDS_DELETED_COPY_MOVE_CTOR_ASSIGN(MultiTopicDataReaderBase)
00154 };
00155
00156 }
00157 }
00158
00159 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00160
00161 #endif
00162 #endif