OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Protected Types | Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::MultiTopicDataReaderBase Class Reference [abstract]

#include <MultiTopicDataReaderBase.h>

Inheritance diagram for OpenDDS::DCPS::MultiTopicDataReaderBase:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::MultiTopicDataReaderBase:
Collaboration graph
[legend]

Classes

struct  QueryPlan
 

Public Member Functions

 MultiTopicDataReaderBase ()
 
void init (const DDS::DataReaderQos &dr_qos, DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask, SubscriberImpl *parent, MultiTopicImpl *multitopic)
 
void data_available (DDS::DataReader_ptr reader)
 
void set_status_changed_flag (DDS::StatusKind status, bool flag)
 
bool have_sample_states (DDS::SampleStateMask sample_states) const
 
void cleanup ()
 
DDS::InstanceHandle_t get_instance_handle ()
 
DDS::ReturnCode_t enable ()
 
DDS::StatusCondition_ptr get_statuscondition ()
 
DDS::StatusMask get_status_changes ()
 
DDS::ReadCondition_ptr create_readcondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
DDS::QueryCondition_ptr create_querycondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const char *query_expression, const DDS::StringSeq &query_parameters)
 
DDS::ReturnCode_t delete_readcondition (DDS::ReadCondition_ptr a_condition)
 
DDS::ReturnCode_t delete_contained_entities ()
 
DDS::ReturnCode_t set_qos (const DDS::DataReaderQos &qos)
 
DDS::ReturnCode_t get_qos (DDS::DataReaderQos &qos)
 
DDS::ReturnCode_t set_listener (DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
 
DDS::DataReaderListener_ptr get_listener ()
 
DDS::TopicDescription_ptr get_topicdescription ()
 
DDS::Subscriber_ptr get_subscriber ()
 
DDS::ReturnCode_t get_sample_rejected_status (DDS::SampleRejectedStatus &status)
 
DDS::ReturnCode_t get_liveliness_changed_status (DDS::LivelinessChangedStatus &status)
 
DDS::ReturnCode_t get_requested_deadline_missed_status (DDS::RequestedDeadlineMissedStatus &status)
 
DDS::ReturnCode_t get_requested_incompatible_qos_status (DDS::RequestedIncompatibleQosStatus &status)
 
DDS::ReturnCode_t get_subscription_matched_status (DDS::SubscriptionMatchedStatus &status)
 
DDS::ReturnCode_t get_sample_lost_status (DDS::SampleLostStatus &status)
 
DDS::ReturnCode_t wait_for_historical_data (const DDS::Duration_t &max_wait)
 
DDS::ReturnCode_t get_matched_publications (DDS::InstanceHandleSeq &publication_handles)
 
DDS::ReturnCode_t get_matched_publication_data (DDS::PublicationBuiltinTopicData &publication_data, DDS::InstanceHandle_t publication_handle)
 
void get_latency_stats (LatencyStatisticsSeq &stats)
 
void reset_latency_stats ()
 Clear any intermediate statistical values. More...
 
CORBA::Boolean statistics_enabled ()
 
void statistics_enabled (CORBA::Boolean statistics_enabled)
 
- Public Member Functions inherited from OpenDDS::DCPS::DataReaderEx
void get_latency_stats (inout LatencyStatisticsSeq stats)
 Obtain a sequence of statistics summaries. More...
 
- Public Member Functions inherited from DDS::DataReader
ReadCondition create_readcondition (in SampleStateMask sample_states, in ViewStateMask view_states, in InstanceStateMask instance_states)
 
QueryCondition create_querycondition (in SampleStateMask sample_states, in ViewStateMask view_states, in InstanceStateMask instance_states, in string query_expression, in StringSeq query_parameters)
 
ReturnCode_t delete_readcondition (in ReadCondition a_condition)
 
ReturnCode_t set_qos (in DataReaderQos qos)
 
ReturnCode_t get_qos (inout DataReaderQos qos)
 
ReturnCode_t set_listener (in DataReaderListener a_listener, in StatusMask mask)
 
ReturnCode_t get_sample_rejected_status (inout SampleRejectedStatus status)
 
ReturnCode_t get_liveliness_changed_status (inout LivelinessChangedStatus status)
 
ReturnCode_t get_requested_deadline_missed_status (inout RequestedDeadlineMissedStatus status)
 
ReturnCode_t get_requested_incompatible_qos_status (inout RequestedIncompatibleQosStatus status)
 
ReturnCode_t get_subscription_matched_status (inout SubscriptionMatchedStatus status)
 
ReturnCode_t get_sample_lost_status (inout SampleLostStatus status)
 
ReturnCode_t wait_for_historical_data (in Duration_t max_wait)
 
ReturnCode_t get_matched_publications (inout InstanceHandleSeq publication_handles)
 
ReturnCode_t get_matched_publication_data (inout PublicationBuiltinTopicData publication_data, in InstanceHandle_t publication_handle)
 
- Public Member Functions inherited from OpenDDS::DCPS::LocalObjectBase
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
virtual CORBA::ULong _refcount_value () const
 
- Public Member Functions inherited from CORBA::LocalObject
virtual ~LocalObject (void)
 
virtual CORBA::Boolean _non_existent (void)
 
virtual char * _repository_id (void)
 
virtual CORBA::InterfaceDef_ptr _get_interface (void)
 
virtual CORBA::Object_ptr _get_component (void)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::ExceptionList_ptr exclist, CORBA::ContextList_ptr ctxtlist, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual CORBA::Request_ptr _request (const char *operation)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
virtual CORBA::ULong _hash (CORBA::ULong maximum)
 
virtual CORBA::Boolean _is_equivalent (CORBA::Object_ptr other_obj)
 
virtual CORBA::ORB_ptr _get_orb (void)
 
virtual TAO::ObjectKey * _key (void)
 
- Public Member Functions inherited from CORBA::Object
virtual ~Object (void)
 
virtual TAO_Abstract_ServantBase * _servant (void) const
 
virtual CORBA::Boolean _is_collocated (void) const
 
virtual CORBA::Boolean _is_local (void) const
 
 Object (TAO_Stub *p, CORBA::Boolean collocated=false, TAO_Abstract_ServantBase *servant=0, TAO_ORB_Core *orb_core=0)
 
 Object (IOP::IOR *ior, TAO_ORB_Core *orb_core)
 
virtual TAO_Stub * _stubobj (void) const
 
virtual TAO_Stub * _stubobj (void)
 
virtual void _proxy_broker (TAO::Object_Proxy_Broker *proxy_broker)
 
virtual CORBA::Boolean marshal (TAO_OutputCDR &cdr)
 
CORBA::Boolean is_evaluated (void) const
 
TAO_ORB_Core * orb_core (void) const
 
IOP::IOR * steal_ior (void)
 
const IOP::IOR & ior (void) const
 
virtual bool can_convert_to_ior (void) const
 
virtual char * convert_to_ior (bool use_omg_ior_format, const char *ior_prefix) const
 
void _decr_refcount (void)
 
virtual CORBA::Boolean _is_a (const char *logical_type_id)
 
virtual const char * _interface_repository_id (void) const
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Protected Types

typedef MultiTopicImpl::SubjectFieldSpec SubjectFieldSpec
 

Protected Member Functions

OPENDDS_STRING topicNameFor (DDS::DataReader_ptr dr)
 
const MetaStruct & metaStructFor (DDS::DataReader_ptr dr)
 
 OPENDDS_MAP (OPENDDS_STRING, QueryPlan) query_plans_
 
- Protected Member Functions inherited from CORBA::LocalObject
 LocalObject (void)
 
- Protected Member Functions inherited from CORBA::Object
 Object (int dummy=0)
 
TAO::Object_Proxy_Broker * proxy_broker () const
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Protected Attributes

ACE_RW_Thread_Mutex qp_lock_
 
- Protected Attributes inherited from CORBA::Object
ACE_Atomic_Op< TAO_SYNCH_MUTEX, unsigned long > refcount_
 

Private Member Functions

virtual void init_typed (DataReaderEx *dr)=0
 
virtual const MetaStruct & getResultingMeta ()=0
 
virtual void incoming_sample (void *sample, const DDS::SampleInfo &info, const char *topic, const MetaStruct &meta)=0
 

Private Attributes

unique_ptr< OpenDDS::DCPS::LocalObject< DDS::DataReaderListener > > listener_
 
DataReaderEx_var resulting_reader_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::LocalObject< DataReaderEx >
typedef DataReaderEx ::_ptr_type _ptr_type
 
typedef DataReaderEx ::_var_type _var_type
 
- Public Types inherited from CORBA::LocalObject
typedef LocalObject_ptr _ptr_type
 
typedef LocalObject_var _var_type
 
typedef LocalObject_out _out_type
 
- Public Types inherited from CORBA::Object
typedef Object_ptr _ptr_type
 
typedef Object_var _var_type
 
typedef Object_out _out_type
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DataReaderEx >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::LocalObject
static LocalObject_ptr _duplicate (LocalObject_ptr obj)
 
static LocalObject_ptr _nil (void)
 
static LocalObject_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::Object
static CORBA::Boolean marshal (const Object_ptr x, TAO_OutputCDR &cdr)
 
static void _tao_any_destructor (void *)
 
static CORBA::Boolean is_nil_i (CORBA::Object_ptr obj)
 
static void tao_object_initialize (Object *)
 
static CORBA::Object_ptr _duplicate (CORBA::Object_ptr obj)
 
static CORBA::Object_ptr _nil (void)
 
static CORBA::Object_ptr _narrow (CORBA::Object_ptr obj)
 
- Public Attributes inherited from OpenDDS::DCPS::DataReaderEx
attribute boolean statistics_enabled
 Statistics gathering enable state. More...
 

Detailed Description

Definition at line 30 of file MultiTopicDataReaderBase.h.

Member Typedef Documentation

◆ SubjectFieldSpec

Definition at line 139 of file MultiTopicDataReaderBase.h.

Constructor & Destructor Documentation

◆ MultiTopicDataReaderBase()

OpenDDS::DCPS::MultiTopicDataReaderBase::MultiTopicDataReaderBase ( )
inline

Definition at line 33 of file MultiTopicDataReaderBase.h.

References init(), instance_states, sample_states, and view_states.

33 {}

Member Function Documentation

◆ cleanup()

void OpenDDS::DCPS::MultiTopicDataReaderBase::cleanup ( void  )

Definition at line 309 of file MultiTopicDataReaderBase.cpp.

References OpenDDS::DCPS::DataReaderImpl::cleanup(), and OpenDDS::DCPS::SubscriberImpl::remove_from_datareader_set().

Referenced by OpenDDS::DCPS::SubscriberImpl::delete_datareader().

310 {
311  DDS::Subscriber_var sub = resulting_reader_->get_subscriber();
312  DDS::DomainParticipant_var participant = sub->get_participant();
313  for (std::map<OPENDDS_STRING, QueryPlan>::iterator it = query_plans_.begin();
314  it != query_plans_.end(); ++it) {
315  const DDS::TopicDescription_var topicDescr = it->second.data_reader_->get_topicdescription();
316  const DDS::Topic_var topic = DDS::Topic::_narrow(topicDescr);
317  sub->delete_datareader(it->second.data_reader_);
318  participant->delete_topic(topic);
319  }
320  DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
321  SubscriberImpl* si = dynamic_cast<SubscriberImpl*>(sub.in());
322  if (dri) {
323  if (si) {
324  si->remove_from_datareader_set(dri);
325  }
326  dri->cleanup();
327  }
328 }

◆ create_querycondition()

DDS::QueryCondition_ptr OpenDDS::DCPS::MultiTopicDataReaderBase::create_querycondition ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
const char *  query_expression,
const DDS::StringSeq &  query_parameters 
)

Definition at line 359 of file MultiTopicDataReaderBase.cpp.

363 {
364  return resulting_reader_->create_querycondition(sample_states, view_states,
365  instance_states, query_expression, query_parameters);
366 }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66

◆ create_readcondition()

DDS::ReadCondition_ptr OpenDDS::DCPS::MultiTopicDataReaderBase::create_readcondition ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)

Definition at line 350 of file MultiTopicDataReaderBase.cpp.

353 {
354  return resulting_reader_->create_readcondition(sample_states, view_states,
355  instance_states);
356 }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66

◆ data_available()

void OpenDDS::DCPS::MultiTopicDataReaderBase::data_available ( DDS::DataReader_ptr  reader)

Definition at line 242 of file MultiTopicDataReaderBase.cpp.

References ACE_ERROR, ACE_TEXT(), DDS::ALIVE_INSTANCE_STATE, DDS::ANY_INSTANCE_STATE, DDS::ANY_VIEW_STATE, OpenDDS::DCPS::DataReaderImpl::GenericBundle::info_, DDS::SampleInfo::instance_handle, DDS::SampleInfo::instance_state, OpenDDS::DCPS::MultiTopicDataReaderBase::QueryPlan::instances_, LM_ERROR, DDS::NOT_READ_SAMPLE_STATE, OPENDDS_STRING, OpenDDS::DCPS::DataReaderImpl::read_generic(), DDS::RETCODE_NO_DATA, DDS::RETCODE_OK, OpenDDS::DCPS::retcode_to_string(), OpenDDS::DCPS::DataReaderImpl::GenericBundle::samples_, OpenDDS::DCPS::DataReaderImpl::set_instance_state(), and DDS::SampleInfo::valid_data.

243 {
244  using namespace std;
245  using namespace DDS;
246 
247  const OPENDDS_STRING topic = topicNameFor(reader);
248  DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(reader);
249  if (!dri) {
250  throw runtime_error("Incoming DataReader for " + topic +
251  " could not be cast to DataReaderImpl.");
252  }
253  DataReaderImpl::GenericBundle gen;
254  const ReturnCode_t rc = dri->read_generic(gen,
256  if (rc == RETCODE_NO_DATA) {
257  return;
258  } else if (rc != RETCODE_OK) {
259  throw runtime_error("Incoming DataReader for " + topic +
260  " could not be read: " + retcode_to_string(rc));
261  }
262 
263  try {
264  const MetaStruct& meta = metaStructFor(reader);
265  const QueryPlan& qp = query_plans_[topic];
266  for (CORBA::ULong i = 0; i < gen.samples_.size(); ++i) {
267  const SampleInfo& si = gen.info_[i];
268  if (si.valid_data) {
269  incoming_sample(gen.samples_[i], si, topic.c_str(), meta);
270  } else if (si.instance_state != ALIVE_INSTANCE_STATE) {
271  DataReaderImpl* resulting_impl = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
272  if (resulting_impl) {
273  set<pair<InstanceHandle_t, InstanceHandle_t> >::const_iterator iter = qp.instances_.begin();
274  while (iter != qp.instances_.end() && iter->first != si.instance_handle) ++iter;
275  for (; iter != qp.instances_.end() && iter->first == si.instance_handle; ++iter) {
276  resulting_impl->set_instance_state(iter->second, si.instance_state);
277  }
278  } else {
279  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::data_available:")
280  ACE_TEXT(" failed to obtain DataReaderImpl.\n")));
281  }
282  }
283  }
284  } catch (const std::runtime_error& e) {
285  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::data_available: %C\n"), e.what()));
286  }
287 }
#define ACE_ERROR(X)
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
InstanceHandle_t instance_handle
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_NO_DATA
const InstanceStateKind ALIVE_INSTANCE_STATE
virtual void incoming_sample(void *sample, const DDS::SampleInfo &info, const char *topic, const MetaStruct &meta)=0
const MetaStruct & metaStructFor(DDS::DataReader_ptr dr)
const InstanceStateMask ANY_INSTANCE_STATE
#define OPENDDS_STRING
STL namespace.
ACE_CDR::ULong ULong
InstanceStateKind instance_state
ACE_TEXT("TCP_Factory")
const ViewStateMask ANY_VIEW_STATE
const SampleStateKind NOT_READ_SAMPLE_STATE
OPENDDS_STRING topicNameFor(DDS::DataReader_ptr dr)
The End User API.

◆ delete_contained_entities()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::delete_contained_entities ( )

Implements DDS::DataReader.

Definition at line 375 of file MultiTopicDataReaderBase.cpp.

376 {
377  return resulting_reader_->delete_contained_entities();
378 }

◆ delete_readcondition()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::delete_readcondition ( DDS::ReadCondition_ptr  a_condition)

Definition at line 369 of file MultiTopicDataReaderBase.cpp.

371 {
372  return resulting_reader_->delete_readcondition(a_condition);
373 }

◆ enable()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::enable ( )

Implements DDS::Entity.

Definition at line 335 of file MultiTopicDataReaderBase.cpp.

336 {
337  return resulting_reader_->enable();
338 }

◆ get_instance_handle()

DDS::InstanceHandle_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_instance_handle ( )

Implements DDS::Entity.

Definition at line 330 of file MultiTopicDataReaderBase.cpp.

331 {
332  return resulting_reader_->get_instance_handle();
333 }

◆ get_latency_stats()

void OpenDDS::DCPS::MultiTopicDataReaderBase::get_latency_stats ( LatencyStatisticsSeq &  stats)

Definition at line 470 of file MultiTopicDataReaderBase.cpp.

471 {
472  resulting_reader_->get_latency_stats(stats);
473 }

◆ get_listener()

DDS::DataReaderListener_ptr OpenDDS::DCPS::MultiTopicDataReaderBase::get_listener ( )

Implements DDS::DataReader.

Definition at line 397 of file MultiTopicDataReaderBase.cpp.

Referenced by OpenDDS::DCPS::SubscriberImpl::notify_datareaders().

398 {
399  return resulting_reader_->get_listener();
400 }

◆ get_liveliness_changed_status()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_liveliness_changed_status ( DDS::LivelinessChangedStatus &  status)

Definition at line 418 of file MultiTopicDataReaderBase.cpp.

420 {
421  return resulting_reader_->get_liveliness_changed_status(status);
422 }

◆ get_matched_publication_data()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_matched_publication_data ( DDS::PublicationBuiltinTopicData &  publication_data,
DDS::InstanceHandle_t  publication_handle 
)

Definition at line 461 of file MultiTopicDataReaderBase.cpp.

464 {
465  return resulting_reader_->get_matched_publication_data(publication_data,
466  publication_handle);
467 }

◆ get_matched_publications()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_matched_publications ( DDS::InstanceHandleSeq &  publication_handles)

Definition at line 454 of file MultiTopicDataReaderBase.cpp.

456 {
457  return resulting_reader_->get_matched_publications(publication_handles);
458 }

◆ get_qos()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_qos ( DDS::DataReaderQos &  qos)

Definition at line 386 of file MultiTopicDataReaderBase.cpp.

387 {
388  return resulting_reader_->get_qos(qos);
389 }

◆ get_requested_deadline_missed_status()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_requested_deadline_missed_status ( DDS::RequestedDeadlineMissedStatus &  status)

Definition at line 424 of file MultiTopicDataReaderBase.cpp.

426 {
427  return resulting_reader_->get_requested_deadline_missed_status(status);
428 }

◆ get_requested_incompatible_qos_status()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_requested_incompatible_qos_status ( DDS::RequestedIncompatibleQosStatus &  status)

Definition at line 430 of file MultiTopicDataReaderBase.cpp.

432 {
433  return resulting_reader_->get_requested_incompatible_qos_status(status);
434 }

◆ get_sample_lost_status()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_sample_lost_status ( DDS::SampleLostStatus &  status)

Definition at line 442 of file MultiTopicDataReaderBase.cpp.

444 {
445  return resulting_reader_->get_sample_lost_status(status);
446 }

◆ get_sample_rejected_status()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_sample_rejected_status ( DDS::SampleRejectedStatus &  status)

Definition at line 412 of file MultiTopicDataReaderBase.cpp.

414 {
415  return resulting_reader_->get_sample_rejected_status(status);
416 }

◆ get_status_changes()

DDS::StatusMask OpenDDS::DCPS::MultiTopicDataReaderBase::get_status_changes ( )

Implements DDS::Entity.

Definition at line 345 of file MultiTopicDataReaderBase.cpp.

346 {
347  return resulting_reader_->get_status_changes();
348 }

◆ get_statuscondition()

DDS::StatusCondition_ptr OpenDDS::DCPS::MultiTopicDataReaderBase::get_statuscondition ( )

Implements DDS::Entity.

Definition at line 340 of file MultiTopicDataReaderBase.cpp.

341 {
342  return resulting_reader_->get_statuscondition();
343 }

◆ get_subscriber()

DDS::Subscriber_ptr OpenDDS::DCPS::MultiTopicDataReaderBase::get_subscriber ( )

Implements DDS::DataReader.

Definition at line 407 of file MultiTopicDataReaderBase.cpp.

408 {
409  return resulting_reader_->get_subscriber();
410 }

◆ get_subscription_matched_status()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::get_subscription_matched_status ( DDS::SubscriptionMatchedStatus &  status)

Definition at line 436 of file MultiTopicDataReaderBase.cpp.

438 {
439  return resulting_reader_->get_subscription_matched_status(status);
440 }

◆ get_topicdescription()

DDS::TopicDescription_ptr OpenDDS::DCPS::MultiTopicDataReaderBase::get_topicdescription ( )

Implements DDS::DataReader.

Definition at line 402 of file MultiTopicDataReaderBase.cpp.

403 {
404  return resulting_reader_->get_topicdescription();
405 }

◆ getResultingMeta()

virtual const MetaStruct& OpenDDS::DCPS::MultiTopicDataReaderBase::getResultingMeta ( )
private, pure virtual

◆ have_sample_states()

bool OpenDDS::DCPS::MultiTopicDataReaderBase::have_sample_states ( DDS::SampleStateMask  sample_states) const

Definition at line 298 of file MultiTopicDataReaderBase.cpp.

References OpenDDS::DCPS::DataReaderImpl::have_sample_states().

Referenced by OpenDDS::DCPS::SubscriberImpl::notify_datareaders().

300 {
301  DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
302  if (dri) {
303  return dri->have_sample_states(sample_states);
304  } else {
305  return false;
306  }
307 }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66

◆ incoming_sample()

virtual void OpenDDS::DCPS::MultiTopicDataReaderBase::incoming_sample ( void *  sample,
const DDS::SampleInfo &  info,
const char *  topic,
const MetaStruct &  meta 
)
private, pure virtual

◆ init()

void OpenDDS::DCPS::MultiTopicDataReaderBase::init ( const DDS::DataReaderQos &  dr_qos,
DDS::DataReaderListener_ptr  a_listener,
DDS::StatusMask  mask,
SubscriberImpl *  parent,
MultiTopicImpl *  multitopic 
)

Definition at line 91 of file MultiTopicDataReaderBase.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), ACE_WRITE_GUARD, OpenDDS::DCPS::MultiTopicDataReaderBase::QueryPlan::adjacent_joins_, OpenDDS::DCPS::ALL_STATUS_MASK, OpenDDS::DCPS::SubscriberImpl::create_datareader(), OpenDDS::DCPS::MultiTopicDataReaderBase::QueryPlan::data_reader_, DATAREADER_QOS_USE_TOPIC_QOS, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataReaderImpl::enable_multi_topic(), OpenDDS::DCPS::MultiTopicImpl::get_aggregation(), OpenDDS::DCPS::SubscriberImpl::get_participant(), OpenDDS::DCPS::MultiTopicImpl::get_selection(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), OpenDDS::DCPS::MetaStruct::getFieldNames(), OpenDDS::DCPS::DataReaderImpl::init(), OpenDDS::DCPS::MultiTopicDataReaderBase::QueryPlan::keys_projected_out_, LM_ERROR, LM_WARNING, OPENDDS_STRING, OpenDDS::DCPS::MultiTopicDataReaderBase::QueryPlan::projection_, OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_size(), OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size(), OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_type(), and OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type().

Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader().

94 {
95  using namespace std;
96  DDS::DataReader_var dr = multitopic->get_type_support()->create_datareader();
97  resulting_reader_ = DataReaderEx::_narrow(dr);
98  DataReaderImpl* resulting_impl =
99  dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
100 
101  if (!resulting_impl) {
102  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::init: ")
103  ACE_TEXT("Failed to get DataReaderImpl.\n")));
104  return;
105  }
106 
107  resulting_impl->enable_multi_topic(multitopic);
108  resulting_impl->raw_latency_buffer_size() = parent->raw_latency_buffer_size();
109  resulting_impl->raw_latency_buffer_type() = parent->raw_latency_buffer_type();
110 
111  DDS::DomainParticipant_var participant = parent->get_participant();
112  DomainParticipantImpl* dpi = dynamic_cast<DomainParticipantImpl*>(participant.in());
113  if (!dpi) {
114  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::init: ")
115  ACE_TEXT("Failed to get DomainParticipantImpl.\n")));
116  return;
117  }
118  resulting_impl->init(multitopic, dr_qos, a_listener, mask, dpi, parent);
119 
121 
122  std::map<OPENDDS_STRING, OPENDDS_STRING> fieldToTopic;
123 
124  // key: name of field that's a key for the 'join'
125  // mapped: set of topicNames that have this key in common
126  std::map<OPENDDS_STRING, set<OPENDDS_STRING> > joinKeys;
127 
128  listener_.reset(new Listener(this));
129 
130  const vector<OPENDDS_STRING>& selection = multitopic->get_selection();
131  for (size_t i = 0; i < selection.size(); ++i) {
132 
133  const DDS::Duration_t no_wait = {0, 0};
134  DDS::Topic_var t = participant->find_topic(selection[i].c_str(), no_wait);
135  if (!t.in()) {
136  throw runtime_error("Topic: " + selection[i] + " not found.");
137  }
138 
139 
140  QueryPlan& qp = query_plans_[selection[i]];
141  {
143  qp.data_reader_ =
144  parent->create_datareader(t, DATAREADER_QOS_USE_TOPIC_QOS,
146  }
147  if (!qp.data_reader_.in()) {
148  throw runtime_error("Could not create incoming DataReader "
149  + selection[i]);
150  }
151 
152  try {
153  const MetaStruct& meta = metaStructFor(qp.data_reader_);
154 
155  for (const char** names = meta.getFieldNames(); *names; ++names) {
156  if (fieldToTopic.count(*names)) { // already seen this field name
157  set<OPENDDS_STRING>& topics = joinKeys[*names];
158  topics.insert(fieldToTopic[*names]);
159  topics.insert(selection[i]);
160  } else {
161  fieldToTopic[*names] = selection[i];
162  }
163  }
164  } catch (const std::runtime_error& e) {
165  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: MultiTopicDataReaderBase::init: %C\n"), e.what()));
166  throw std::runtime_error("Failed to obtain metastruct for incoming.");
167  }
168  }
169 
170  const vector<SubjectFieldSpec>& aggregation = multitopic->get_aggregation();
171  if (aggregation.size() == 0) { // "SELECT * FROM ..."
172  const MetaStruct& meta = getResultingMeta();
173  for (const char** names = meta.getFieldNames(); *names; ++names) {
174  std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
175  fieldToTopic.find(*names);
176  if (found == fieldToTopic.end()) {
177  if (DCPS_debug_level > 1) {
178  ACE_DEBUG((LM_WARNING,
179  ACE_TEXT("(%P|%t) WARNING: ")
180  ACE_TEXT("MultiTopicDataReaderBase::init(), in SELECT * ")
181  ACE_TEXT("resulting field %C has no corresponding ")
182  ACE_TEXT("incoming field.\n"), *names));
183  }
184  } else {
185  query_plans_[found->second].projection_.push_back(SubjectFieldSpec(*names));
186  }
187  }
188  } else { // "SELECT A, B FROM ..."
189  for (size_t i = 0; i < aggregation.size(); ++i) {
190  std::map<OPENDDS_STRING, OPENDDS_STRING>::const_iterator found =
191  fieldToTopic.find(aggregation[i].incoming_name_);
192  if (found == fieldToTopic.end()) {
193  throw std::runtime_error("Projected field " +
194  aggregation[i].incoming_name_ + " has no incoming field.");
195  } else {
196  query_plans_[found->second].projection_.push_back(aggregation[i]);
197  }
198  }
199  }
200 
201  typedef std::map<OPENDDS_STRING, set<OPENDDS_STRING> >::const_iterator iter_t;
202  for (iter_t it = joinKeys.begin(); it != joinKeys.end(); ++it) {
203  const OPENDDS_STRING& field = it->first;
204  const set<OPENDDS_STRING>& topics = it->second;
205  for (set<OPENDDS_STRING>::const_iterator it2 = topics.begin(); it2 != topics.end(); ++it2) {
206  const OPENDDS_STRING& topic = *it2;
207  QueryPlan& qp = query_plans_[topic];
208  if (find_if(qp.projection_.begin(), qp.projection_.end(), MatchesIncomingName(field))
209  == qp.projection_.end()) {
210  qp.keys_projected_out_.push_back(field);
211  }
212  for (set<OPENDDS_STRING>::const_iterator it3 = topics.begin(); it3 != topics.end(); ++it3) {
213  if (topic != *it3) { // other topics
214  qp.adjacent_joins_.insert(make_pair(*it3, field));
215  }
216  }
217  }
218  }
219 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
MultiTopicImpl::SubjectFieldSpec SubjectFieldSpec
const MetaStruct & metaStructFor(DDS::DataReader_ptr dr)
virtual void init_typed(DataReaderEx *dr)=0
#define OPENDDS_STRING
STL namespace.
unique_ptr< OpenDDS::DCPS::LocalObject< DDS::DataReaderListener > > listener_
#define DATAREADER_QOS_USE_TOPIC_QOS
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual const MetaStruct & getResultingMeta()=0
ACE_TEXT("TCP_Factory")
const DDS::StatusMask ALL_STATUS_MASK
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)

◆ init_typed()

virtual void OpenDDS::DCPS::MultiTopicDataReaderBase::init_typed ( DataReaderEx *  dr)
private, pure virtual

◆ metaStructFor()

const MetaStruct & OpenDDS::DCPS::MultiTopicDataReaderBase::metaStructFor ( DDS::DataReader_ptr  dr)
protected

Definition at line 229 of file MultiTopicDataReaderBase.cpp.

References OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), and OpenDDS::DCPS::TypeSupportImpl::getMetaStructForType().

230 {
231  DDS::TopicDescription_var td = reader->get_topicdescription();
232  TopicDescriptionImpl* tdi = dynamic_cast<TopicDescriptionImpl*>(td.in());
233  if (tdi) {
234  TypeSupportImpl* ts = dynamic_cast<TypeSupportImpl*>(tdi->get_type_support());
235  if (ts) {
236  return ts->getMetaStructForType();
237  }
238  }
239  throw std::runtime_error("Failed to obtain type support for incoming DataReader");
240 }

◆ OPENDDS_MAP()

OpenDDS::DCPS::MultiTopicDataReaderBase::OPENDDS_MAP ( OPENDDS_STRING  ,
QueryPlan   
)
protected

◆ reset_latency_stats()

void OpenDDS::DCPS::MultiTopicDataReaderBase::reset_latency_stats ( )

Clear any intermediate statistical values.

Implements OpenDDS::DCPS::DataReaderEx.

Definition at line 475 of file MultiTopicDataReaderBase.cpp.

476 {
477  resulting_reader_->reset_latency_stats();
478 }

◆ set_listener()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::set_listener ( DDS::DataReaderListener_ptr  a_listener,
DDS::StatusMask  mask 
)

Definition at line 391 of file MultiTopicDataReaderBase.cpp.

393 {
394  return resulting_reader_->set_listener(a_listener, mask);
395 }

◆ set_qos()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::set_qos ( const DDS::DataReaderQos &  qos)

Definition at line 380 of file MultiTopicDataReaderBase.cpp.

382 {
383  return resulting_reader_->set_qos(qos);
384 }

◆ set_status_changed_flag()

void OpenDDS::DCPS::MultiTopicDataReaderBase::set_status_changed_flag ( DDS::StatusKind  status,
bool  flag 
)

Definition at line 289 of file MultiTopicDataReaderBase.cpp.

References OpenDDS::DCPS::EntityImpl::set_status_changed_flag().

Referenced by OpenDDS::DCPS::SubscriberImpl::notify_datareaders().

291 {
292  DataReaderImpl* dri = dynamic_cast<DataReaderImpl*>(resulting_reader_.in());
293  if (dri) {
294  dri->set_status_changed_flag(status, flag);
295  }
296 }

◆ statistics_enabled() [1/2]

CORBA::Boolean OpenDDS::DCPS::MultiTopicDataReaderBase::statistics_enabled ( )

Definition at line 480 of file MultiTopicDataReaderBase.cpp.

481 {
482  return resulting_reader_->statistics_enabled();
483 }

◆ statistics_enabled() [2/2]

void OpenDDS::DCPS::MultiTopicDataReaderBase::statistics_enabled ( CORBA::Boolean  statistics_enabled)

◆ topicNameFor()

OPENDDS_STRING OpenDDS::DCPS::MultiTopicDataReaderBase::topicNameFor ( DDS::DataReader_ptr  dr)
protected

Definition at line 221 of file MultiTopicDataReaderBase.cpp.

References TAO::String_var< charT >::in().

222 {
223  DDS::TopicDescription_var td = reader->get_topicdescription();
224  CORBA::String_var topic = td->get_name();
225  return topic.in();
226 }
const character_type * in(void) const

◆ wait_for_historical_data()

DDS::ReturnCode_t OpenDDS::DCPS::MultiTopicDataReaderBase::wait_for_historical_data ( const DDS::Duration_t &  max_wait)

Definition at line 448 of file MultiTopicDataReaderBase.cpp.

450 {
451  return resulting_reader_->wait_for_historical_data(max_wait);
452 }

Member Data Documentation

◆ listener_

unique_ptr<OpenDDS::DCPS::LocalObject<DDS::DataReaderListener> > OpenDDS::DCPS::MultiTopicDataReaderBase::listener_
private

Definition at line 131 of file MultiTopicDataReaderBase.h.

◆ qp_lock_

ACE_RW_Thread_Mutex OpenDDS::DCPS::MultiTopicDataReaderBase::qp_lock_
mutable, protected

Definition at line 149 of file MultiTopicDataReaderBase.h.

◆ resulting_reader_

DataReaderEx_var OpenDDS::DCPS::MultiTopicDataReaderBase::resulting_reader_
private

Definition at line 132 of file MultiTopicDataReaderBase.h.


The documentation for this class was generated from the following files: