Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_MULTITOPICDATAREADERBASE_H 9 : #define OPENDDS_DCPS_MULTITOPICDATAREADERBASE_H 10 : 11 : #ifndef OPENDDS_NO_MULTI_TOPIC 12 : 13 : #include "dds/DdsDcpsSubscriptionExtC.h" 14 : #include "ZeroCopySeq_T.h" 15 : #include "MultiTopicImpl.h" 16 : #include "PoolAllocator.h" 17 : #include "unique_ptr.h" 18 : 19 : #if !defined (ACE_LACKS_PRAGMA_ONCE) 20 : #pragma once 21 : #endif /* ACE_LACKS_PRAGMA_ONCE */ 22 : 23 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 24 : 25 : namespace OpenDDS { 26 : namespace DCPS { 27 : 28 : class SubscriberImpl; 29 : 30 : class OpenDDS_Dcps_Export MultiTopicDataReaderBase 31 : : public virtual LocalObject<DataReaderEx> { 32 : public: 33 0 : MultiTopicDataReaderBase() {} 34 : 35 : void init(const DDS::DataReaderQos& dr_qos, 36 : DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask, 37 : SubscriberImpl* parent, MultiTopicImpl* multitopic); 38 : 39 : void data_available(DDS::DataReader_ptr reader); 40 : 41 : // used by the SubscriberImpl 42 : 43 : void set_status_changed_flag(DDS::StatusKind status, bool flag); 44 : bool have_sample_states(DDS::SampleStateMask sample_states) const; 45 : void cleanup(); 46 : 47 : // DDS::Entity interface 48 : 49 : DDS::InstanceHandle_t get_instance_handle(); 50 : 51 : DDS::ReturnCode_t enable(); 52 : 53 : DDS::StatusCondition_ptr get_statuscondition(); 54 : 55 : DDS::StatusMask get_status_changes(); 56 : 57 : // DDS::DataReader interface 58 : 59 : DDS::ReadCondition_ptr create_readcondition( 60 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, 61 : DDS::InstanceStateMask instance_states); 62 : 63 : #ifndef OPENDDS_NO_QUERY_CONDITION 64 : DDS::QueryCondition_ptr create_querycondition( 65 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, 66 : DDS::InstanceStateMask instance_states, const char* query_expression, 67 : const DDS::StringSeq& query_parameters); 68 : #endif 69 : 70 : DDS::ReturnCode_t delete_readcondition(DDS::ReadCondition_ptr a_condition); 71 : 72 : DDS::ReturnCode_t delete_contained_entities(); 73 : 74 : DDS::ReturnCode_t set_qos(const DDS::DataReaderQos& qos); 75 : 76 : DDS::ReturnCode_t get_qos(DDS::DataReaderQos& qos); 77 : 78 : DDS::ReturnCode_t set_listener(DDS::DataReaderListener_ptr a_listener, 79 : DDS::StatusMask mask); 80 : 81 : DDS::DataReaderListener_ptr get_listener(); 82 : 83 : DDS::TopicDescription_ptr get_topicdescription(); 84 : 85 : DDS::Subscriber_ptr get_subscriber(); 86 : 87 : DDS::ReturnCode_t get_sample_rejected_status( 88 : DDS::SampleRejectedStatus& status); 89 : 90 : DDS::ReturnCode_t get_liveliness_changed_status( 91 : DDS::LivelinessChangedStatus& status); 92 : 93 : DDS::ReturnCode_t get_requested_deadline_missed_status( 94 : DDS::RequestedDeadlineMissedStatus& status); 95 : 96 : DDS::ReturnCode_t get_requested_incompatible_qos_status( 97 : DDS::RequestedIncompatibleQosStatus& status); 98 : 99 : DDS::ReturnCode_t get_subscription_matched_status( 100 : DDS::SubscriptionMatchedStatus& status); 101 : 102 : DDS::ReturnCode_t get_sample_lost_status(DDS::SampleLostStatus& status); 103 : 104 : DDS::ReturnCode_t wait_for_historical_data(const DDS::Duration_t& max_wait); 105 : 106 : DDS::ReturnCode_t get_matched_publications( 107 : DDS::InstanceHandleSeq& publication_handles); 108 : 109 : #ifndef DDS_HAS_MINIMUM_BIT 110 : DDS::ReturnCode_t get_matched_publication_data( 111 : DDS::PublicationBuiltinTopicData& publication_data, 112 : DDS::InstanceHandle_t publication_handle); 113 : #endif 114 : 115 : // OpenDDS::DCPS::DataReaderEx interface 116 : 117 : void get_latency_stats(LatencyStatisticsSeq& stats); 118 : 119 : void reset_latency_stats(); 120 : 121 : CORBA::Boolean statistics_enabled(); 122 : 123 : void statistics_enabled(CORBA::Boolean statistics_enabled); 124 : 125 : private: 126 : virtual void init_typed(DataReaderEx* dr) = 0; 127 : virtual const MetaStruct& getResultingMeta() = 0; 128 : virtual void incoming_sample(void* sample, const DDS::SampleInfo& info, 129 : const char* topic, const MetaStruct& meta) = 0; 130 : 131 : unique_ptr<OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> > listener_; 132 : DataReaderEx_var resulting_reader_; 133 : 134 : protected: 135 : 136 : OPENDDS_STRING topicNameFor(DDS::DataReader_ptr dr); 137 : const MetaStruct& metaStructFor(DDS::DataReader_ptr dr); 138 : 139 : typedef MultiTopicImpl::SubjectFieldSpec SubjectFieldSpec; 140 : 141 : struct QueryPlan { 142 : DDS::DataReader_var data_reader_; 143 : std::vector<SubjectFieldSpec> projection_; 144 : std::vector<OPENDDS_STRING> keys_projected_out_; 145 : std::multimap<OPENDDS_STRING, OPENDDS_STRING> adjacent_joins_; // topic -> key 146 : std::set<std::pair<DDS::InstanceHandle_t /*of this data_reader_*/, 147 : DDS::InstanceHandle_t /*of the resulting DR*/> > instances_; 148 : }; 149 : mutable ACE_RW_Thread_Mutex qp_lock_; 150 : 151 : // key: topicName for this reader 152 : OPENDDS_MAP(OPENDDS_STRING, QueryPlan) query_plans_; 153 : 154 : OPENDDS_DELETED_COPY_MOVE_CTOR_ASSIGN(MultiTopicDataReaderBase) 155 : }; 156 : 157 : } 158 : } 159 : 160 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 161 : 162 : #endif 163 : #endif