OpenDDS  Snapshot(2023/04/28-20:55)
MultiTopicDataReaderBase.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_MULTITOPICDATAREADERBASE_H
9 #define OPENDDS_DCPS_MULTITOPICDATAREADERBASE_H
10 
11 #ifndef OPENDDS_NO_MULTI_TOPIC
12 
13 #include "dds/DdsDcpsSubscriptionExtC.h"
14 #include "ZeroCopySeq_T.h"
15 #include "MultiTopicImpl.h"
16 #include "PoolAllocator.h"
17 #include "unique_ptr.h"
18 
19 #if !defined (ACE_LACKS_PRAGMA_ONCE)
20 #pragma once
21 #endif /* ACE_LACKS_PRAGMA_ONCE */
22 
23 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
24 
25 namespace OpenDDS {
26 namespace DCPS {
27 
28 class SubscriberImpl;
29 
30 class OpenDDS_Dcps_Export MultiTopicDataReaderBase
31  : public virtual LocalObject<DataReaderEx> {
32 public:
34 
35  void init(const DDS::DataReaderQos& dr_qos,
36  DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask,
37  SubscriberImpl* parent, MultiTopicImpl* multitopic);
38 
39  void data_available(DDS::DataReader_ptr reader);
40 
41  // used by the SubscriberImpl
42 
43  void set_status_changed_flag(DDS::StatusKind status, bool flag);
44  bool have_sample_states(DDS::SampleStateMask sample_states) const;
45  void cleanup();
46 
47  // DDS::Entity interface
48 
49  DDS::InstanceHandle_t get_instance_handle();
50 
51  DDS::ReturnCode_t enable();
52 
53  DDS::StatusCondition_ptr get_statuscondition();
54 
55  DDS::StatusMask get_status_changes();
56 
57  // DDS::DataReader interface
58 
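59  DDS::ReadCondition_ptr create_readcondition(
60  DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
61  DDS::InstanceStateMask instance_states);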
62 
63 #ifndef OPENDDS_NO_QUERY_CONDITION
64  DDS::QueryCondition_ptr create_querycondition(
65  DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
66  DDS::InstanceStateMask instance_states, const char* query_expression,
67  const DDS::StringSeq& query_parameters);
68 #endif
69 
70  DDS::ReturnCode_t delete_readcondition(DDS::ReadCondition_ptr a_condition);
71 
72  DDS::ReturnCode_t delete_contained_entities();
73 
74  DDS::ReturnCode_t set_qos(const DDS::DataReaderQos& qos);
75 
76  DDS::ReturnCode_t get_qos(DDS::DataReaderQos& qos);
77 
78  DDS::ReturnCode_t set_listener(DDS::DataReaderListener_ptr a_listener,
79  DDS::StatusMask mask);
80 
81  DDS::DataReaderListener_ptr get_listener();
82 
83  DDS::TopicDescription_ptr get_topicdescription();
84 
85  DDS::Subscriber_ptr get_subscriber();
86 
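87  DDS::ReturnCode_t get_sample_rejected_status(
88  DDS::SampleRejectedStatus& status);
89 
90  DDS::ReturnCode_t get_liveliness_changed_status(
91  DDS::LivelinessChangedStatus& status);
92 
93  DDS::ReturnCode_t get_requested_deadline_missed_status(
94  DDS::RequestedDeadlineMissedStatus& status);
95 
96  DDS::ReturnCode_t get_requested_incompatible_qos_status(
97  DDS::RequestedIncompatibleQosStatus& status);
98 
99  DDS::ReturnCode_t get_subscription_matched_status(
100  DDS::SubscriptionMatchedStatus& status);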
101 
102  DDS::ReturnCode_t get_sample_lost_status(DDS::SampleLostStatus& status);
103 
104  DDS::ReturnCode_t wait_for_historical_data(const DDS::Duration_t& max_wait);
105 
106  DDS::ReturnCode_t get_matched_publications(
107  DDS::InstanceHandleSeq& publication_handles);
108 
109 #ifndef DDS_HAS_MINIMUM_BIT
110  DDS::ReturnCode_t get_matched_publication_data(
111  DDS::PublicationBuiltinTopicData& publication_data,
112  DDS::InstanceHandle_t publication_handle);
113 #endif
114 
115  // OpenDDS::DCPS::DataReaderEx interface
116 
117  void get_latency_stats(LatencyStatisticsSeq& stats);
118 
119  void reset_latency_stats();
120 
121  CORBA::Boolean statistics_enabled();
122 
123  void statistics_enabled(CORBA::Boolean statistics_enabled);
124 
125 private:
126  virtual void init_typed(DataReaderEx* dr) = 0;
127  virtual const MetaStruct& getResultingMeta() = 0;
128  virtual void incoming_sample(void* sample, const DDS::SampleInfo& info,
129  const char* topic, const MetaStruct& meta) = 0;
130 
131  unique_ptr<OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> > listener_;
132  DataReaderEx_var resulting_reader_;
133 
134 protected:
135 
136  OPENDDS_STRING topicNameFor(DDS::DataReader_ptr dr);
137  const MetaStruct& metaStructFor(DDS::DataReader_ptr dr);
138 
139  typedef MultiTopicImpl::SubjectFieldSpec SubjectFieldSpec;
140 
141  struct QueryPlan { // Value type of query_plans_, which holds one QueryPlan per topic name.
142  DDS::DataReader_var data_reader_; // The reader that instances_ refers to as "this data_reader_".
143  std::vector<SubjectFieldSpec> projection_; // NOTE(review): presumably the fields this reader contributes to the result; confirm in MultiTopicImpl.
144  std::vector<OPENDDS_STRING> keys_projected_out_; // NOTE(review): name suggests key fields that are not part of the projection; confirm.
145  std::multimap<OPENDDS_STRING, OPENDDS_STRING> adjacent_joins_; // topic -> key (a multimap, so one topic may be associated with several keys)
146  std::set<std::pair<DDS::InstanceHandle_t /*of this data_reader_*/,
147  DDS::InstanceHandle_t /*of the resulting DR*/> > instances_; // Set of (handle on data_reader_, handle on the resulting reader) pairs.
148  };
150 
151  // key: topicName for this reader
152  OPENDDS_MAP(OPENDDS_STRING, QueryPlan) query_plans_;
153 
154  OPENDDS_DELETED_COPY_MOVE_CTOR_ASSIGN(MultiTopicDataReaderBase)
155 };
156 
157 }
158 }
159 
160 OPENDDS_END_VERSIONED_NAMESPACE_DECL
161 
162 #endif
163 #endif
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
#define OPENDDS_DELETED_COPY_MOVE_CTOR_ASSIGN(CLASS)
Definition: Definitions.h:35
MultiTopicImpl::SubjectFieldSpec SubjectFieldSpec
std::set< std::pair< DDS::InstanceHandle_t, DDS::InstanceHandle_t > > instances_
unsigned long InstanceStateMask
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define OPENDDS_STRING
sequence< LatencyStatistics > LatencyStatisticsSeq
ACE_CDR::Boolean Boolean
int init(void)
unsigned long SampleStateMask
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
unsigned long StatusMask
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
unsigned long StatusKind
std::multimap< OPENDDS_STRING, OPENDDS_STRING > adjacent_joins_
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
unsigned long ViewStateMask
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
unique_ptr< OpenDDS::DCPS::LocalObject< DDS::DataReaderListener > > listener_
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50