MultiTopicDataReaderBase.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADERBASE_H
00009 #define OPENDDS_DCPS_MULTITOPICDATAREADERBASE_H
00010 
00011 #ifndef OPENDDS_NO_MULTI_TOPIC
00012 
00013 #include "dds/DdsDcpsSubscriptionC.h"
00014 #include "dds/DCPS/ZeroCopySeq_T.h"
00015 #include "dds/DCPS/MultiTopicImpl.h"
00016 #include "dds/DCPS/PoolAllocator.h"
00017 
00018 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00019 #pragma once
00020 #endif /* ACE_LACKS_PRAGMA_ONCE */
00021 
00022 namespace OpenDDS {
00023 namespace DCPS {
00024 
00025 class OpenDDS_Dcps_Export MultiTopicDataReaderBase
00026   : public virtual LocalObject<DataReaderEx> {
00027 public:
00028   MultiTopicDataReaderBase() {}
00029 
00030   void init(const DDS::DataReaderQos& dr_qos,
00031     DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask,
00032     SubscriberImpl* parent, MultiTopicImpl* multitopic);
00033 
00034   void data_available(DDS::DataReader_ptr reader);
00035 
00036   // used by the SubscriberImpl
00037 
00038   void set_status_changed_flag(DDS::StatusKind status, bool flag);
00039   bool have_sample_states(DDS::SampleStateMask sample_states) const;
00040   void cleanup();
00041 
00042   // DDS::Entity interface
00043 
00044   DDS::InstanceHandle_t get_instance_handle();
00045 
00046   DDS::ReturnCode_t enable();
00047 
00048   DDS::StatusCondition_ptr get_statuscondition();
00049 
00050   DDS::StatusMask get_status_changes();
00051 
00052   // DDS::DataReader interface
00053 
00054   DDS::ReadCondition_ptr create_readcondition(
00055     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00056     DDS::InstanceStateMask instance_states);
00057 
00058 #ifndef OPENDDS_NO_QUERY_CONDITION
00059   DDS::QueryCondition_ptr create_querycondition(
00060     DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
00061     DDS::InstanceStateMask instance_states, const char* query_expression,
00062     const DDS::StringSeq& query_parameters);
00063 #endif
00064 
00065   DDS::ReturnCode_t delete_readcondition(DDS::ReadCondition_ptr a_condition);
00066 
00067   DDS::ReturnCode_t delete_contained_entities();
00068 
00069   DDS::ReturnCode_t set_qos(const DDS::DataReaderQos& qos);
00070 
00071   DDS::ReturnCode_t get_qos(DDS::DataReaderQos& qos);
00072 
00073   DDS::ReturnCode_t set_listener(DDS::DataReaderListener_ptr a_listener,
00074     DDS::StatusMask mask);
00075 
00076   DDS::DataReaderListener_ptr get_listener();
00077 
00078   DDS::TopicDescription_ptr get_topicdescription();
00079 
00080   DDS::Subscriber_ptr get_subscriber();
00081 
00082   DDS::ReturnCode_t get_sample_rejected_status(
00083     DDS::SampleRejectedStatus& status);
00084 
00085   DDS::ReturnCode_t get_liveliness_changed_status(
00086     DDS::LivelinessChangedStatus& status);
00087 
00088   DDS::ReturnCode_t get_requested_deadline_missed_status(
00089     DDS::RequestedDeadlineMissedStatus& status);
00090 
00091   DDS::ReturnCode_t get_requested_incompatible_qos_status(
00092     DDS::RequestedIncompatibleQosStatus& status);
00093 
00094   DDS::ReturnCode_t get_subscription_matched_status(
00095     DDS::SubscriptionMatchedStatus& status);
00096 
00097   DDS::ReturnCode_t get_sample_lost_status(DDS::SampleLostStatus& status);
00098 
00099   DDS::ReturnCode_t wait_for_historical_data(const DDS::Duration_t& max_wait);
00100 
00101   DDS::ReturnCode_t get_matched_publications(
00102     DDS::InstanceHandleSeq& publication_handles);
00103 
00104 #ifndef DDS_HAS_MINIMUM_BIT
00105   DDS::ReturnCode_t get_matched_publication_data(
00106     DDS::PublicationBuiltinTopicData& publication_data,
00107     DDS::InstanceHandle_t publication_handle);
00108 #endif
00109 
00110   // OpenDDS::DCPS::DataReaderEx interface
00111 
00112   void get_latency_stats(LatencyStatisticsSeq& stats);
00113 
00114   void reset_latency_stats();
00115 
00116   CORBA::Boolean statistics_enabled();
00117 
00118   void statistics_enabled(CORBA::Boolean statistics_enabled);
00119 
00120 private:
00121   virtual void init_typed(DataReaderEx* dr) = 0;
00122   virtual const MetaStruct& getResultingMeta() = 0;
00123   virtual void incoming_sample(void* sample, const DDS::SampleInfo& info,
00124                                const char* topic, const MetaStruct& meta) = 0;
00125 
00126   class Listener
00127     : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
00128   public:
00129     explicit Listener(MultiTopicDataReaderBase* outer)
00130       : outer_(outer)
00131     {}
00132 
00133     void on_requested_deadline_missed(DDS::DataReader_ptr reader,
00134       const DDS::RequestedDeadlineMissedStatus& status);
00135 
00136     void on_requested_incompatible_qos(DDS::DataReader_ptr reader,
00137       const DDS::RequestedIncompatibleQosStatus& status);
00138 
00139     void on_sample_rejected(DDS::DataReader_ptr reader,
00140       const DDS::SampleRejectedStatus& status);
00141 
00142     void on_liveliness_changed(DDS::DataReader_ptr reader,
00143       const DDS::LivelinessChangedStatus& status);
00144 
00145     void on_data_available(DDS::DataReader_ptr reader);
00146 
00147     void on_subscription_matched(DDS::DataReader_ptr reader,
00148       const DDS::SubscriptionMatchedStatus& status);
00149 
00150     void on_sample_lost(DDS::DataReader_ptr reader,
00151       const DDS::SampleLostStatus& status);
00152 
00153   private:
00154     MultiTopicDataReaderBase* outer_;
00155   };
00156 
00157   DDS::DataReaderListener_var listener_;
00158   DataReaderEx_var resulting_reader_;
00159 
00160 protected:
00161 
00162   OPENDDS_STRING topicNameFor(DDS::DataReader_ptr dr);
00163   const MetaStruct& metaStructFor(DDS::DataReader_ptr dr);
00164 
00165   typedef MultiTopicImpl::SubjectFieldSpec SubjectFieldSpec;
00166 
00167   struct QueryPlan {
00168     DDS::DataReader_var data_reader_;
00169     std::vector<SubjectFieldSpec> projection_;
00170     std::vector<OPENDDS_STRING> keys_projected_out_;
00171     std::multimap<OPENDDS_STRING, OPENDDS_STRING> adjacent_joins_; // topic -> key
00172     std::set<std::pair<DDS::InstanceHandle_t /*of this data_reader_*/,
00173       DDS::InstanceHandle_t /*of the resulting DR*/> > instances_;
00174   };
00175 
00176   // key: topicName for this reader
00177   OPENDDS_MAP(OPENDDS_STRING, QueryPlan) query_plans_;
00178 
00179   OPENDDS_DELETED_COPY_CTOR_ASSIGN(MultiTopicDataReaderBase)
00180 };
00181 
00182 }
00183 }
00184 
00185 #endif
00186 #endif

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7