MultiTopicDataReaderBase.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADERBASE_H
00009 #define OPENDDS_DCPS_MULTITOPICDATAREADERBASE_H
00010 
00011 #ifndef OPENDDS_NO_MULTI_TOPIC
00012 
00013 #include "dds/DdsDcpsSubscriptionExtC.h"
00014 #include "dds/DCPS/ZeroCopySeq_T.h"
00015 #include "dds/DCPS/MultiTopicImpl.h"
00016 #include "dds/DCPS/PoolAllocator.h"
00017 #include "dds/DCPS/unique_ptr.h"
00018 
00019 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00020 #pragma once
00021 #endif /* ACE_LACKS_PRAGMA_ONCE */
00022 
00023 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00024 
00025 namespace OpenDDS {
00026 namespace DCPS {
00027 
00028 class SubscriberImpl;
00029 
00030 class OpenDDS_Dcps_Export MultiTopicDataReaderBase
00031   : public virtual LocalObject<DataReaderEx> {
        // Abstract base for a MultiTopic data reader. It declares the
        // DDS::Entity, DDS::DataReader and DataReaderEx operations listed below
        // and leaves three private hooks (init_typed, getResultingMeta,
        // incoming_sample) pure virtual for a derived class to supply.
        // Only declarations appear in this header; apart from the empty default
        // constructor, the member function bodies are defined elsewhere.
00032 public:
        // Default constructor with an empty body.
        // NOTE(review): real setup presumably happens in init() -- confirm.
00033   MultiTopicDataReaderBase() {}
00034 
        // Initializes this reader from the given QoS, listener and status mask,
        // parent subscriber and MultiTopic.
        // NOTE(review): ownership and nullability of the pointer arguments are
        // not visible in this header -- check the definition.
00035   void init(const DDS::DataReaderQos& dr_qos,
00036     DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask,
00037     SubscriberImpl* parent, MultiTopicImpl* multitopic);
00038 
        // NOTE(review): the name suggests this is called when `reader` has new
        // data available -- confirm against the definition.
00039   void data_available(DDS::DataReader_ptr reader);
00040 
00041   // used by the SubscriberImpl
00042 
00043   void set_status_changed_flag(DDS::StatusKind status, bool flag);
00044   bool have_sample_states(DDS::SampleStateMask sample_states) const;
00045   void cleanup();
00046 
00047   // DDS::Entity interface
00048 
00049   DDS::InstanceHandle_t get_instance_handle();
00050 
00051   DDS::ReturnCode_t enable();
00052 
00053   DDS::StatusCondition_ptr get_statuscondition();
00054 
00055   DDS::StatusMask get_status_changes();
00056 
00057   // DDS::DataReader interface
00058 
00059   DDS::ReadCondition_ptr create_readcondition(
00060     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00061     DDS::InstanceStateMask instance_states);
00062 
        // create_querycondition is declared only when OPENDDS_NO_QUERY_CONDITION
        // is not defined.
00063 #ifndef OPENDDS_NO_QUERY_CONDITION
00064   DDS::QueryCondition_ptr create_querycondition(
00065     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00066     DDS::InstanceStateMask instance_states, const char* query_expression,
00067     const DDS::StringSeq& query_parameters);
00068 #endif
00069 
00070   DDS::ReturnCode_t delete_readcondition(DDS::ReadCondition_ptr a_condition);
00071 
00072   DDS::ReturnCode_t delete_contained_entities();
00073 
00074   DDS::ReturnCode_t set_qos(const DDS::DataReaderQos& qos);
00075 
00076   DDS::ReturnCode_t get_qos(DDS::DataReaderQos& qos);
00077 
00078   DDS::ReturnCode_t set_listener(DDS::DataReaderListener_ptr a_listener,
00079     DDS::StatusMask mask);
00080 
00081   DDS::DataReaderListener_ptr get_listener();
00082 
00083   DDS::TopicDescription_ptr get_topicdescription();
00084 
00085   DDS::Subscriber_ptr get_subscriber();
00086 
        // Status accessors: each takes the status structure by non-const
        // reference and returns a DDS::ReturnCode_t.
00087   DDS::ReturnCode_t get_sample_rejected_status(
00088     DDS::SampleRejectedStatus& status);
00089 
00090   DDS::ReturnCode_t get_liveliness_changed_status(
00091     DDS::LivelinessChangedStatus& status);
00092 
00093   DDS::ReturnCode_t get_requested_deadline_missed_status(
00094     DDS::RequestedDeadlineMissedStatus& status);
00095 
00096   DDS::ReturnCode_t get_requested_incompatible_qos_status(
00097     DDS::RequestedIncompatibleQosStatus& status);
00098 
00099   DDS::ReturnCode_t get_subscription_matched_status(
00100     DDS::SubscriptionMatchedStatus& status);
00101 
00102   DDS::ReturnCode_t get_sample_lost_status(DDS::SampleLostStatus& status);
00103 
00104   DDS::ReturnCode_t wait_for_historical_data(const DDS::Duration_t& max_wait);
00105 
00106   DDS::ReturnCode_t get_matched_publications(
00107     DDS::InstanceHandleSeq& publication_handles);
00108 
        // get_matched_publication_data is declared only when DDS_HAS_MINIMUM_BIT
        // is not defined.
00109 #ifndef DDS_HAS_MINIMUM_BIT
00110   DDS::ReturnCode_t get_matched_publication_data(
00111     DDS::PublicationBuiltinTopicData& publication_data,
00112     DDS::InstanceHandle_t publication_handle);
00113 #endif
00114 
00115   // OpenDDS::DCPS::DataReaderEx interface
00116 
00117   void get_latency_stats(LatencyStatisticsSeq& stats);
00118 
00119   void reset_latency_stats();
00120 
        // Overloaded pair: the no-argument form returns a CORBA::Boolean, the
        // one-argument form takes a CORBA::Boolean and returns nothing.
00121   CORBA::Boolean statistics_enabled();
00122 
00123   void statistics_enabled(CORBA::Boolean statistics_enabled);
00124 
00125 private:
        // Pure virtual hooks that a concrete derived class must implement.
        // They are private, so only this class's own members can call them.
        // NOTE(review): `sample` is passed as void*, so its concrete type is
        // presumably determined by `topic`/`meta` -- confirm in the derived class.
00126   virtual void init_typed(DataReaderEx* dr) = 0;
00127   virtual const MetaStruct& getResultingMeta() = 0;
00128   virtual void incoming_sample(void* sample, const DDS::SampleInfo& info,
00129                                const char* topic, const MetaStruct& meta) = 0;
00130 
        // Owning pointer to a DataReaderListener local object.
        // NOTE(review): presumably the listener installed on the constituent
        // readers -- confirm against init().
00131   unique_ptr<OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> > listener_;
        // NOTE(review): presumably the reader that holds the joined ("resulting")
        // samples -- confirm against init()/init_typed().
00132   DataReaderEx_var resulting_reader_;
00133 
00134 protected:
00135 
        // Lookup helpers taking a data reader; accessible to derived classes.
        // Their definitions are not part of this header.
00136   OPENDDS_STRING topicNameFor(DDS::DataReader_ptr dr);
00137   const MetaStruct& metaStructFor(DDS::DataReader_ptr dr);
00138 
        // Local alias for MultiTopicImpl::SubjectFieldSpec.
00139   typedef MultiTopicImpl::SubjectFieldSpec SubjectFieldSpec;
00140 
        // Per-topic record stored as the mapped value of query_plans_ below.
        // NOTE(review): the roles of projection_ and keys_projected_out_ are
        // inferred from their names only -- confirm against the code that
        // populates them.
00141   struct QueryPlan {
00142     DDS::DataReader_var data_reader_;
00143     std::vector<SubjectFieldSpec> projection_;
00144     std::vector<OPENDDS_STRING> keys_projected_out_;
00145     std::multimap<OPENDDS_STRING, OPENDDS_STRING> adjacent_joins_; // topic -> key
00146     std::set<std::pair<DDS::InstanceHandle_t /*of this data_reader_*/,
00147       DDS::InstanceHandle_t /*of the resulting DR*/> > instances_;
00148   };
00149 
00150   // key: topicName for this reader
00151   OPENDDS_MAP(OPENDDS_STRING, QueryPlan) query_plans_;
00152 
        // Per the macro's name, this disables copy/move construction and
        // assignment for the class.
        // NOTE(review): the macro is defined outside this file -- confirm.
00153   OPENDDS_DELETED_COPY_MOVE_CTOR_ASSIGN(MultiTopicDataReaderBase)
00154 };
00155 
00156 }
00157 }
00158 
00159 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00160 
00161 #endif
00162 #endif
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by doxygen 1.6.1