Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 : #ifndef OPENDDS_NO_MULTI_TOPIC
10 :
11 : #include "MultiTopicDataReaderBase.h"
12 :
13 : #include "DomainParticipantImpl.h"
14 : #include "Marked_Default_Qos.h"
15 : #include "SubscriberImpl.h"
16 : #include "TypeSupportImpl.h"
17 : #include "DCPS_Utils.h"
18 :
19 : #include <stdexcept>
20 :
21 : namespace {
22 : struct MatchesIncomingName { // predicate for std::find_if()
23 0 : explicit MatchesIncomingName(const OPENDDS_STRING& s) : look_for_(s) {}
24 :
25 0 : bool operator()(const OpenDDS::DCPS::MultiTopicImpl::SubjectFieldSpec& sfs) const {
26 0 : return sfs.incoming_name_ == look_for_;
27 : }
28 :
29 : const OPENDDS_STRING& look_for_;
30 : };
31 :
32 : class Listener
33 : : public virtual OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> {
34 : public:
35 0 : explicit Listener(OpenDDS::DCPS::MultiTopicDataReaderBase* outer)
36 0 : : outer_(outer)
37 0 : {}
38 :
39 0 : void on_requested_deadline_missed(DDS::DataReader_ptr /*reader*/,
40 0 : const DDS::RequestedDeadlineMissedStatus& /*status*/){}
41 :
42 0 : void on_requested_incompatible_qos(DDS::DataReader_ptr /*reader*/,
43 0 : const DDS::RequestedIncompatibleQosStatus& /*status*/){}
44 :
45 0 : void on_sample_rejected(DDS::DataReader_ptr /*reader*/,
46 0 : const DDS::SampleRejectedStatus& /*status*/){}
47 :
48 0 : void on_liveliness_changed(DDS::DataReader_ptr /*reader*/,
49 0 : const DDS::LivelinessChangedStatus& /*status*/){}
50 :
51 0 : void on_data_available(DDS::DataReader_ptr reader){
52 : try {
53 0 : outer_->data_available(reader);
54 0 : } catch (std::exception& e) {
55 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::Listener::on_data_available: %C\n"),
56 : e.what()));
57 0 : }
58 0 : }
59 :
60 0 : void on_subscription_matched(DDS::DataReader_ptr /*reader*/,
61 0 : const DDS::SubscriptionMatchedStatus& /*status*/){}
62 :
63 0 : void on_sample_lost(DDS::DataReader_ptr /*reader*/,
64 0 : const DDS::SampleLostStatus& /*status*/){}
65 :
66 : /// Increment the reference count.
67 0 : virtual void _add_ref (void){
68 0 : outer_->_add_ref();
69 0 : }
70 :
71 : /// Decrement the reference count.
72 0 : virtual void _remove_ref (void){
73 0 : outer_->_remove_ref();
74 0 : }
75 :
76 : /// Get the refcount
77 0 : virtual CORBA::ULong _refcount_value (void) const{
78 0 : return outer_->_refcount_value();
79 : }
80 :
81 : private:
82 : OpenDDS::DCPS::MultiTopicDataReaderBase* outer_;
83 : };
84 : }
85 :
86 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
87 :
88 : namespace OpenDDS {
89 : namespace DCPS {
90 :
91 0 : void MultiTopicDataReaderBase::init(const DDS::DataReaderQos& dr_qos,
92 : DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask,
93 : SubscriberImpl* parent, MultiTopicImpl* multitopic)
94 : {
95 : using namespace std;
96 0 : DDS::DataReader_var dr = multitopic->get_type_support()->create_datareader();
97 0 : resulting_reader_ = DataReaderEx::_narrow(dr);
98 : DataReaderImpl* resulting_impl =
99 0 : dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
100 :
101 0 : if (!resulting_impl) {
102 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::init: ")
103 : ACE_TEXT("Failed to get DataReaderImpl.\n")));
104 0 : return;
105 : }
106 :
107 0 : resulting_impl->enable_multi_topic(multitopic);
108 0 : resulting_impl->raw_latency_buffer_size() = parent->raw_latency_buffer_size();
109 0 : resulting_impl->raw_latency_buffer_type() = parent->raw_latency_buffer_type();
110 :
111 0 : DDS::DomainParticipant_var participant = parent->get_participant();
112 0 : DomainParticipantImpl* dpi = dynamic_cast<DomainParticipantImpl*>(participant.in());
113 0 : if (!dpi) {
114 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::init: ")
115 : ACE_TEXT("Failed to get DomainParticipantImpl.\n")));
116 0 : return;
117 : }
118 0 : resulting_impl->init(multitopic, dr_qos, a_listener, mask, dpi, parent);
119 :
120 0 : init_typed(resulting_reader_);
121 :
122 0 : std::map<OPENDDS_STRING, OPENDDS_STRING> fieldToTopic;
123 :
124 : // key: name of field that's a key for the 'join'
125 : // mapped: set of topicNames that have this key in common
126 0 : std::map<OPENDDS_STRING, set<OPENDDS_STRING> > joinKeys;
127 :
128 0 : listener_.reset(new Listener(this));
129 :
130 0 : const vector<OPENDDS_STRING>& selection = multitopic->get_selection();
131 0 : for (size_t i = 0; i < selection.size(); ++i) {
132 :
133 0 : const DDS::Duration_t no_wait = {0, 0};
134 0 : DDS::Topic_var t = participant->find_topic(selection[i].c_str(), no_wait);
135 0 : if (!t.in()) {
136 0 : throw runtime_error("Topic: " + selection[i] + " not found.");
137 : }
138 :
139 :
140 0 : QueryPlan& qp = query_plans_[selection[i]];
141 : {
142 0 : ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, qp_lock_);
143 : qp.data_reader_ =
144 0 : parent->create_datareader(t, DATAREADER_QOS_USE_TOPIC_QOS,
145 0 : listener_.get(), ALL_STATUS_MASK);
146 0 : }
147 0 : if (!qp.data_reader_.in()) {
148 0 : throw runtime_error("Could not create incoming DataReader "
149 0 : + selection[i]);
150 : }
151 :
152 : try {
153 0 : const MetaStruct& meta = metaStructFor(qp.data_reader_);
154 :
155 0 : for (const char** names = meta.getFieldNames(); *names; ++names) {
156 0 : if (fieldToTopic.count(*names)) { // already seen this field name
157 0 : set<OPENDDS_STRING>& topics = joinKeys[*names];
158 0 : topics.insert(fieldToTopic[*names]);
159 0 : topics.insert(selection[i]);
160 : } else {
161 0 : fieldToTopic[*names] = selection[i];
162 : }
163 : }
164 0 : } catch (const std::runtime_error& e) {
165 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::init: %C\n"), e.what()));
166 0 : throw std::runtime_error("Failed to obtain metastruct for incoming.");
167 0 : }
168 0 : }
169 :
170 0 : const vector<SubjectFieldSpec>& aggregation = multitopic->get_aggregation();
171 0 : if (aggregation.size() == 0) { // "SELECT * FROM ..."
172 0 : const MetaStruct& meta = getResultingMeta();
173 0 : for (const char** names = meta.getFieldNames(); *names; ++names) {
174 : std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
175 0 : fieldToTopic.find(*names);
176 0 : if (found == fieldToTopic.end()) {
177 0 : if (DCPS_debug_level > 1) {
178 0 : ACE_DEBUG((LM_WARNING,
179 : ACE_TEXT("(%P|%t) WARNING: ")
180 : ACE_TEXT("MultiTopicDataReaderBase::init(), in SELECT * ")
181 : ACE_TEXT("resulting field %C has no corresponding ")
182 : ACE_TEXT("incoming field.\n"), *names));
183 : }
184 : } else {
185 0 : query_plans_[found->second].projection_.push_back(SubjectFieldSpec(*names));
186 : }
187 : }
188 : } else { // "SELECT A, B FROM ..."
189 0 : for (size_t i = 0; i < aggregation.size(); ++i) {
190 : std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
191 0 : fieldToTopic.find(aggregation[i].incoming_name_);
192 0 : if (found == fieldToTopic.end()) {
193 0 : throw std::runtime_error("Projected field " +
194 0 : aggregation[i].incoming_name_ + " has no incoming field.");
195 : } else {
196 0 : query_plans_[found->second].projection_.push_back(aggregation[i]);
197 : }
198 : }
199 : }
200 :
201 : typedef std::map<OPENDDS_STRING, set<OPENDDS_STRING> >::const_iterator iter_t;
202 0 : for (iter_t it = joinKeys.begin(); it != joinKeys.end(); ++it) {
203 0 : const OPENDDS_STRING& field = it->first;
204 0 : const set<OPENDDS_STRING>& topics = it->second;
205 0 : for (set<OPENDDS_STRING>::const_iterator it2 = topics.begin(); it2 != topics.end(); ++it2) {
206 0 : const OPENDDS_STRING& topic = *it2;
207 0 : QueryPlan& qp = query_plans_[topic];
208 0 : if (find_if(qp.projection_.begin(), qp.projection_.end(), MatchesIncomingName(field))
209 0 : == qp.projection_.end()) {
210 0 : qp.keys_projected_out_.push_back(field);
211 : }
212 0 : for (set<OPENDDS_STRING>::const_iterator it3 = topics.begin(); it3 != topics.end(); ++it3) {
213 0 : if (topic != *it3) { // other topics
214 0 : qp.adjacent_joins_.insert(make_pair(*it3, field));
215 : }
216 : }
217 : }
218 : }
219 0 : }
220 :
221 0 : OPENDDS_STRING MultiTopicDataReaderBase::topicNameFor(DDS::DataReader_ptr reader)
222 : {
223 0 : DDS::TopicDescription_var td = reader->get_topicdescription();
224 0 : CORBA::String_var topic = td->get_name();
225 0 : return topic.in();
226 0 : }
227 :
228 : const MetaStruct&
229 0 : MultiTopicDataReaderBase::metaStructFor(DDS::DataReader_ptr reader)
230 : {
231 0 : DDS::TopicDescription_var td = reader->get_topicdescription();
232 0 : TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(td.in());
233 0 : if (tdi) {
234 0 : TypeSupportImpl* ts = dynamic_cast<TypeSupportImpl*>(tdi->get_type_support());
235 0 : if (ts) {
236 0 : return ts->getMetaStructForType();
237 : }
238 : }
239 0 : throw std::runtime_error("Failed to obtain type support for incoming DataReader");
240 0 : }
241 :
242 0 : void MultiTopicDataReaderBase::data_available(DDS::DataReader_ptr reader)
243 : {
244 : using namespace std;
245 : using namespace DDS;
246 :
247 0 : const OPENDDS_STRING topic = topicNameFor(reader);
248 0 : DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(reader);
249 0 : if (!dri) {
250 0 : throw runtime_error("Incoming DataReader for " + topic +
251 0 : " could not be cast to DataReaderImpl.");
252 : }
253 0 : DataReaderImpl::GenericBundle gen;
254 0 : const ReturnCode_t rc = dri->read_generic(gen,
255 : NOT_READ_SAMPLE_STATE, ANY_VIEW_STATE, ANY_INSTANCE_STATE, false);
256 0 : if (rc == RETCODE_NO_DATA) {
257 0 : return;
258 0 : } else if (rc != RETCODE_OK) {
259 0 : throw runtime_error("Incoming DataReader for " + topic +
260 0 : " could not be read: " + retcode_to_string(rc));
261 : }
262 :
263 : try {
264 0 : const MetaStruct& meta = metaStructFor(reader);
265 0 : const QueryPlan& qp = query_plans_[topic];
266 0 : for (CORBA::ULong i = 0; i < gen.samples_.size(); ++i) {
267 0 : const SampleInfo& si = gen.info_[i];
268 0 : if (si.valid_data) {
269 0 : incoming_sample(gen.samples_[i], si, topic.c_str(), meta);
270 0 : } else if (si.instance_state != ALIVE_INSTANCE_STATE) {
271 0 : DataReaderImpl* resulting_impl = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
272 0 : if (resulting_impl) {
273 0 : set<pair<InstanceHandle_t, InstanceHandle_t> >::const_iterator iter = qp.instances_.begin();
274 0 : while (iter != qp.instances_.end() && iter->first != si.instance_handle) ++iter;
275 0 : for (; iter != qp.instances_.end() && iter->first == si.instance_handle; ++iter) {
276 0 : resulting_impl->set_instance_state(iter->second, si.instance_state);
277 : }
278 : } else {
279 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::data_available:")
280 : ACE_TEXT(" failed to obtain DataReaderImpl.\n")));
281 : }
282 : }
283 : }
284 0 : } catch (const std::runtime_error& e) {
285 0 : ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::data_available: %C\n"), e.what()));
286 0 : }
287 0 : }
288 :
289 0 : void MultiTopicDataReaderBase::set_status_changed_flag(DDS::StatusKind status,
290 : bool flag)
291 : {
292 0 : DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
293 0 : if (dri) {
294 0 : dri->set_status_changed_flag(status, flag);
295 : }
296 0 : }
297 :
298 0 : bool MultiTopicDataReaderBase::have_sample_states(
299 : DDS::SampleStateMask sample_states) const
300 : {
301 0 : DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
302 0 : if (dri) {
303 0 : return dri->have_sample_states(sample_states);
304 : } else {
305 0 : return false;
306 : }
307 : }
308 :
309 0 : void MultiTopicDataReaderBase::cleanup()
310 : {
311 0 : DDS::Subscriber_var sub = resulting_reader_->get_subscriber();
312 0 : DDS::DomainParticipant_var participant = sub->get_participant();
313 0 : for (std::map<OPENDDS_STRING, QueryPlan>::iterator it = query_plans_.begin();
314 0 : it != query_plans_.end(); ++it) {
315 0 : const DDS::TopicDescription_var topicDescr = it->second.data_reader_->get_topicdescription();
316 0 : const DDS::Topic_var topic = DDS::Topic::_narrow(topicDescr);
317 0 : sub->delete_datareader(it->second.data_reader_);
318 0 : participant->delete_topic(topic);
319 0 : }
320 0 : DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
321 0 : SubscriberImpl* si = dynamic_cast<SubscriberImpl*>(sub.in());
322 0 : if (dri) {
323 0 : if (si) {
324 0 : si->remove_from_datareader_set(dri);
325 : }
326 0 : dri->cleanup();
327 : }
328 0 : }
329 :
330 0 : DDS::InstanceHandle_t MultiTopicDataReaderBase::get_instance_handle()
331 : {
332 0 : return resulting_reader_->get_instance_handle();
333 : }
334 :
335 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::enable()
336 : {
337 0 : return resulting_reader_->enable();
338 : }
339 :
340 0 : DDS::StatusCondition_ptr MultiTopicDataReaderBase::get_statuscondition()
341 : {
342 0 : return resulting_reader_->get_statuscondition();
343 : }
344 :
345 0 : DDS::StatusMask MultiTopicDataReaderBase::get_status_changes()
346 : {
347 0 : return resulting_reader_->get_status_changes();
348 : }
349 :
350 0 : DDS::ReadCondition_ptr MultiTopicDataReaderBase::create_readcondition(
351 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
352 : DDS::InstanceStateMask instance_states)
353 : {
354 0 : return resulting_reader_->create_readcondition(sample_states, view_states,
355 0 : instance_states);
356 : }
357 :
358 : #ifndef OPENDDS_NO_QUERY_CONDITION
359 0 : DDS::QueryCondition_ptr MultiTopicDataReaderBase::create_querycondition(
360 : DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states,
361 : DDS::InstanceStateMask instance_states, const char* query_expression,
362 : const DDS::StringSeq& query_parameters)
363 : {
364 0 : return resulting_reader_->create_querycondition(sample_states, view_states,
365 0 : instance_states, query_expression, query_parameters);
366 : }
367 : #endif
368 :
369 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::delete_readcondition(
370 : DDS::ReadCondition_ptr a_condition)
371 : {
372 0 : return resulting_reader_->delete_readcondition(a_condition);
373 : }
374 :
375 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::delete_contained_entities()
376 : {
377 0 : return resulting_reader_->delete_contained_entities();
378 : }
379 :
380 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::set_qos(
381 : const DDS::DataReaderQos& qos)
382 : {
383 0 : return resulting_reader_->set_qos(qos);
384 : }
385 :
386 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::get_qos(DDS::DataReaderQos& qos)
387 : {
388 0 : return resulting_reader_->get_qos(qos);
389 : }
390 :
391 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::set_listener(
392 : DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
393 : {
394 0 : return resulting_reader_->set_listener(a_listener, mask);
395 : }
396 :
397 0 : DDS::DataReaderListener_ptr MultiTopicDataReaderBase::get_listener()
398 : {
399 0 : return resulting_reader_->get_listener();
400 : }
401 :
402 0 : DDS::TopicDescription_ptr MultiTopicDataReaderBase::get_topicdescription()
403 : {
404 0 : return resulting_reader_->get_topicdescription();
405 : }
406 :
407 0 : DDS::Subscriber_ptr MultiTopicDataReaderBase::get_subscriber()
408 : {
409 0 : return resulting_reader_->get_subscriber();
410 : }
411 :
412 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::get_sample_rejected_status(
413 : DDS::SampleRejectedStatus& status)
414 : {
415 0 : return resulting_reader_->get_sample_rejected_status(status);
416 : }
417 :
418 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::get_liveliness_changed_status(
419 : DDS::LivelinessChangedStatus& status)
420 : {
421 0 : return resulting_reader_->get_liveliness_changed_status(status);
422 : }
423 :
424 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::get_requested_deadline_missed_status(
425 : DDS::RequestedDeadlineMissedStatus& status)
426 : {
427 0 : return resulting_reader_->get_requested_deadline_missed_status(status);
428 : }
429 :
430 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::get_requested_incompatible_qos_status(
431 : DDS::RequestedIncompatibleQosStatus& status)
432 : {
433 0 : return resulting_reader_->get_requested_incompatible_qos_status(status);
434 : }
435 :
436 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::get_subscription_matched_status(
437 : DDS::SubscriptionMatchedStatus& status)
438 : {
439 0 : return resulting_reader_->get_subscription_matched_status(status);
440 : }
441 :
442 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::get_sample_lost_status(
443 : DDS::SampleLostStatus& status)
444 : {
445 0 : return resulting_reader_->get_sample_lost_status(status);
446 : }
447 :
448 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::wait_for_historical_data(
449 : const DDS::Duration_t& max_wait)
450 : {
451 0 : return resulting_reader_->wait_for_historical_data(max_wait);
452 : }
453 :
454 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::get_matched_publications(
455 : DDS::InstanceHandleSeq& publication_handles)
456 : {
457 0 : return resulting_reader_->get_matched_publications(publication_handles);
458 : }
459 :
460 : #ifndef DDS_HAS_MINIMUM_BIT
461 0 : DDS::ReturnCode_t MultiTopicDataReaderBase::get_matched_publication_data(
462 : DDS::PublicationBuiltinTopicData& publication_data,
463 : DDS::InstanceHandle_t publication_handle)
464 : {
465 0 : return resulting_reader_->get_matched_publication_data(publication_data,
466 0 : publication_handle);
467 : }
468 : #endif
469 :
470 0 : void MultiTopicDataReaderBase::get_latency_stats(LatencyStatisticsSeq& stats)
471 : {
472 0 : resulting_reader_->get_latency_stats(stats);
473 0 : }
474 :
475 0 : void MultiTopicDataReaderBase::reset_latency_stats()
476 : {
477 0 : resulting_reader_->reset_latency_stats();
478 0 : }
479 :
480 0 : CORBA::Boolean MultiTopicDataReaderBase::statistics_enabled()
481 : {
482 0 : return resulting_reader_->statistics_enabled();
483 : }
484 :
485 0 : void MultiTopicDataReaderBase::statistics_enabled(
486 : CORBA::Boolean statistics_enabled)
487 : {
488 0 : resulting_reader_->statistics_enabled(statistics_enabled);
489 0 : }
490 :
491 : }
492 : }
493 :
494 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
495 :
496 : #endif
|