9 #ifndef OPENDDS_NO_MULTI_TOPIC 22 struct MatchesIncomingName {
23 explicit MatchesIncomingName(
const OPENDDS_STRING& s) : look_for_(s) {}
39 void on_requested_deadline_missed(DDS::DataReader_ptr ,
42 void on_requested_incompatible_qos(DDS::DataReader_ptr ,
45 void on_sample_rejected(DDS::DataReader_ptr ,
48 void on_liveliness_changed(DDS::DataReader_ptr ,
51 void on_data_available(DDS::DataReader_ptr reader){
53 outer_->data_available(reader);
54 }
catch (std::exception& e) {
55 ACE_ERROR((LM_ERROR,
ACE_TEXT(
"(%P|%t) ERROR: MultiTopicDataReaderBase::Listener::on_data_available: %C\n"),
60 void on_subscription_matched(DDS::DataReader_ptr ,
63 void on_sample_lost(DDS::DataReader_ptr ,
67 virtual void _add_ref (
void){
72 virtual void _remove_ref (
void){
73 outer_->_remove_ref();
78 return outer_->_refcount_value();
96 DDS::DataReader_var dr = multitopic->
get_type_support()->create_datareader();
97 resulting_reader_ = DataReaderEx::_narrow(dr);
101 if (!resulting_impl) {
103 ACE_TEXT(
"Failed to get DataReaderImpl.\n")));
115 ACE_TEXT(
"Failed to get DomainParticipantImpl.\n")));
118 resulting_impl->
init(multitopic, dr_qos, a_listener, mask, dpi, parent);
120 init_typed(resulting_reader_);
122 std::map<OPENDDS_STRING, OPENDDS_STRING> fieldToTopic;
126 std::map<OPENDDS_STRING, set<OPENDDS_STRING> > joinKeys;
128 listener_.reset(
new Listener(
this));
130 const vector<OPENDDS_STRING>& selection = multitopic->
get_selection();
131 for (
size_t i = 0; i < selection.size(); ++i) {
134 DDS::Topic_var t = participant->find_topic(selection[i].c_str(), no_wait);
136 throw runtime_error(
"Topic: " + selection[i] +
" not found.");
140 QueryPlan& qp = query_plans_[selection[i]];
148 throw runtime_error(
"Could not create incoming DataReader " 155 for (
const char** names = meta.
getFieldNames(); *names; ++names) {
156 if (fieldToTopic.count(*names)) {
157 set<OPENDDS_STRING>& topics = joinKeys[*names];
158 topics.insert(fieldToTopic[*names]);
159 topics.insert(selection[i]);
161 fieldToTopic[*names] = selection[i];
164 }
catch (
const std::runtime_error& e) {
166 throw std::runtime_error(
"Failed to obtain metastruct for incoming.");
170 const vector<SubjectFieldSpec>& aggregation = multitopic->
get_aggregation();
171 if (aggregation.size() == 0) {
173 for (
const char** names = meta.
getFieldNames(); *names; ++names) {
174 std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
175 fieldToTopic.find(*names);
176 if (found == fieldToTopic.end()) {
180 ACE_TEXT(
"MultiTopicDataReaderBase::init(), in SELECT * ")
181 ACE_TEXT(
"resulting field %C has no corresponding ")
182 ACE_TEXT(
"incoming field.\n"), *names));
185 query_plans_[found->second].projection_.push_back(
SubjectFieldSpec(*names));
189 for (
size_t i = 0; i < aggregation.size(); ++i) {
190 std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
191 fieldToTopic.find(aggregation[i].incoming_name_);
192 if (found == fieldToTopic.end()) {
193 throw std::runtime_error(
"Projected field " +
194 aggregation[i].incoming_name_ +
" has no incoming field.");
196 query_plans_[found->second].projection_.push_back(aggregation[i]);
201 typedef std::map<OPENDDS_STRING, set<OPENDDS_STRING> >::const_iterator iter_t;
202 for (iter_t it = joinKeys.begin(); it != joinKeys.end(); ++it) {
204 const set<OPENDDS_STRING>& topics = it->second;
205 for (set<OPENDDS_STRING>::const_iterator it2 = topics.begin(); it2 != topics.end(); ++it2) {
212 for (set<OPENDDS_STRING>::const_iterator it3 = topics.begin(); it3 != topics.end(); ++it3) {
221 OPENDDS_STRING MultiTopicDataReaderBase::topicNameFor(DDS::DataReader_ptr reader)
223 DDS::TopicDescription_var td = reader->get_topicdescription();
229 MultiTopicDataReaderBase::metaStructFor(DDS::DataReader_ptr reader)
231 DDS::TopicDescription_var td = reader->get_topicdescription();
239 throw std::runtime_error(
"Failed to obtain type support for incoming DataReader");
242 void MultiTopicDataReaderBase::data_available(DDS::DataReader_ptr reader)
250 throw runtime_error(
"Incoming DataReader for " + topic +
251 " could not be cast to DataReaderImpl.");
259 throw runtime_error(
"Incoming DataReader for " + topic +
264 const MetaStruct& meta = metaStructFor(reader);
265 const QueryPlan& qp = query_plans_[topic];
269 incoming_sample(gen.
samples_[i], si, topic.c_str(), meta);
272 if (resulting_impl) {
273 set<pair<InstanceHandle_t, InstanceHandle_t> >::const_iterator iter = qp.
instances_.begin();
280 ACE_TEXT(
" failed to obtain DataReaderImpl.\n")));
284 }
catch (
const std::runtime_error& e) {
298 bool MultiTopicDataReaderBase::have_sample_states(
309 void MultiTopicDataReaderBase::cleanup()
311 DDS::Subscriber_var sub = resulting_reader_->get_subscriber();
312 DDS::DomainParticipant_var participant = sub->get_participant();
313 for (std::map<OPENDDS_STRING, QueryPlan>::iterator it = query_plans_.begin();
314 it != query_plans_.end(); ++it) {
315 const DDS::TopicDescription_var topicDescr = it->second.data_reader_->get_topicdescription();
316 const DDS::Topic_var topic = DDS::Topic::_narrow(topicDescr);
317 sub->delete_datareader(it->second.data_reader_);
318 participant->delete_topic(topic);
332 return resulting_reader_->get_instance_handle();
337 return resulting_reader_->enable();
340 DDS::StatusCondition_ptr MultiTopicDataReaderBase::get_statuscondition()
342 return resulting_reader_->get_statuscondition();
347 return resulting_reader_->get_status_changes();
350 DDS::ReadCondition_ptr MultiTopicDataReaderBase::create_readcondition(
354 return resulting_reader_->create_readcondition(sample_states, view_states,
358 #ifndef OPENDDS_NO_QUERY_CONDITION 359 DDS::QueryCondition_ptr MultiTopicDataReaderBase::create_querycondition(
364 return resulting_reader_->create_querycondition(sample_states, view_states,
365 instance_states, query_expression, query_parameters);
370 DDS::ReadCondition_ptr a_condition)
372 return resulting_reader_->delete_readcondition(a_condition);
377 return resulting_reader_->delete_contained_entities();
383 return resulting_reader_->set_qos(qos);
388 return resulting_reader_->get_qos(qos);
394 return resulting_reader_->set_listener(a_listener, mask);
397 DDS::DataReaderListener_ptr MultiTopicDataReaderBase::get_listener()
399 return resulting_reader_->get_listener();
402 DDS::TopicDescription_ptr MultiTopicDataReaderBase::get_topicdescription()
404 return resulting_reader_->get_topicdescription();
407 DDS::Subscriber_ptr MultiTopicDataReaderBase::get_subscriber()
409 return resulting_reader_->get_subscriber();
415 return resulting_reader_->get_sample_rejected_status(status);
421 return resulting_reader_->get_liveliness_changed_status(status);
427 return resulting_reader_->get_requested_deadline_missed_status(status);
433 return resulting_reader_->get_requested_incompatible_qos_status(status);
439 return resulting_reader_->get_subscription_matched_status(status);
445 return resulting_reader_->get_sample_lost_status(status);
451 return resulting_reader_->wait_for_historical_data(max_wait);
457 return resulting_reader_->get_matched_publications(publication_handles);
460 #ifndef DDS_HAS_MINIMUM_BIT 465 return resulting_reader_->get_matched_publication_data(publication_data,
472 resulting_reader_->get_latency_stats(stats);
475 void MultiTopicDataReaderBase::reset_latency_stats()
477 resulting_reader_->reset_latency_stats();
482 return resulting_reader_->statistics_enabled();
485 void MultiTopicDataReaderBase::statistics_enabled(
488 resulting_reader_->statistics_enabled(statistics_enabled);
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
bool have_sample_states(DDS::SampleStateMask sample_states) const
sequence< InstanceHandle_t > InstanceHandleSeq
DataCollector< double >::OnFull & raw_latency_buffer_type()
Configure the type of the raw data collection buffer.
const DDS::StatusMask ALL_STATUS_MASK
unsigned int & raw_latency_buffer_size()
Configure the size of the raw data collection buffer.
InstanceStateKind instance_state
const std::vector< OPENDDS_STRING > & get_selection() const
OPENDDS_STRING incoming_name_
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
void init(TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber)
virtual const MetaStruct & getMetaStructForType() const =0
std::set< std::pair< DDS::InstanceHandle_t, DDS::InstanceHandle_t > > instances_
const std::vector< SubjectFieldSpec > & get_aggregation() const
void set_instance_state(DDS::InstanceHandle_t instance, DDS::InstanceStateKind state, const SystemTimePoint &timestamp=SystemTimePoint::now(), const GUID_t &guid=GUID_UNKNOWN)
unsigned long InstanceStateMask
OpenDDS::DCPS::TypeSupport_ptr get_type_support()
DDS::DataReader_var data_reader_
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
#define DATAREADER_QOS_USE_TOPIC_QOS
sequence< LatencyStatistics > LatencyStatisticsSeq
std::vector< OPENDDS_STRING > keys_projected_out_
InstanceHandle_t instance_handle
DataCollector< double >::OnFull & raw_latency_buffer_type()
Configure the type of the raw data collection buffer.
virtual const char ** getFieldNames() const =0
const InstanceStateMask ANY_INSTANCE_STATE
Implements the DDS::DataReader interface.
const ViewStateMask ANY_VIEW_STATE
void enable_multi_topic(MultiTopicImpl *mt)
virtual DDS::ReturnCode_t read_generic(GenericBundle &gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count)=0
unsigned long SampleStateMask
HANDLE_TYPE_NATIVE InstanceHandle_t
const ReturnCode_t RETCODE_NO_DATA
unsigned int & raw_latency_buffer_size()
Configure the size of the raw data collection buffer.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
virtual DDS::DomainParticipant_ptr get_participant()
virtual DDS::DataReader_ptr create_datareader(DDS::TopicDescription_ptr a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
Implements the DDS::TopicDescription interface.
const char * retcode_to_string(DDS::ReturnCode_t value)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
const SampleStateKind NOT_READ_SAMPLE_STATE
const ReturnCode_t RETCODE_OK
const character_type * in(void) const
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
std::multimap< OPENDDS_STRING, OPENDDS_STRING > adjacent_joins_
void remove_from_datareader_set(DataReaderImpl *reader)
The Internal API and Implementation of OpenDDS.
unsigned long ViewStateMask
const InstanceStateKind ALIVE_INSTANCE_STATE
sequence< string > StringSeq
std::vector< SubjectFieldSpec > projection_