OpenDDS  Snapshot(2023/04/28-20:55)
MultiTopicDataReaderBase.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 #ifndef OPENDDS_NO_MULTI_TOPIC
10 
12 
13 #include "DomainParticipantImpl.h"
14 #include "Marked_Default_Qos.h"
15 #include "SubscriberImpl.h"
16 #include "TypeSupportImpl.h"
17 #include "DCPS_Utils.h"
18 
19 #include <stdexcept>
20 
21 namespace {
22  struct MatchesIncomingName { // predicate for std::find_if()
23  explicit MatchesIncomingName(const OPENDDS_STRING& s) : look_for_(s) {}
24 
25  bool operator()(const OpenDDS::DCPS::MultiTopicImpl::SubjectFieldSpec& sfs) const {
26  return sfs.incoming_name_ == look_for_;
27  }
28 
29  const OPENDDS_STRING& look_for_;
30  };
31 
32  class Listener
33  : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
34  public:
35  explicit Listener(OpenDDS::DCPS::MultiTopicDataReaderBase* outer)
36  : outer_(outer)
37  {}
38 
39  void on_requested_deadline_missed(DDS::DataReader_ptr /*reader*/,
40  const DDS::RequestedDeadlineMissedStatus& /*status*/){}
41 
42  void on_requested_incompatible_qos(DDS::DataReader_ptr /*reader*/,
43  const DDS::RequestedIncompatibleQosStatus& /*status*/){}
44 
45  void on_sample_rejected(DDS::DataReader_ptr /*reader*/,
46  const DDS::SampleRejectedStatus& /*status*/){}
47 
48  void on_liveliness_changed(DDS::DataReader_ptr /*reader*/,
49  const DDS::LivelinessChangedStatus& /*status*/){}
50 
51  void on_data_available(DDS::DataReader_ptr reader){
52  try {
53  outer_->data_available(reader);
54  } catch (std::exception& e) {
55  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::Listener::on_data_available: %C\n"),
56  e.what()));
57  }
58  }
59 
60  void on_subscription_matched(DDS::DataReader_ptr /*reader*/,
61  const DDS::SubscriptionMatchedStatus& /*status*/){}
62 
63  void on_sample_lost(DDS::DataReader_ptr /*reader*/,
64  const DDS::SampleLostStatus& /*status*/){}
65 
66  /// Increment the reference count.
67  virtual void _add_ref (void){
68  outer_->_add_ref();
69  }
70 
71  /// Decrement the reference count.
72  virtual void _remove_ref (void){
73  outer_->_remove_ref();
74  }
75 
76  /// Get the refcount
77  virtual CORBA::ULong _refcount_value (void) const{
78  return outer_->_refcount_value();
79  }
80 
81  private:
83  };
84 }
85 
87 
88 namespace OpenDDS {
89 namespace DCPS {
90 
91 void MultiTopicDataReaderBase::init(const DDS::DataReaderQos& dr_qos,
92  DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask,
93  SubscriberImpl* parent, MultiTopicImpl* multitopic)
94 {
95  using namespace std;
96  DDS::DataReader_var dr = multitopic->get_type_support()->create_datareader();
97  resulting_reader_ = DataReaderEx::_narrow(dr);
98  DataReaderImpl* resulting_impl =
99  dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
100 
101  if (!resulting_impl) {
102  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::init: ")
103  ACE_TEXT("Failed to get DataReaderImpl.\n")));
104  return;
105  }
106 
107  resulting_impl->enable_multi_topic(multitopic);
108  resulting_impl->raw_latency_buffer_size() = parent->raw_latency_buffer_size();
109  resulting_impl->raw_latency_buffer_type() = parent->raw_latency_buffer_type();
110 
111  DDS::DomainParticipant_var participant = parent->get_participant();
112  DomainParticipantImpl* dpi = dynamic_cast<DomainParticipantImpl*>(participant.in());
113  if (!dpi) {
114  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::init: ")
115  ACE_TEXT("Failed to get DomainParticipantImpl.\n")));
116  return;
117  }
118  resulting_impl->init(multitopic, dr_qos, a_listener, mask, dpi, parent);
119 
120  init_typed(resulting_reader_);
121 
122  std::map<OPENDDS_STRING, OPENDDS_STRING> fieldToTopic;
123 
124  // key: name of field that's a key for the 'join'
125  // mapped: set of topicNames that have this key in common
126  std::map<OPENDDS_STRING, set<OPENDDS_STRING> > joinKeys;
127 
128  listener_.reset(new Listener(this));
129 
130  const vector<OPENDDS_STRING>& selection = multitopic->get_selection();
131  for (size_t i = 0; i < selection.size(); ++i) {
132 
133  const DDS::Duration_t no_wait = {0, 0};
134  DDS::Topic_var t = participant->find_topic(selection[i].c_str(), no_wait);
135  if (!t.in()) {
136  throw runtime_error("Topic: " + selection[i] + " not found.");
137  }
138 
139 
140  QueryPlan& qp = query_plans_[selection[i]];
141  {
142  ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, qp_lock_);
143  qp.data_reader_ =
145  listener_.get(), ALL_STATUS_MASK);
146  }
147  if (!qp.data_reader_.in()) {
148  throw runtime_error("Could not create incoming DataReader "
149  + selection[i]);
150  }
151 
152  try {
153  const MetaStruct& meta = metaStructFor(qp.data_reader_);
154 
155  for (const char** names = meta.getFieldNames(); *names; ++names) {
156  if (fieldToTopic.count(*names)) { // already seen this field name
157  set<OPENDDS_STRING>& topics = joinKeys[*names];
158  topics.insert(fieldToTopic[*names]);
159  topics.insert(selection[i]);
160  } else {
161  fieldToTopic[*names] = selection[i];
162  }
163  }
164  } catch (const std::runtime_error& e) {
165  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::init: %C\n"), e.what()));
166  throw std::runtime_error("Failed to obtain metastruct for incoming.");
167  }
168  }
169 
170  const vector<SubjectFieldSpec>& aggregation = multitopic->get_aggregation();
171  if (aggregation.size() == 0) { // "SELECT * FROM ..."
172  const MetaStruct& meta = getResultingMeta();
173  for (const char** names = meta.getFieldNames(); *names; ++names) {
174  std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
175  fieldToTopic.find(*names);
176  if (found == fieldToTopic.end()) {
177  if (DCPS_debug_level > 1) {
179  ACE_TEXT("(%P|%t) WARNING: ")
180  ACE_TEXT("MultiTopicDataReaderBase::init(), in SELECT * ")
181  ACE_TEXT("resulting field %C has no corresponding ")
182  ACE_TEXT("incoming field.\n"), *names));
183  }
184  } else {
185  query_plans_[found->second].projection_.push_back(SubjectFieldSpec(*names));
186  }
187  }
188  } else { // "SELECT A, B FROM ..."
189  for (size_t i = 0; i < aggregation.size(); ++i) {
190  std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
191  fieldToTopic.find(aggregation[i].incoming_name_);
192  if (found == fieldToTopic.end()) {
193  throw std::runtime_error("Projected field " +
194  aggregation[i].incoming_name_ + " has no incoming field.");
195  } else {
196  query_plans_[found->second].projection_.push_back(aggregation[i]);
197  }
198  }
199  }
200 
201  typedef std::map<OPENDDS_STRING, set<OPENDDS_STRING> >::const_iterator iter_t;
202  for (iter_t it = joinKeys.begin(); it != joinKeys.end(); ++it) {
203  const OPENDDS_STRING& field = it->first;
204  const set<OPENDDS_STRING>& topics = it->second;
205  for (set<OPENDDS_STRING>::const_iterator it2 = topics.begin(); it2 != topics.end(); ++it2) {
206  const OPENDDS_STRING& topic = *it2;
207  QueryPlan& qp = query_plans_[topic];
208  if (find_if(qp.projection_.begin(), qp.projection_.end(), MatchesIncomingName(field))
209  == qp.projection_.end()) {
210  qp.keys_projected_out_.push_back(field);
211  }
212  for (set<OPENDDS_STRING>::const_iterator it3 = topics.begin(); it3 != topics.end(); ++it3) {
213  if (topic != *it3) { // other topics
214  qp.adjacent_joins_.insert(make_pair(*it3, field));
215  }
216  }
217  }
218  }
219 }
220 
221 OPENDDS_STRING MultiTopicDataReaderBase::topicNameFor(DDS::DataReader_ptr reader)
222 {
223  DDS::TopicDescription_var td = reader->get_topicdescription();
224  CORBA::String_var topic = td->get_name();
225  return topic.in();
226 }
227 
228 const MetaStruct&
229 MultiTopicDataReaderBase::metaStructFor(DDS::DataReader_ptr reader)
230 {
231  DDS::TopicDescription_var td = reader->get_topicdescription();
232  TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(td.in());
233  if (tdi) {
234  TypeSupportImpl* ts = dynamic_cast<TypeSupportImpl*>(tdi->get_type_support());
235  if (ts) {
236  return ts->getMetaStructForType();
237  }
238  }
239  throw std::runtime_error("Failed to obtain type support for incoming DataReader");
240 }
241 
242 void MultiTopicDataReaderBase::data_available(DDS::DataReader_ptr reader)
243 {
244  using namespace std;
245  using namespace DDS;
246 
247  const OPENDDS_STRING topic = topicNameFor(reader);
248  DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(reader);
249  if (!dri) {
250  throw runtime_error("Incoming DataReader for " + topic +
251  " could not be cast to DataReaderImpl.");
252  }
254  const ReturnCode_t rc = dri->read_generic(gen,
256  if (rc == RETCODE_NO_DATA) {
257  return;
258  } else if (rc != RETCODE_OK) {
259  throw runtime_error("Incoming DataReader for " + topic +
260  " could not be read: " + retcode_to_string(rc));
261  }
262 
263  try {
264  const MetaStruct& meta = metaStructFor(reader);
265  const QueryPlan& qp = query_plans_[topic];
266  for (CORBA::ULong i = 0; i < gen.samples_.size(); ++i) {
267  const SampleInfo& si = gen.info_[i];
268  if (si.valid_data) {
269  incoming_sample(gen.samples_[i], si, topic.c_str(), meta);
270  } else if (si.instance_state != ALIVE_INSTANCE_STATE) {
271  DataReaderImpl* resulting_impl = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
272  if (resulting_impl) {
273  set<pair<InstanceHandle_t, InstanceHandle_t> >::const_iterator iter = qp.instances_.begin();
274  while (iter != qp.instances_.end() && iter->first != si.instance_handle) ++iter;
275  for (; iter != qp.instances_.end() && iter->first == si.instance_handle; ++iter) {
276  resulting_impl->set_instance_state(iter->second, si.instance_state);
277  }
278  } else {
279  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::data_available:")
280  ACE_TEXT(" failed to obtain DataReaderImpl.\n")));
281  }
282  }
283  }
284  } catch (const std::runtime_error& e) {
285  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::data_available: %C\n"), e.what()));
286  }
287 }
288 
289 void MultiTopicDataReaderBase::set_status_changed_flag(DDS::StatusKind status,
290  bool flag)
291 {
292  DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
293  if (dri) {
294  dri->set_status_changed_flag(status, flag);
295  }
296 }
297 
298 bool MultiTopicDataReaderBase::have_sample_states(
300 {
301  DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
302  if (dri) {
303  return dri->have_sample_states(sample_states);
304  } else {
305  return false;
306  }
307 }
308 
309 void MultiTopicDataReaderBase::cleanup()
310 {
311  DDS::Subscriber_var sub = resulting_reader_->get_subscriber();
312  DDS::DomainParticipant_var participant = sub->get_participant();
313  for (std::map<OPENDDS_STRING, QueryPlan>::iterator it = query_plans_.begin();
314  it != query_plans_.end(); ++it) {
315  const DDS::TopicDescription_var topicDescr = it->second.data_reader_->get_topicdescription();
316  const DDS::Topic_var topic = DDS::Topic::_narrow(topicDescr);
317  sub->delete_datareader(it->second.data_reader_);
318  participant->delete_topic(topic);
319  }
320  DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
321  SubscriberImpl* si = dynamic_cast<SubscriberImpl*>(sub.in());
322  if (dri) {
323  if (si) {
325  }
326  dri->cleanup();
327  }
328 }
329 
330 DDS::InstanceHandle_t MultiTopicDataReaderBase::get_instance_handle()
331 {
332  return resulting_reader_->get_instance_handle();
333 }
334 
335 DDS::ReturnCode_t MultiTopicDataReaderBase::enable()
336 {
337  return resulting_reader_->enable();
338 }
339 
340 DDS::StatusCondition_ptr MultiTopicDataReaderBase::get_statuscondition()
341 {
342  return resulting_reader_->get_statuscondition();
343 }
344 
345 DDS::StatusMask MultiTopicDataReaderBase::get_status_changes()
346 {
347  return resulting_reader_->get_status_changes();
348 }
349 
350 DDS::ReadCondition_ptr MultiTopicDataReaderBase::create_readcondition(
353 {
354  return resulting_reader_->create_readcondition(sample_states, view_states,
355  instance_states);
356 }
357 
358 #ifndef OPENDDS_NO_QUERY_CONDITION
359 DDS::QueryCondition_ptr MultiTopicDataReaderBase::create_querycondition(
361  DDS::InstanceStateMask instance_states, const char* query_expression,
362  const DDS::StringSeq& query_parameters)
363 {
364  return resulting_reader_->create_querycondition(sample_states, view_states,
365  instance_states, query_expression, query_parameters);
366 }
367 #endif
368 
369 DDS::ReturnCode_t MultiTopicDataReaderBase::delete_readcondition(
370  DDS::ReadCondition_ptr a_condition)
371 {
372  return resulting_reader_->delete_readcondition(a_condition);
373 }
374 
375 DDS::ReturnCode_t MultiTopicDataReaderBase::delete_contained_entities()
376 {
377  return resulting_reader_->delete_contained_entities();
378 }
379 
380 DDS::ReturnCode_t MultiTopicDataReaderBase::set_qos(
381  const DDS::DataReaderQos& qos)
382 {
383  return resulting_reader_->set_qos(qos);
384 }
385 
386 DDS::ReturnCode_t MultiTopicDataReaderBase::get_qos(DDS::DataReaderQos& qos)
387 {
388  return resulting_reader_->get_qos(qos);
389 }
390 
391 DDS::ReturnCode_t MultiTopicDataReaderBase::set_listener(
392  DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
393 {
394  return resulting_reader_->set_listener(a_listener, mask);
395 }
396 
397 DDS::DataReaderListener_ptr MultiTopicDataReaderBase::get_listener()
398 {
399  return resulting_reader_->get_listener();
400 }
401 
402 DDS::TopicDescription_ptr MultiTopicDataReaderBase::get_topicdescription()
403 {
404  return resulting_reader_->get_topicdescription();
405 }
406 
407 DDS::Subscriber_ptr MultiTopicDataReaderBase::get_subscriber()
408 {
409  return resulting_reader_->get_subscriber();
410 }
411 
412 DDS::ReturnCode_t MultiTopicDataReaderBase::get_sample_rejected_status(
414 {
415  return resulting_reader_->get_sample_rejected_status(status);
416 }
417 
418 DDS::ReturnCode_t MultiTopicDataReaderBase::get_liveliness_changed_status(
420 {
421  return resulting_reader_->get_liveliness_changed_status(status);
422 }
423 
424 DDS::ReturnCode_t MultiTopicDataReaderBase::get_requested_deadline_missed_status(
426 {
427  return resulting_reader_->get_requested_deadline_missed_status(status);
428 }
429 
430 DDS::ReturnCode_t MultiTopicDataReaderBase::get_requested_incompatible_qos_status(
432 {
433  return resulting_reader_->get_requested_incompatible_qos_status(status);
434 }
435 
436 DDS::ReturnCode_t MultiTopicDataReaderBase::get_subscription_matched_status(
438 {
439  return resulting_reader_->get_subscription_matched_status(status);
440 }
441 
442 DDS::ReturnCode_t MultiTopicDataReaderBase::get_sample_lost_status(
443  DDS::SampleLostStatus& status)
444 {
445  return resulting_reader_->get_sample_lost_status(status);
446 }
447 
448 DDS::ReturnCode_t MultiTopicDataReaderBase::wait_for_historical_data(
449  const DDS::Duration_t& max_wait)
450 {
451  return resulting_reader_->wait_for_historical_data(max_wait);
452 }
453 
454 DDS::ReturnCode_t MultiTopicDataReaderBase::get_matched_publications(
455  DDS::InstanceHandleSeq& publication_handles)
456 {
457  return resulting_reader_->get_matched_publications(publication_handles);
458 }
459 
460 #ifndef DDS_HAS_MINIMUM_BIT
461 DDS::ReturnCode_t MultiTopicDataReaderBase::get_matched_publication_data(
462  DDS::PublicationBuiltinTopicData& publication_data,
463  DDS::InstanceHandle_t publication_handle)
464 {
465  return resulting_reader_->get_matched_publication_data(publication_data,
466  publication_handle);
467 }
468 #endif
469 
470 void MultiTopicDataReaderBase::get_latency_stats(LatencyStatisticsSeq& stats)
471 {
472  resulting_reader_->get_latency_stats(stats);
473 }
474 
475 void MultiTopicDataReaderBase::reset_latency_stats()
476 {
477  resulting_reader_->reset_latency_stats();
478 }
479 
480 CORBA::Boolean MultiTopicDataReaderBase::statistics_enabled()
481 {
482  return resulting_reader_->statistics_enabled();
483 }
484 
485 void MultiTopicDataReaderBase::statistics_enabled(
486  CORBA::Boolean statistics_enabled)
487 {
488  resulting_reader_->statistics_enabled(statistics_enabled);
489 }
490 
491 }
492 }
493 
495 
496 #endif
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
#define ACE_DEBUG(X)
bool have_sample_states(DDS::SampleStateMask sample_states) const
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
DataCollector< double >::OnFull & raw_latency_buffer_type()
Configure the type of the raw data collection buffer.
const DDS::StatusMask ALL_STATUS_MASK
#define ACE_ERROR(X)
unsigned int & raw_latency_buffer_size()
Configure the size of the raw data collection buffer.
InstanceStateKind instance_state
const std::vector< OPENDDS_STRING > & get_selection() const
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
void init(TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber)
virtual const MetaStruct & getMetaStructForType() const =0
std::set< std::pair< DDS::InstanceHandle_t, DDS::InstanceHandle_t > > instances_
const std::vector< SubjectFieldSpec > & get_aggregation() const
void set_instance_state(DDS::InstanceHandle_t instance, DDS::InstanceStateKind state, const SystemTimePoint &timestamp=SystemTimePoint::now(), const GUID_t &guid=GUID_UNKNOWN)
unsigned long InstanceStateMask
OpenDDS::DCPS::TypeSupport_ptr get_type_support()
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define OPENDDS_STRING
#define DATAREADER_QOS_USE_TOPIC_QOS
ACE_CDR::ULong ULong
sequence< LatencyStatistics > LatencyStatisticsSeq
InstanceHandle_t instance_handle
DataCollector< double >::OnFull & raw_latency_buffer_type()
Configure the type of the raw data collection buffer.
ACE_CDR::Boolean Boolean
virtual const char ** getFieldNames() const =0
STL namespace.
const InstanceStateMask ANY_INSTANCE_STATE
Implements the DDS::DataReader interface.
const ViewStateMask ANY_VIEW_STATE
void enable_multi_topic(MultiTopicImpl *mt)
LM_WARNING
The End User API.
virtual DDS::ReturnCode_t read_generic(GenericBundle &gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count)=0
unsigned long SampleStateMask
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
unsigned long StatusMask
const ReturnCode_t RETCODE_NO_DATA
unsigned int & raw_latency_buffer_size()
Configure the size of the raw data collection buffer.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual DDS::DomainParticipant_ptr get_participant()
virtual DDS::DataReader_ptr create_datareader(DDS::TopicDescription_ptr a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
Implements the DDS::TopicDescription interface.
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const SampleStateKind NOT_READ_SAMPLE_STATE
const ReturnCode_t RETCODE_OK
const character_type * in(void) const
unsigned long StatusKind
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
std::multimap< OPENDDS_STRING, OPENDDS_STRING > adjacent_joins_
void remove_from_datareader_set(DataReaderImpl *reader)
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
unsigned long ViewStateMask
const InstanceStateKind ALIVE_INSTANCE_STATE
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50