OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Static Public Member Functions | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::SubscriberImpl Class Reference

#include <SubscriberImpl.h>

Inheritance diagram for OpenDDS::DCPS::SubscriberImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::SubscriberImpl:
Collaboration graph
[legend]

Public Member Functions

 SubscriberImpl (DDS::InstanceHandle_t handle, const DDS::SubscriberQos &qos, DDS::SubscriberListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant)
 
virtual ~SubscriberImpl ()
 
virtual DDS::InstanceHandle_t get_instance_handle ()
 
bool contains_reader (DDS::InstanceHandle_t a_handle)
 
virtual DDS::DataReader_ptr create_datareader (DDS::TopicDescription_ptr a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
 
virtual DDS::ReturnCode_t delete_datareader (DDS::DataReader_ptr a_datareader)
 
virtual DDS::ReturnCode_t delete_contained_entities ()
 
virtual DDS::DataReader_ptr lookup_datareader (const char *topic_name)
 
virtual DDS::ReturnCode_t get_datareaders (DDS::DataReaderSeq &readers, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t notify_datareaders ()
 
virtual DDS::ReturnCode_t set_qos (const DDS::SubscriberQos &qos)
 
virtual DDS::ReturnCode_t get_qos (DDS::SubscriberQos &qos)
 
virtual DDS::ReturnCode_t set_listener (DDS::SubscriberListener_ptr a_listener, DDS::StatusMask mask)
 
virtual DDS::SubscriberListener_ptr get_listener ()
 
virtual DDS::ReturnCode_t begin_access ()
 
virtual DDS::ReturnCode_t end_access ()
 
virtual DDS::DomainParticipant_ptr get_participant ()
 
virtual DDS::ReturnCode_t set_default_datareader_qos (const DDS::DataReaderQos &qos)
 
virtual DDS::ReturnCode_t get_default_datareader_qos (DDS::DataReaderQos &qos)
 
virtual DDS::ReturnCode_t copy_from_topic_qos (DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos)
 
virtual DDS::ReturnCode_t enable ()
 
bool is_clean (String *leftover_entities=0) const
 
void data_received (DataReaderImpl *reader)
 
DDS::ReturnCode_t reader_enabled (const char *topic_name, DataReaderImpl *reader)
 
DDS::ReturnCode_t multitopic_reader_enabled (DDS::DataReader_ptr reader)
 
void remove_from_datareader_set (DataReaderImpl *reader)
 
DDS::SubscriberListener_ptr listener_for (DDS::StatusKind kind)
 
typedef OPENDDS_VECTOR (GUID_t) SubscriptionIdVec
 
void get_subscription_ids (SubscriptionIdVec &subs)
 
void update_ownership_strength (const GUID_t &pub_id, const CORBA::Long &ownership_strength)
 
void coherent_change_received (const GUID_t &publisher_id, DataReaderImpl *reader, Coherent_State &group_state)
 
virtual RcHandle< EntityImpl > parent () const
 
Raw Latency Statistics Configuration Interfaces
unsigned int & raw_latency_buffer_size ()
 Configure the size of the raw data collection buffer. More...
 
DataCollector< double >::OnFull & raw_latency_buffer_type ()
 Configure the type of the raw data collection buffer. More...
 
- Public Member Functions inherited from DDS::Subscriber
DataReader create_datareader (in TopicDescription a_topic, in DataReaderQos qos, in DataReaderListener a_listener, in StatusMask mask)
 
ReturnCode_t delete_datareader (in DataReader a_datareader)
 
DataReader lookup_datareader (in string topic_name)
 
ReturnCode_t get_datareaders (inout DataReaderSeq readers, in SampleStateMask sample_states, in ViewStateMask view_states, in InstanceStateMask instance_states)
 
ReturnCode_t set_qos (in SubscriberQos qos)
 
ReturnCode_t get_qos (inout SubscriberQos qos)
 
ReturnCode_t set_listener (in SubscriberListener a_listener, in StatusMask mask)
 
ReturnCode_t set_default_datareader_qos (in DataReaderQos qos)
 
ReturnCode_t get_default_datareader_qos (inout DataReaderQos qos)
 
ReturnCode_t copy_from_topic_qos (inout DataReaderQos a_datareader_qos, in TopicQos a_topic_qos)
 
- Public Member Functions inherited from OpenDDS::DCPS::LocalObjectBase
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
virtual CORBA::ULong _refcount_value () const
 
- Public Member Functions inherited from CORBA::LocalObject
virtual ~LocalObject (void)
 
virtual CORBA::Boolean _non_existent (void)
 
virtual char * _repository_id (void)
 
virtual CORBA::InterfaceDef_ptr _get_interface (void)
 
virtual CORBA::Object_ptr _get_component (void)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::ExceptionList_ptr exclist, CORBA::ContextList_ptr ctxtlist, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual CORBA::Request_ptr _request (const char *operation)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
virtual CORBA::ULong _hash (CORBA::ULong maximum)
 
virtual CORBA::Boolean _is_equivalent (CORBA::Object_ptr other_obj)
 
virtual CORBA::ORB_ptr _get_orb (void)
 
virtual TAO::ObjectKey * _key (void)
 
- Public Member Functions inherited from CORBA::Object
virtual ~Object (void)
 
virtual TAO_Abstract_ServantBase * _servant (void) const
 
virtual CORBA::Boolean _is_collocated (void) const
 
virtual CORBA::Boolean _is_local (void) const
 
 Object (TAO_Stub *p, CORBA::Boolean collocated=false, TAO_Abstract_ServantBase *servant=0, TAO_ORB_Core *orb_core=0)
 
 Object (IOP::IOR *ior, TAO_ORB_Core *orb_core)
 
virtual TAO_Stub * _stubobj (void) const
 
virtual TAO_Stub * _stubobj (void)
 
virtual void _proxy_broker (TAO::Object_Proxy_Broker *proxy_broker)
 
virtual CORBA::Boolean marshal (TAO_OutputCDR &cdr)
 
CORBA::Boolean is_evaluated (void) const
 
TAO_ORB_Core * orb_core (void) const
 
IOP::IOR * steal_ior (void)
 
const IOP::IOR & ior (void) const
 
virtual bool can_convert_to_ior (void) const
 
virtual char * convert_to_ior (bool use_omg_ior_format, const char *ior_prefix) const
 
void _decr_refcount (void)
 
virtual CORBA::Boolean _is_a (const char *logical_type_id)
 
virtual const char * _interface_repository_id (void) const
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::EntityImpl
 EntityImpl ()
 
virtual ~EntityImpl ()
 
bool is_enabled () const
 
virtual DDS::StatusCondition_ptr get_statuscondition ()
 
virtual DDS::StatusMask get_status_changes ()
 
virtual DDS::DomainId_t get_domain_id ()
 
virtual GUID_t get_id () const
 
void set_status_changed_flag (DDS::StatusKind status, bool status_changed_flag)
 
void notify_status_condition ()
 
virtual void transport_config (const TransportConfig_rch &cfg)
 
TransportConfig_rch transport_config () const
 
void set_observer (Observer_rch observer, Observer::Event e)
 
Observer_rch get_observer (Observer::Event e)
 

Static Public Member Functions

static bool validate_datareader_qos (const DDS::DataReaderQos &qos, const DDS::DataReaderQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataReaderQos &result_qos, bool mt)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::Subscriber >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::LocalObject
static LocalObject_ptr _duplicate (LocalObject_ptr obj)
 
static LocalObject_ptr _nil (void)
 
static LocalObject_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::Object
static CORBA::Boolean marshal (const Object_ptr x, TAO_OutputCDR &cdr)
 
static void _tao_any_destructor (void *)
 
static CORBA::Boolean is_nil_i (CORBA::Object_ptr obj)
 
static void tao_object_initialize (Object *)
 
static CORBA::Object_ptr _duplicate (CORBA::Object_ptr obj)
 
static CORBA::Object_ptr _nil (void)
 
static CORBA::Object_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 

Private Member Functions

typedef OPENDDS_MULTIMAP (OPENDDS_STRING, DataReaderImpl_rch) DataReaderMap
 
typedef OPENDDS_SET (DataReaderImpl_rch) DataReaderSet
 
typedef OPENDDS_MAP_CMP (GUID_t, DDS::DataReaderQos, GUID_tKeyLessThan) DrIdToQosMap
 DataReader id to qos map. More...
 
typedef OPENDDS_MAP (String, DDS::DataReader_var) MultitopicReaderMap
 

Private Attributes

DDS::InstanceHandle_t handle_
 
DDS::SubscriberQos qos_
 
DDS::DataReaderQos default_datareader_qos_
 
ACE_Thread_Mutex listener_mutex_
 
DDS::StatusMask listener_mask_
 
DDS::SubscriberListener_var listener_
 
DataReaderSet readers_not_enabled_
 
DataReaderMap datareader_map_
 
DataReaderSet datareader_set_
 
MultitopicReaderMap multitopic_reader_map_
 
WeakRcHandle< DomainParticipantImpl > participant_
 
DDS::DomainId_t domain_id_
 
GUID_t dp_id_
 
unsigned int raw_latency_buffer_size_
 Bound (or initial reservation) of raw latency buffers. More...
 
DataCollector< double >::OnFull raw_latency_buffer_type_
 Type of raw latency data buffers. More...
 
ACE_Recursive_Thread_Mutex dr_set_lock_
 
ACE_Recursive_Thread_Mutex si_lock_
 
unique_ptr< Monitor > monitor_
 Monitor object for this entity. More...
 
int access_depth_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::Subscriber >
typedef DDS::Subscriber ::_ptr_type _ptr_type
 
typedef DDS::Subscriber ::_var_type _var_type
 
- Public Types inherited from CORBA::LocalObject
typedef LocalObject_ptr _ptr_type
 
typedef LocalObject_var _var_type
 
typedef LocalObject_out _out_type
 
- Public Types inherited from CORBA::Object
typedef Object_ptr _ptr_type
 
typedef Object_var _var_type
 
typedef Object_out _out_type
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
typedef DDS::Entity ::_ptr_type _ptr_type
 
typedef DDS::Entity ::_var_type _var_type
 
- Protected Member Functions inherited from CORBA::LocalObject
 LocalObject (void)
 
- Protected Member Functions inherited from CORBA::Object
 Object (int dummy=0)
 
TAO::Object_Proxy_Broker * proxy_broker () const
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::EntityImpl
DDS::ReturnCode_t set_enabled ()
 
void set_deleted (bool state)
 
bool get_deleted () const
 
DDS::InstanceHandle_t get_entity_instance_handle (const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
 
- Protected Attributes inherited from CORBA::Object
ACE_Atomic_Op< TAO_SYNCH_MUTEX, unsigned long > refcount_
 
- Protected Attributes inherited from OpenDDS::DCPS::EntityImpl
AtomicBool enabled_
 The flag indicates the entity is enabled. More...
 
AtomicBool entity_deleted_
 The flag indicates the entity is being deleted. More...
 

Detailed Description

Definition at line 36 of file SubscriberImpl.h.

Constructor & Destructor Documentation

◆ SubscriberImpl()

OpenDDS::DCPS::SubscriberImpl::SubscriberImpl ( DDS::InstanceHandle_t  handle,
const DDS::SubscriberQos & qos,
DDS::SubscriberListener_ptr  a_listener,
const DDS::StatusMask & mask,
DomainParticipantImpl * participant
)

Definition at line 41 of file SubscriberImpl.cpp.

References listener_, monitor_, and TheServiceParticipant.

46  : handle_(handle),
47  qos_(qos),
48  default_datareader_qos_(TheServiceParticipant->initial_DataReaderQos()),
49  listener_mask_(mask),
50  participant_(*participant),
51  domain_id_(participant->get_domain_id()),
54  access_depth_ (0)
55 {
56  //Note: OK to duplicate a nil.
57  listener_ = DDS::SubscriberListener::_duplicate(a_listener);
58 
59  monitor_.reset(TheServiceParticipant->monitor_factory_->create_subscriber_monitor(this));
60 }
DDS::DataReaderQos default_datareader_qos_
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffers.
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffers.
DDS::InstanceHandle_t handle_
DDS::SubscriberListener_var listener_
WeakRcHandle< DomainParticipantImpl > participant_
unique_ptr< Monitor > monitor_
Monitor object for this entity.
#define TheServiceParticipant

◆ ~SubscriberImpl()

OpenDDS::DCPS::SubscriberImpl::~SubscriberImpl ( )
virtual

Definition at line 62 of file SubscriberImpl.cpp.

References ACE_ERROR, handle_, is_clean(), LM_WARNING, OpenDDS::DCPS::log_level, participant_, and OpenDDS::DCPS::LogLevel::Warning.

63 {
64  const RcHandle<DomainParticipantImpl> participant = participant_.lock();
65  if (participant) {
66  participant->return_handle(handle_);
67  }
68 
69  // The datareaders should be deleted already before calling delete
70  // subscriber.
71  String leftover_entities;
72  if (!is_clean(&leftover_entities)) {
74  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: SubscriberImpl::~SubscriberImpl: "
75  "%C still exist\n", leftover_entities.c_str()));
76  }
77  }
78 }
OpenDDS_Dcps_Export LogLevel log_level
#define ACE_ERROR(X)
DDS::InstanceHandle_t handle_
WeakRcHandle< DomainParticipantImpl > participant_
std::string String
bool is_clean(String *leftover_entities=0) const

Member Function Documentation

◆ begin_access()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::begin_access ( )
virtual

Implements DDS::Subscriber.

Definition at line 728 of file SubscriberImpl.cpp.

References access_depth_, DDS::PresentationQosPolicy::access_scope, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), datareader_set_, OpenDDS::DCPS::DCPS_debug_level, dr_set_lock_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::GROUP_PRESENTATION_QOS, LM_ERROR, DDS::SubscriberQos::presentation, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and si_lock_.

729 {
730  DataReaderSet to_call;
731  {
733  si_guard,
734  si_lock_,
736  if (!enabled_) {
737  if (DCPS_debug_level > 0) {
738  ACE_ERROR((LM_ERROR,
739  ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::begin_access:")
740  ACE_TEXT(" Subscriber is not enabled!\n")));
741  }
743  }
744 
746  return DDS::RETCODE_OK;
747  }
748 
749  ++access_depth_;
750  // We should only notify subscription on the first
751  // and last change to the current change set:
752  if (access_depth_ == 1) {
754  dr_set_guard,
755  dr_set_lock_,
757  to_call = datareader_set_;
758  }
759  }
760 
761  for (DataReaderSet::iterator it = to_call.begin(); it != to_call.end(); ++it) {
762  (*it)->begin_access();
763  }
764  return DDS::RETCODE_OK;
765 }
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
ACE_Recursive_Thread_Mutex si_lock_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
PresentationQosPolicy presentation
ACE_TEXT("TCP_Factory")
ACE_Recursive_Thread_Mutex dr_set_lock_
PresentationQosPolicyAccessScopeKind access_scope
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
const ReturnCode_t RETCODE_NOT_ENABLED

◆ coherent_change_received()

void OpenDDS::DCPS::SubscriberImpl::coherent_change_received ( const GUID_t & publisher_id,
DataReaderImpl * reader,
Coherent_State & group_state
)

Definition at line 1038 of file SubscriberImpl.cpp.

References ACE_GUARD, OpenDDS::DCPS::COMPLETED, datareader_set_, dr_set_lock_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::REJECTED, and state.

1041 {
1042  DataReaderSet localdrs;
1043  {
1045  guard,
1046  this->dr_set_lock_);
1047  localdrs = datareader_set_;
1048  }
1049  // Verify if all readers complete the coherent changes. The result
1050  // is either COMPLETED or REJECTED.
1051  group_state = COMPLETED;
1052  for (DataReaderSet::const_iterator iter = localdrs.begin();
1053  iter != localdrs.end(); ++iter) {
1054 
1056  (*iter)->coherent_change_received (publisher_id, state);
1057  if (state == NOT_COMPLETED_YET) {
1058  group_state = NOT_COMPLETED_YET;
1059  return;
1060  }
1061  else if (state == REJECTED) {
1062  group_state = REJECTED;
1063  }
1064  }
1065 
1066  GUID_t writerId = GUID_UNKNOWN;
1067  for (DataReaderSet::const_iterator iter = localdrs.begin();
1068  iter != localdrs.end(); ++iter) {
1069  if (group_state == COMPLETED) {
1070  (*iter)->accept_coherent (writerId, publisher_id);
1071  }
1072  else { //REJECTED
1073  (*iter)->reject_coherent (writerId, publisher_id);
1074  }
1075  }
1076 
1077  if (group_state == COMPLETED) {
1078  for (DataReaderSet::const_iterator iter = localdrs.begin();
1079  iter != localdrs.end(); ++iter) {
1080  (*iter)->coherent_changes_completed (reader);
1081  (*iter)->reset_coherent_info (writerId, publisher_id);
1082  }
1083  }
1084 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
Definition: COPYING.txt:14
ACE_Recursive_Thread_Mutex dr_set_lock_

◆ contains_reader()

bool OpenDDS::DCPS::SubscriberImpl::contains_reader ( DDS::InstanceHandle_t  a_handle)

Definition at line 87 of file SubscriberImpl.cpp.

References ACE_GUARD_RETURN, datareader_map_, and si_lock_.

88 {
90  guard,
91  this->si_lock_,
92  false);
93 
94  for (DataReaderMap::iterator it(datareader_map_.begin());
95  it != datareader_map_.end(); ++it) {
96  if (a_handle == it->second->get_instance_handle()) {
97  return true;
98  }
99  }
100 
101  return false;
102 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex si_lock_

◆ copy_from_topic_qos()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::copy_from_topic_qos ( DDS::DataReaderQos & a_datareader_qos,
const DDS::TopicQos & a_topic_qos
)
virtual

Definition at line 846 of file SubscriberImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DDS::RETCODE_INCONSISTENT_POLICY, and DDS::RETCODE_OK.

849 {
850  if (Qos_Helper::copy_from_topic_qos(a_datareader_qos, a_topic_qos) ) {
851  return DDS::RETCODE_OK;
852 
853  } else {
855  }
856 }
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
static bool copy_from_topic_qos(DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos)

◆ create_datareader()

DDS::DataReader_ptr OpenDDS::DCPS::SubscriberImpl::create_datareader ( DDS::TopicDescription_ptr  a_topic_desc,
const DDS::DataReaderQos & qos,
DDS::DataReaderListener_ptr  a_listener,
DDS::StatusMask  mask 
)
virtual

Definition at line 105 of file SubscriberImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), DDS::EntityFactoryQosPolicy::autoenable_created_entities, OpenDDS::DCPS::DCPS_debug_level, default_datareader_qos_, OpenDDS::DCPS::DataReaderImpl::enable(), OpenDDS::DCPS::DataReaderImpl::enable_filtering(), OpenDDS::DCPS::EntityImpl::enabled_, DDS::SubscriberQos::entity_factory, OpenDDS::DCPS::ContentFilteredTopicImpl::get_related_topic(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), TAO::String_var< charT >::in(), OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::MultiTopicDataReaderBase::init(), OpenDDS::DCPS::DataReaderImpl::init(), CORBA::is_nil(), LM_ERROR, LM_WARNING, multitopic_reader_enabled(), name, participant_, qos_, OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size(), raw_latency_buffer_size_, OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type(), raw_latency_buffer_type_, OpenDDS::DCPS::rchandle_from(), readers_not_enabled_, DDS::RETCODE_OK, si_lock_, and validate_datareader_qos().

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().

110 {
111  if (CORBA::is_nil(a_topic_desc)) {
112  if (DCPS_debug_level > 0) {
113  ACE_ERROR((LM_ERROR,
114  ACE_TEXT("(%P|%t) ERROR: ")
115  ACE_TEXT("SubscriberImpl::create_datareader, ")
116  ACE_TEXT("topic desc is nil.\n")));
117  }
118  return DDS::DataReader::_nil();
119  }
120 
121  DDS::DataReaderQos dr_qos;
122  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
123  if (!participant)
124  return DDS::DataReader::_nil();
125 
126  TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic_desc);
127 
128 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
129  ContentFilteredTopicImpl* cft = 0;
130 #endif
131 #ifndef OPENDDS_NO_MULTI_TOPIC
132  MultiTopicImpl* mt = 0;
133 #else
134  bool mt = false;
135 #endif
136 
137  if (!topic_servant) {
138 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
139  cft = dynamic_cast<ContentFilteredTopicImpl*>(a_topic_desc);
140  if (cft) {
141  DDS::Topic_var related;
142  related = cft->get_related_topic();
143  topic_servant = dynamic_cast<TopicImpl*>(related.in());
144  }
145  else
146 #endif
147  {
148 #ifndef OPENDDS_NO_MULTI_TOPIC
149  mt = dynamic_cast<MultiTopicImpl*>(a_topic_desc);
150 #endif
151  }
152  }
153 
154  if (!validate_datareader_qos (qos, default_datareader_qos_, topic_servant, dr_qos, mt))
155  return DDS::DataReader::_nil();
156 
157 #ifndef OPENDDS_NO_MULTI_TOPIC
158  if (mt) {
159  try {
160  DDS::DataReader_var dr =
161  mt->get_type_support()->create_multitopic_datareader();
162  MultiTopicDataReaderBase* mtdr =
163  dynamic_cast<MultiTopicDataReaderBase*>(dr.in());
164  mtdr->init(dr_qos, a_listener, mask, this, mt);
166  if (dr->enable() != DDS::RETCODE_OK) {
167  if (DCPS_debug_level > 0) {
168  ACE_ERROR((LM_ERROR,
169  ACE_TEXT("(%P|%t) ERROR: ")
170  ACE_TEXT("SubscriberImpl::create_datareader, ")
171  ACE_TEXT("enable of MultiTopicDataReader failed.\n")));
172  }
173  return DDS::DataReader::_nil();
174  }
176  }
177  return dr._retn();
178  } catch (const std::exception& e) {
179  if (DCPS_debug_level > 0) {
180  ACE_ERROR((LM_ERROR,
181  ACE_TEXT("(%P|%t) ERROR: ")
182  ACE_TEXT("SubscriberImpl::create_datareader, ")
183  ACE_TEXT("creation of MultiTopicDataReader failed: %C.\n"),
184  e.what()));
185  }
186  }
187  return DDS::DataReader::_nil();
188  }
189 #endif
190 
191  OpenDDS::DCPS::TypeSupport_ptr typesupport =
192  topic_servant->get_type_support();
193 
194  if (0 == typesupport) {
195  CORBA::String_var name = a_topic_desc->get_name();
196  if (DCPS_debug_level > 0) {
197  ACE_ERROR((LM_ERROR,
198  ACE_TEXT("(%P|%t) ERROR: ")
199  ACE_TEXT("SubscriberImpl::create_datareader, ")
200  ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
201  name.in()));
202  }
203  return DDS::DataReader::_nil();
204  }
205 
206  DDS::DataReader_var dr_obj = typesupport->create_datareader();
207 
208  DataReaderImpl* dr_servant =
209  dynamic_cast<DataReaderImpl*>(dr_obj.in());
210 
211  if (dr_servant == 0) {
212  if (DCPS_debug_level > 0) {
213  ACE_ERROR((LM_ERROR,
214  ACE_TEXT("(%P|%t) ERROR: ")
215  ACE_TEXT("SubscriberImpl::create_datareader, ")
216  ACE_TEXT("servant is nil.\n")));
217  }
218  return DDS::DataReader::_nil();
219  }
220 
221 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
222  if (cft) {
223  dr_servant->enable_filtering(cft);
224  }
225 #endif
226 
227  // Propagate the latency buffer data collection configuration.
228  // @TODO: Determine whether we want to exclude the Builtin Topic
229  // readers from data gathering.
230  dr_servant->raw_latency_buffer_size() = this->raw_latency_buffer_size_;
231  dr_servant->raw_latency_buffer_type() = this->raw_latency_buffer_type_;
232 
233 
234  dr_servant->init(topic_servant,
235  dr_qos,
236  a_listener,
237  mask,
238  participant.in(),
239  this);
240 
242  const DDS::ReturnCode_t ret = dr_servant->enable();
243 
244  if (ret != DDS::RETCODE_OK) {
245  if (DCPS_debug_level > 0) {
246  ACE_ERROR((LM_WARNING,
247  ACE_TEXT("(%P|%t) WARNING: ")
248  ACE_TEXT("SubscriberImpl::create_datareader, ")
249  ACE_TEXT("enable failed.\n")));
250  }
251  return DDS::DataReader::_nil();
252  }
253  } else {
255  readers_not_enabled_.insert(rchandle_from(dr_servant));
256  }
257 
258  // add created data reader to this' data reader container -
259  // done in enable_reader
260  return DDS::DataReader::_duplicate(dr_obj.in());
261 }
#define ACE_ERROR(X)
DDS::DataReaderQos default_datareader_qos_
const ReturnCode_t RETCODE_OK
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffers.
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffers.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
WeakRcHandle< DomainParticipantImpl > participant_
ACE_Recursive_Thread_Mutex si_lock_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const char *const name
Definition: debug.cpp:60
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
static bool validate_datareader_qos(const DDS::DataReaderQos &qos, const DDS::DataReaderQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataReaderQos &result_qos, bool mt)
EntityFactoryQosPolicy entity_factory
const character_type * in(void) const
DDS::ReturnCode_t multitopic_reader_enabled(DDS::DataReader_ptr reader)
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
Boolean is_nil(T x)

◆ data_received()

void OpenDDS::DCPS::SubscriberImpl::data_received ( DataReaderImpl * reader)

Definition at line 917 of file SubscriberImpl.cpp.

References ACE_GUARD, datareader_set_, dr_set_lock_, and OpenDDS::DCPS::rchandle_from().

918 {
920  guard,
921  this->dr_set_lock_);
922  datareader_set_.insert(rchandle_from(reader));
923 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
ACE_Recursive_Thread_Mutex dr_set_lock_

◆ delete_contained_entities()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::delete_contained_entities ( )
virtual

Implements DDS::Subscriber.

Definition at line 395 of file SubscriberImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), datareader_map_, OpenDDS::DCPS::DCPS_debug_level, delete_datareader(), LM_ERROR, multitopic_reader_map_, OPENDDS_VECTOR(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_deleted(), and si_lock_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_subscriber().

396 {
397  // mark that the entity is being deleted
398  set_deleted(true);
399 
401 
402 #ifndef OPENDDS_NO_MULTI_TOPIC
403  {
405  guard,
406  this->si_lock_,
408  for (MultitopicReaderMap::iterator mt_iter = multitopic_reader_map_.begin();
409  mt_iter != multitopic_reader_map_.end(); ++mt_iter) {
410  drs.push_back(mt_iter->second);
411  }
412  }
413 
414  for (size_t i = 0; i < drs.size(); ++i) {
415  DDS::ReturnCode_t ret = drs[i]->delete_contained_entities();
416  if (ret == DDS::RETCODE_OK) {
417  ret = delete_datareader(drs[i]);
418  }
419  if (ret != DDS::RETCODE_OK) {
420  if (DCPS_debug_level > 0) {
421  ACE_ERROR((LM_ERROR,
422  ACE_TEXT("(%P|%t) ERROR: ")
423  ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
424  ACE_TEXT("failed to delete datareader\n")));
425  }
426  return ret;
427  }
428  }
429  drs.clear();
430 #endif
431 
432  {
434  guard,
435  this->si_lock_,
437  DataReaderMap::iterator it;
438  DataReaderMap::iterator itEnd = datareader_map_.end();
439 
440  for (it = datareader_map_.begin(); it != itEnd; ++it) {
441  drs.push_back(it->second.in());
442  }
443  }
444 
445  for (size_t i = 0; i < drs.size(); ++i) {
446  DDS::ReturnCode_t ret = drs[i]->delete_contained_entities();
447  if (ret == DDS::RETCODE_OK) {
448  ret = delete_datareader(drs[i]);
449  }
450  if (ret != DDS::RETCODE_OK) {
451  if (DCPS_debug_level > 0) {
452  ACE_ERROR((LM_ERROR,
453  ACE_TEXT("(%P|%t) ERROR: ")
454  ACE_TEXT("SubscriberImpl::delete_contained_entities, ")
455  ACE_TEXT("failed to delete datareader\n")));
456  }
457  return ret;
458  }
459  }
460 
461  // the subscriber can now start creating new publications
462  set_deleted(false);
463 
464  return DDS::RETCODE_OK;
465 }
MultitopicReaderMap multitopic_reader_map_
#define ACE_ERROR(X)
virtual DDS::ReturnCode_t delete_datareader(DDS::DataReader_ptr a_datareader)
const ReturnCode_t RETCODE_OK
typedef OPENDDS_VECTOR(GUID_t) SubscriptionIdVec
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
void set_deleted(bool state)
Definition: EntityImpl.cpp:83
ACE_Recursive_Thread_Mutex si_lock_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")

◆ delete_datareader()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::delete_datareader ( DDS::DataReader_ptr  a_datareader)
virtual

Definition at line 264 of file SubscriberImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::MultiTopicDataReaderBase::cleanup(), datareader_map_, datareader_set_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, domain_id_, dr_set_lock_, OpenDDS::DCPS::RcHandle< T >::get(), TAO::String_var< charT >::in(), LM_ERROR, LM_NOTICE, OpenDDS::DCPS::log_level, monitor_, multitopic_reader_map_, OpenDDS::DCPS::LogLevel::Notice, OpenDDS::DCPS::rchandle_from(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::retcode_to_string(), si_lock_, and TheServiceParticipant.

Referenced by delete_contained_entities().

265 {
266  DBG_ENTRY_LVL("SubscriberImpl", "delete_datareader", 6);
267 
268  DataReaderImpl_rch dr_servant = rchandle_from(dynamic_cast<DataReaderImpl*>(a_datareader));
269 
270  if (dr_servant) { // for MultiTopic this will be false
271  const char* reason = " (ERROR: unknown reason)";
273  RcHandle<SubscriberImpl> dr_subscriber = dr_servant->get_subscriber_servant();
274  if (dr_subscriber.get() != this) {
275  reason = "doesn't belong to this subscriber.";
277  } else if (dr_servant->has_zero_copies()) {
278  reason = "has outstanding zero-copy samples loaned out.";
280  } else if (!dr_servant->read_conditions_.empty()) {
281  reason = "has read conditions attached.";
283  }
284  if (rc != DDS::RETCODE_OK) {
285  if (log_level >= LogLevel::Notice) {
286  DDS::TopicDescription_var topic = a_datareader->get_topicdescription();
287  CORBA::String_var topic_name = topic->get_name();
288  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: SubscriberImpl::delete_datareader: "
289  "on reader %C (topic \"%C\") will return \"%C\" because it %C\n",
290  LogGuid(dr_servant->get_id()).c_str(), topic_name.in(),
291  retcode_to_string(rc), reason));
292  }
293  return rc;
294  }
295 
296  // marks entity as deleted and stops future associating
297  dr_servant->prepare_to_delete();
298  }
299 
300  {
302  si_guard,
303  this->si_lock_,
305 
306  DataReaderMap::iterator it;
307  for (it = datareader_map_.begin(); it != datareader_map_.end(); ++it) {
308  if (it->second == dr_servant) {
309  break;
310  }
311  }
312 
313  if (it == datareader_map_.end()) {
314  DDS::TopicDescription_var td = a_datareader->get_topicdescription();
315  CORBA::String_var topic_name = td->get_name();
316 #ifndef OPENDDS_NO_MULTI_TOPIC
317  MultitopicReaderMap::iterator mt_iter = multitopic_reader_map_.find(topic_name.in());
318  if (mt_iter != multitopic_reader_map_.end()) {
319  DDS::DataReader_ptr ptr = mt_iter->second;
320  MultiTopicDataReaderBase* mtdrb = dynamic_cast<MultiTopicDataReaderBase*>(ptr);
321  if (!mtdrb) {
322  if (DCPS_debug_level > 0) {
323  ACE_ERROR((LM_ERROR,
324  ACE_TEXT("(%P|%t) ERROR: ")
325  ACE_TEXT("SubscriberImpl::delete_datareader: ")
326  ACE_TEXT("datareader(topic_name=%C)")
327  ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n"),
328  topic_name.in()));
329  }
331  }
332  mtdrb->cleanup();
333  multitopic_reader_map_.erase(mt_iter);
334  return DDS::RETCODE_OK;
335  }
336 #endif
337  if (!dr_servant) {
338  if (DCPS_debug_level > 0) {
339  ACE_ERROR((LM_ERROR,
340  ACE_TEXT("(%P|%t) ERROR: ")
341  ACE_TEXT("SubscriberImpl::delete_datareader: ")
342  ACE_TEXT("datareader(topic_name=%C)")
343  ACE_TEXT("for unknown repo id not found.\n"),
344  topic_name.in()));
345  }
347  }
348  if (DCPS_debug_level > 0) {
349  GUID_t id = dr_servant->get_guid();
350  ACE_ERROR((LM_ERROR,
351  ACE_TEXT("(%P|%t) ERROR: ")
352  ACE_TEXT("SubscriberImpl::delete_datareader: ")
353  ACE_TEXT("datareader(topic_name=%C) %C not found.\n"),
354  topic_name.in(),
355  LogGuid(id).c_str()));
356  }
358  }
359 
360  datareader_map_.erase(it);
361 
363  dr_set_guard,
364  this->dr_set_lock_,
366  datareader_set_.erase(dr_servant);
367  }
368 
369  if (this->monitor_) {
370  this->monitor_->report();
371  }
372 
373  const GUID_t subscription_id = dr_servant->get_guid();
374  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
375  if (!disco->remove_subscription(this->domain_id_,
376  this->dp_id_,
377  subscription_id)) {
378  if (DCPS_debug_level > 0) {
379  ACE_ERROR((LM_ERROR,
380  ACE_TEXT("(%P|%t) ERROR: ")
381  ACE_TEXT("SubscriberImpl::delete_datareader: ")
382  ACE_TEXT(" could not remove subscription from discovery.\n")));
383  }
385  }
386 
387  // Call remove association before unregistering the datareader from the transport;
388  // otherwise some callbacks resulting from remove_association may be lost.
389  dr_servant->remove_all_associations();
390  dr_servant->cleanup();
391  return DDS::RETCODE_OK;
392 }
OpenDDS_Dcps_Export LogLevel log_level
MultitopicReaderMap multitopic_reader_map_
#define ACE_ERROR(X)
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
ACE_Recursive_Thread_Mutex si_lock_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
ACE_Recursive_Thread_Mutex dr_set_lock_
unique_ptr< Monitor > monitor_
Monitor object for this entity.
const character_type * in(void) const
#define TheServiceParticipant
RcHandle< DataReaderImpl > DataReaderImpl_rch

◆ enable()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::enable ( )
virtual

Implements DDS::Entity.

Definition at line 859 of file SubscriberImpl.cpp.

References ACE_GUARD_RETURN, DDS::EntityFactoryQosPolicy::autoenable_created_entities, dp_id_, DDS::SubscriberQos::entity_factory, OpenDDS::DCPS::EntityImpl::is_enabled(), monitor_, participant_, qos_, readers_not_enabled_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::EntityImpl::set_enabled(), and si_lock_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_subscriber().

860 {
861  // According to the spec:
862  // - Calling enable on an already enabled Entity returns OK and has no
863  // effect.
864  // - Calling enable on an Entity whose factory is not enabled will fail
865  // and return PRECONDITION_NOT_MET.
866 
867  if (this->is_enabled()) {
868  return DDS::RETCODE_OK;
869  }
870 
871  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
872  if (!participant || !participant->is_enabled()) {
874  }
875 
876  dp_id_ = participant->get_id();
877 
878  if (this->monitor_) {
879  this->monitor_->report();
880  }
881 
882  this->set_enabled();
883 
885  DataReaderSet readers;
886  {
888  readers_not_enabled_.swap(readers);
889  }
890  for (DataReaderSet::iterator it = readers.begin(); it != readers.end(); ++it) {
891  (*it)->enable();
892  }
893  }
894 
895  return DDS::RETCODE_OK;
896 }
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
WeakRcHandle< DomainParticipantImpl > participant_
ACE_Recursive_Thread_Mutex si_lock_
unique_ptr< Monitor > monitor_
Monitor object for this entity.
EntityFactoryQosPolicy entity_factory

◆ end_access()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::end_access ( )
virtual

Implements DDS::Subscriber.

Definition at line 768 of file SubscriberImpl.cpp.

References access_depth_, DDS::PresentationQosPolicy::access_scope, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), datareader_set_, OpenDDS::DCPS::DCPS_debug_level, dr_set_lock_, OpenDDS::DCPS::EntityImpl::enabled_, DDS::GROUP_PRESENTATION_QOS, LM_ERROR, DDS::SubscriberQos::presentation, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and si_lock_.

769 {
770  DataReaderSet to_call;
771  {
773  si_guard,
774  si_lock_,
776  if (!enabled_) {
777  if (DCPS_debug_level > 0) {
778  ACE_ERROR((LM_ERROR,
779  ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
780  ACE_TEXT(" Publisher is not enabled!\n")));
781  }
783  }
784 
786  return DDS::RETCODE_OK;
787  }
788 
789  if (access_depth_ == 0) {
790  if (DCPS_debug_level > 0) {
791  ACE_ERROR((LM_ERROR,
792  ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::end_access:")
793  ACE_TEXT(" No matching call to begin_coherent_changes!\n")));
794  }
796  }
797 
798  --access_depth_;
799  // We should only notify subscription on the first
800  // and last change to the current change set:
801  if (access_depth_ == 0) {
803  dr_set_guard,
804  dr_set_lock_,
806  to_call = datareader_set_;
807  }
808  }
809 
810  for (DataReaderSet::iterator it = to_call.begin(); it != to_call.end(); ++it) {
811  (*it)->end_access();
812  }
813  return DDS::RETCODE_OK;
814 }
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
ACE_Recursive_Thread_Mutex si_lock_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
PresentationQosPolicy presentation
ACE_TEXT("TCP_Factory")
ACE_Recursive_Thread_Mutex dr_set_lock_
PresentationQosPolicyAccessScopeKind access_scope
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
const ReturnCode_t RETCODE_NOT_ENABLED

◆ get_datareaders()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::get_datareaders ( DDS::DataReaderSeq readers,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
virtual

Definition at line 504 of file SubscriberImpl.cpp.

References access_depth_, DDS::PresentationQosPolicy::access_scope, ACE_GUARD_RETURN, DDS::PresentationQosPolicy::coherent_access, datareader_set_, dr_set_lock_, OpenDDS::DCPS::GroupRakeData::get_datareaders(), DDS::GROUP_PRESENTATION_QOS, DDS::PresentationQosPolicy::ordered_access, DDS::SubscriberQos::presentation, OpenDDS::DCPS::push_back(), qos_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

509 {
510  DataReaderSet localreaders;
511  {
513  guard,
514  this->dr_set_lock_,
516  localreaders = datareader_set_;
517  }
518 
519 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
520  // If access_scope is GROUP and ordered_access is true then return readers as
521  // a list, which may contain the same reader multiple times. Otherwise return
522  // readers as a set.
524  if (this->access_depth_ == 0 && this->qos_.presentation.coherent_access) {
526  }
527  if (this->qos_.presentation.ordered_access) {
528 
529  GroupRakeData data;
530  for (DataReaderSet::const_iterator pos = localreaders.begin();
531  pos != localreaders.end(); ++pos) {
532  (*pos)->get_ordered_data(data, sample_states, view_states, instance_states);
533  }
534 
535  // Return list of readers in the order of the source timestamp of the received
536  // samples from readers.
537  data.get_datareaders(readers);
538  return DDS::RETCODE_OK;
539  }
540  }
541 #endif
542 
543  // Return set of datareaders.
544  readers.length(0);
545  for (DataReaderSet::const_iterator pos = localreaders.begin();
546  pos != localreaders.end(); ++pos) {
547  if ((*pos)->have_sample_states(sample_states) &&
548  (*pos)->have_view_states(view_states) &&
549  (*pos)->have_instance_states(instance_states)) {
550  push_back(readers, DDS::DataReader::_duplicate(pos->in()));
551  }
552  }
553 
554  return DDS::RETCODE_OK;
555 }
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
PresentationQosPolicy presentation
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
ACE_Recursive_Thread_Mutex dr_set_lock_
PresentationQosPolicyAccessScopeKind access_scope

◆ get_default_datareader_qos()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::get_default_datareader_qos ( DDS::DataReaderQos qos)
virtual

Definition at line 838 of file SubscriberImpl.cpp.

References default_datareader_qos_, and DDS::RETCODE_OK.

Referenced by OpenDDS::RTPS::RtpsDiscovery::init_bit(), and OpenDDS::DCPS::StaticDiscovery::init_bit().

840 {
842  return DDS::RETCODE_OK;
843 }
DDS::DataReaderQos default_datareader_qos_
const ReturnCode_t RETCODE_OK

◆ get_instance_handle()

DDS::InstanceHandle_t OpenDDS::DCPS::SubscriberImpl::get_instance_handle ( )
virtual

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 81 of file SubscriberImpl.cpp.

References handle_.

Referenced by OpenDDS::DCPS::SubscriberMonitorImpl::report().

82 {
83  return handle_;
84 }
DDS::InstanceHandle_t handle_

◆ get_listener()

DDS::SubscriberListener_ptr OpenDDS::DCPS::SubscriberImpl::get_listener ( )
virtual

Implements DDS::Subscriber.

Definition at line 719 of file SubscriberImpl.cpp.

References listener_, and listener_mutex_.

720 {
722  return DDS::SubscriberListener::_duplicate(listener_.in());
723 }
ACE_Thread_Mutex listener_mutex_
DDS::SubscriberListener_var listener_

◆ get_participant()

DDS::DomainParticipant_ptr OpenDDS::DCPS::SubscriberImpl::get_participant ( )
virtual

◆ get_qos()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::get_qos ( DDS::SubscriberQos qos)
virtual

Definition at line 699 of file SubscriberImpl.cpp.

References qos_, and DDS::RETCODE_OK.

Referenced by OpenDDS::DCPS::DataReaderImpl::init().

701 {
702  qos = qos_;
703  return DDS::RETCODE_OK;
704 }
const ReturnCode_t RETCODE_OK

◆ get_subscription_ids()

void OpenDDS::DCPS::SubscriberImpl::get_subscription_ids ( SubscriptionIdVec &  subs)

Populates a std::vector with the SubscriptionIds (GUIDs) of this Subscriber's Data Readers.

Definition at line 1000 of file SubscriberImpl.cpp.

References ACE_GUARD_RETURN, datareader_map_, and si_lock_.

Referenced by OpenDDS::DCPS::SubscriberMonitorImpl::report().

1001 {
1003  guard,
1004  this->si_lock_,
1005  );
1006 
1007  subs.reserve(datareader_map_.size());
1008  for (DataReaderMap::iterator iter = datareader_map_.begin();
1009  iter != datareader_map_.end();
1010  ++iter) {
1011  subs.push_back(iter->second->get_guid());
1012  }
1013 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex si_lock_

◆ is_clean()

bool OpenDDS::DCPS::SubscriberImpl::is_clean ( String leftover_entities = 0) const

This method is not defined in the IDL; it is for internal use. It checks whether any datareader is still associated with this subscriber.

Definition at line 898 of file SubscriberImpl.cpp.

References datareader_map_, OpenDDS::DCPS::NUMBER_OF_BUILT_IN_TOPICS, TheTransientKludge, and OpenDDS::DCPS::to_dds_string().

Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_subscriber(), and ~SubscriberImpl().

899 {
900  if (leftover_entities) {
901  leftover_entities->clear();
902  }
903 
904  size_t reader_count = datareader_map_.size();
905  if (reader_count && !TheTransientKludge->is_enabled()) {
906  // BIT datareaders.
907  reader_count = reader_count == NUMBER_OF_BUILT_IN_TOPICS ? 0 : reader_count;
908  }
909  if (leftover_entities && reader_count) {
910  *leftover_entities += to_dds_string(reader_count) + " reader(s)";
911  }
912 
913  return reader_count == 0;
914 }
#define TheTransientKludge
const size_t NUMBER_OF_BUILT_IN_TOPICS
String to_dds_string(unsigned short to_convert)

◆ listener_for()

DDS::SubscriberListener_ptr OpenDDS::DCPS::SubscriberImpl::listener_for ( DDS::StatusKind  kind)

Definition at line 968 of file SubscriberImpl.cpp.

References CORBA::is_nil(), listener_, listener_mask_, listener_mutex_, participant_, and ACE_Guard< ACE_LOCK >::release().

Referenced by OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::finish_store_instance_data().

969 {
970  // per 2.1.4.3.1 Listener Access to Plain Communication Status
971  // use this entity's factory if the listener mask is not enabled
972  // for this kind.
973  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
974  if (! participant)
975  return 0;
976 
978  if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
979  g.release();
980  return participant->listener_for(kind);
981 
982  } else {
983  return DDS::SubscriberListener::_duplicate(listener_.in());
984  }
985 }
ACE_Thread_Mutex listener_mutex_
DDS::SubscriberListener_var listener_
WeakRcHandle< DomainParticipantImpl > participant_
Boolean is_nil(T x)

◆ lookup_datareader()

DDS::DataReader_ptr OpenDDS::DCPS::SubscriberImpl::lookup_datareader ( const char *  topic_name)
virtual

Definition at line 468 of file SubscriberImpl.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), datareader_map_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, multitopic_reader_map_, and si_lock_.

470 {
472  guard,
473  this->si_lock_,
474  DDS::DataReader::_nil());
475 
476  // If multiple entries have the key "topic_name", which one is
477  // returned? The spec does not specify which one should be returned.
478  DataReaderMap::iterator it = datareader_map_.find(topic_name);
479 
480  if (it == datareader_map_.end()) {
481 #ifndef OPENDDS_NO_MULTI_TOPIC
482  MultitopicReaderMap::iterator mt_iter = multitopic_reader_map_.find(topic_name);
483  if (mt_iter != multitopic_reader_map_.end()) {
484  return DDS::DataReader::_duplicate(mt_iter->second);
485  }
486 #endif
487 
488  if (DCPS_debug_level >= 2) {
489  ACE_DEBUG((LM_DEBUG,
490  ACE_TEXT("(%P|%t) ")
491  ACE_TEXT("SubscriberImpl::lookup_datareader, ")
492  ACE_TEXT("The datareader(topic_name=%C) is not found\n"),
493  topic_name));
494  }
495 
496  return DDS::DataReader::_nil();
497 
498  } else {
499  return DDS::DataReader::_duplicate(it->second.in());
500  }
501 }
#define ACE_DEBUG(X)
MultitopicReaderMap multitopic_reader_map_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex si_lock_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")

◆ multitopic_reader_enabled()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::multitopic_reader_enabled ( DDS::DataReader_ptr  reader)

Definition at line 951 of file SubscriberImpl.cpp.

References TAO::String_var< charT >::in(), multitopic_reader_map_, and DDS::RETCODE_OK.

Referenced by create_datareader().

952 {
953  DDS::TopicDescription_var td = reader->get_topicdescription();
954  CORBA::String_var topic = td->get_name();
955  multitopic_reader_map_[topic.in()] = DDS::DataReader::_duplicate(reader);
956  return DDS::RETCODE_OK;
957 }
MultitopicReaderMap multitopic_reader_map_
const ReturnCode_t RETCODE_OK
const character_type * in(void) const

◆ notify_datareaders()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::notify_datareaders ( )
virtual

Implements DDS::Subscriber.

Definition at line 558 of file SubscriberImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), DDS::DATA_AVAILABLE_STATUS, datareader_map_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::MultiTopicDataReaderBase::get_listener(), OpenDDS::DCPS::MultiTopicDataReaderBase::have_sample_states(), CORBA::is_nil(), LM_ERROR, multitopic_reader_map_, DDS::NOT_READ_SAMPLE_STATE, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::MultiTopicDataReaderBase::set_status_changed_flag(), si_lock_, and TheServiceParticipant.

559 {
560  DataReaderMap localreadermap;
561  {
563  guard,
564  this->si_lock_,
566  localreadermap = datareader_map_;
567  }
568  for (DataReaderMap::iterator it = localreadermap.begin(); it != localreadermap.end(); ++it) {
569  if (it->second->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
570  DDS::DataReaderListener_var listener = it->second->get_listener();
571  if (!it->second->is_bit()) {
572  it->second->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
573  if (listener) {
574  listener->on_data_available(it->second.in());
575  }
576  } else {
577  TheServiceParticipant->job_queue()->enqueue(make_rch<DataReaderImpl::OnDataAvailable>(listener, it->second, listener, true, false));
578  }
579  }
580  }
581 
582 #ifndef OPENDDS_NO_MULTI_TOPIC
583  MultitopicReaderMap localmtr;
584  {
586  guard,
587  this->si_lock_,
589  localmtr = multitopic_reader_map_;
590  }
591 
592  for (MultitopicReaderMap::iterator it = localmtr.begin();
593  it != localmtr.end(); ++it) {
594  MultiTopicDataReaderBase* dri =
595  dynamic_cast<MultiTopicDataReaderBase*>(it->second.in());
596 
597  if (!dri) {
598  if (DCPS_debug_level > 0) {
599  ACE_ERROR((LM_ERROR,
600  ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::notify_datareaders: ")
601  ACE_TEXT("failed to obtain MultiTopicDataReaderBase.\n")));
602  }
604  }
605 
606  if (dri->have_sample_states(DDS::NOT_READ_SAMPLE_STATE)) {
607  DDS::DataReaderListener_var listener = dri->get_listener();
608  dri->set_status_changed_flag(DDS::DATA_AVAILABLE_STATUS, false);
609  if (!CORBA::is_nil(listener)) {
610  listener->on_data_available(dri);
611  }
612  }
613  }
614 #endif
615 
616  return DDS::RETCODE_OK;
617 }
MultitopicReaderMap multitopic_reader_map_
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
const StatusKind DATA_AVAILABLE_STATUS
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
ACE_Recursive_Thread_Mutex si_lock_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
const SampleStateKind NOT_READ_SAMPLE_STATE
#define TheServiceParticipant
Boolean is_nil(T x)

◆ OPENDDS_MAP()

typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_MAP ( String  ,
DDS::DataReader_var   
)
private

◆ OPENDDS_MAP_CMP()

typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_MAP_CMP ( GUID_t  ,
DDS::DataReaderQos  ,
GUID_tKeyLessThan   
)
private

DataReader id to qos map.

◆ OPENDDS_MULTIMAP()

typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_MULTIMAP ( OPENDDS_STRING  ,
DataReaderImpl_rch   
)
private

Keeps track of all the DataReaders attached to this Subscriber; the key is the topic name.

◆ OPENDDS_SET()

typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_SET ( DataReaderImpl_rch  )
private

Keeps track of DataReaders with data. A std::set is used for now; this should be encapsulated so we can switch between a set and a list depending on the Presentation QoS.

◆ OPENDDS_VECTOR()

typedef OpenDDS::DCPS::SubscriberImpl::OPENDDS_VECTOR ( GUID_t  )

◆ parent()

RcHandle< EntityImpl > OpenDDS::DCPS::SubscriberImpl::parent ( void  ) const
virtual

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 1088 of file SubscriberImpl.cpp.

References participant_.

1089 {
1090  return this->participant_.lock();
1091 }
WeakRcHandle< DomainParticipantImpl > participant_

◆ raw_latency_buffer_size()

unsigned int & OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_size ( )

Configure the size of the raw data collection buffer.

Definition at line 988 of file SubscriberImpl.cpp.

References raw_latency_buffer_size_.

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().

989 {
990  return this->raw_latency_buffer_size_;
991 }
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffers.

◆ raw_latency_buffer_type()

DataCollector< double >::OnFull & OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_type ( )

Configure the type of the raw data collection buffer.

Definition at line 994 of file SubscriberImpl.cpp.

References raw_latency_buffer_type_.

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().

995 {
996  return this->raw_latency_buffer_type_;
997 }
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffers.

◆ reader_enabled()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::reader_enabled ( const char *  topic_name,
DataReaderImpl reader 
)

Definition at line 926 of file SubscriberImpl.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), datareader_map_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, monitor_, OpenDDS::DCPS::rchandle_from(), readers_not_enabled_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and si_lock_.

928 {
929  if (DCPS_debug_level >= 4) {
930  ACE_DEBUG((LM_DEBUG,
931  ACE_TEXT("(%P|%t) SubscriberImpl::reader_enabled, ")
932  ACE_TEXT("datareader(topic_name=%C) enabled\n"),
933  topic_name));
934  }
935 
937  DataReaderImpl_rch reader = rchandle_from(reader_ptr);
938  readers_not_enabled_.erase(reader);
939 
940  this->datareader_map_.insert(DataReaderMap::value_type(topic_name, reader));
941 
942  if (this->monitor_) {
943  this->monitor_->report();
944  }
945 
946  return DDS::RETCODE_OK;
947 }
#define ACE_DEBUG(X)
const ReturnCode_t RETCODE_OK
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
ACE_Recursive_Thread_Mutex si_lock_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
unique_ptr< Monitor > monitor_
Monitor object for this entity.
RcHandle< DataReaderImpl > DataReaderImpl_rch

◆ remove_from_datareader_set()

void OpenDDS::DCPS::SubscriberImpl::remove_from_datareader_set ( DataReaderImpl reader)

Definition at line 960 of file SubscriberImpl.cpp.

References ACE_GUARD, datareader_set_, dr_set_lock_, and OpenDDS::DCPS::rchandle_from().

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::cleanup().

961 {
963  datareader_set_.erase(rchandle_from(reader));
964 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
ACE_Recursive_Thread_Mutex dr_set_lock_

◆ set_default_datareader_qos()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::set_default_datareader_qos ( const DDS::DataReaderQos qos)
virtual

Definition at line 825 of file SubscriberImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::consistent(), default_datareader_qos_, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, and OpenDDS::DCPS::Qos_Helper::valid().

827 {
828  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
830  return DDS::RETCODE_OK;
831 
832  } else {
834  }
835 }
DDS::DataReaderQos default_datareader_qos_
const ReturnCode_t RETCODE_OK
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723

◆ set_listener()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::set_listener ( DDS::SubscriberListener_ptr  a_listener,
DDS::StatusMask  mask 
)
virtual

Definition at line 707 of file SubscriberImpl.cpp.

References listener_, listener_mask_, listener_mutex_, and DDS::RETCODE_OK.

710 {
712  listener_mask_ = mask;
713  //note: OK to duplicate a nil object ref
714  listener_ = DDS::SubscriberListener::_duplicate(a_listener);
715  return DDS::RETCODE_OK;
716 }
ACE_Thread_Mutex listener_mutex_
const ReturnCode_t RETCODE_OK
DDS::SubscriberListener_var listener_

◆ set_qos()

DDS::ReturnCode_t OpenDDS::DCPS::SubscriberImpl::set_qos ( const DDS::SubscriberQos qos)
virtual

Definition at line 620 of file SubscriberImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), datareader_map_, OpenDDS::DCPS::DCPS_debug_level, domain_id_, dp_id_, OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, si_lock_, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().

622 {
623 
625 
626  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
627  if (qos_ == qos)
628  return DDS::RETCODE_OK;
629 
630  // a qos that is not changeable can still be changed before the entity is enabled
631  if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
633 
634  } else {
635  qos_ = qos;
636 
637  DrIdToQosMap idToQosMap;
638  {
640  guard,
641  this->si_lock_,
643  // after FaceCTS bug 619 is fixed, make endIter and iter const iterators
644  DataReaderMap::iterator endIter = datareader_map_.end();
645 
646  for (DataReaderMap::iterator iter = datareader_map_.begin();
647  iter != endIter; ++iter) {
648  DataReaderImpl_rch reader = iter->second;
649  reader->set_subscriber_qos (qos);
650  DDS::DataReaderQos qos = reader->qos_;
651  GUID_t id = reader->get_guid();
652  std::pair<DrIdToQosMap::iterator, bool> pair
653  = idToQosMap.insert(DrIdToQosMap::value_type(id, qos));
654 
655  if (!pair.second) {
656  if (DCPS_debug_level > 0) {
657  ACE_ERROR((LM_ERROR,
658  ACE_TEXT("(%P|%t) ERROR: SubscriberImpl::set_qos: ")
659  ACE_TEXT("insert %C to DrIdToQosMap failed.\n"),
660  LogGuid(id).c_str()));
661  }
663  }
664  }
665  }
666 
667  DrIdToQosMap::iterator iter = idToQosMap.begin();
668 
669  while (iter != idToQosMap.end()) {
670  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
671  const bool status
672  = disco->update_subscription_qos(this->domain_id_,
673  this->dp_id_,
674  iter->first,
675  iter->second,
676  this->qos_);
677 
678  if (!status) {
679  if (DCPS_debug_level > 0) {
680  ACE_ERROR((LM_ERROR,
681  ACE_TEXT("(%P|%t) SubscriberImpl::set_qos, ")
682  ACE_TEXT("failed.\n")));
683  }
684  return DDS::RETCODE_ERROR;
685  }
686 
687  ++iter;
688  }
689  }
690 
691  return DDS::RETCODE_OK;
692 
693  } else {
695  }
696 }
#define ACE_ERROR(X)
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
const ReturnCode_t RETCODE_OK
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
ACE_Recursive_Thread_Mutex si_lock_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
const ReturnCode_t RETCODE_UNSUPPORTED
#define TheServiceParticipant
RcHandle< DataReaderImpl > DataReaderImpl_rch
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82

◆ update_ownership_strength()

void OpenDDS::DCPS::SubscriberImpl::update_ownership_strength ( const GUID_t pub_id,
const CORBA::Long ownership_strength 
)

Definition at line 1017 of file SubscriberImpl.cpp.

References ACE_GUARD_RETURN, datareader_map_, and si_lock_.

1019 {
1021  guard,
1022  this->si_lock_,
1023  );
1024 
1025  for (DataReaderMap::iterator iter = datareader_map_.begin();
1026  iter != datareader_map_.end();
1027  ++iter) {
1028  if (!iter->second->is_bit()) {
1029  iter->second->update_ownership_strength(pub_id, ownership_strength);
1030  }
1031  }
1032 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex si_lock_

◆ validate_datareader_qos()

bool OpenDDS::DCPS::SubscriberImpl::validate_datareader_qos ( const DDS::DataReaderQos qos,
const DDS::DataReaderQos default_qos,
DDS::Topic_ptr  a_topic,
DDS::DataReaderQos & result_qos,
bool  mt 
)
static

Definition at line 1094 of file SubscriberImpl.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::consistent(), OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DATAREADER_QOS_DEFAULT, DATAREADER_QOS_USE_TOPIC_QOS, OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, OPENDDS_END_VERSIONED_NAMESPACE_DECL, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, and OpenDDS::DCPS::Qos_Helper::valid().

Referenced by create_datareader(), and OpenDDS::DCPS::DomainParticipantImpl::create_recorder().

1099 {
1100 
1101 
1102  if (qos == DATAREADER_QOS_DEFAULT) {
1103  dr_qos = default_qos;
1104 
1105  } else if (qos == DATAREADER_QOS_USE_TOPIC_QOS) {
1106 
1107 #ifndef OPENDDS_NO_MULTI_TOPIC
1108  if (mt) {
1109  if (DCPS_debug_level > 0) {
1110  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
1111  ACE_TEXT("SubscriberImpl::create_datareader, ")
1112  ACE_TEXT("DATAREADER_QOS_USE_TOPIC_QOS can not be used ")
1113  ACE_TEXT("to create a MultiTopic DataReader.\n")));
1114  }
1115  return false;
1116  }
1117 #else
1118  ACE_UNUSED_ARG(mt);
1119 #endif
1120  DDS::TopicQos topic_qos;
1121  a_topic->get_qos(topic_qos);
1122 
1123  dr_qos = default_qos;
1124 
1126  topic_qos);
1127 
1128  } else {
1129  dr_qos = qos;
1130  }
1131 
1135 
1136  if (!Qos_Helper::valid(dr_qos)) {
1137  if (DCPS_debug_level > 0) {
1138  ACE_ERROR((LM_ERROR,
1139  ACE_TEXT("(%P|%t) ERROR: ")
1140  ACE_TEXT("SubscriberImpl::create_datareader, ")
1141  ACE_TEXT("invalid qos.\n")));
1142  }
1143  return false;
1144  }
1145 
1146  if (!Qos_Helper::consistent(dr_qos)) {
1147  if (DCPS_debug_level > 0) {
1148  ACE_ERROR((LM_ERROR,
1149  ACE_TEXT("(%P|%t) ERROR: ")
1150  ACE_TEXT("SubscriberImpl::create_datareader, ")
1151  ACE_TEXT("inconsistent qos.\n")));
1152  }
1153  return false;
1154  }
1155 
1156  return true;
1157 }
#define ACE_ERROR(X)
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
#define DATAREADER_QOS_DEFAULT
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
#define DATAREADER_QOS_USE_TOPIC_QOS
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
static bool copy_from_topic_qos(DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos)
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)

Member Data Documentation

◆ access_depth_

int OpenDDS::DCPS::SubscriberImpl::access_depth_
private

Definition at line 219 of file SubscriberImpl.h.

Referenced by begin_access(), end_access(), and get_datareaders().

◆ datareader_map_

DataReaderMap OpenDDS::DCPS::SubscriberImpl::datareader_map_
private

◆ datareader_set_

DataReaderSet OpenDDS::DCPS::SubscriberImpl::datareader_set_
private

◆ default_datareader_qos_

DDS::DataReaderQos OpenDDS::DCPS::SubscriberImpl::default_datareader_qos_
private

◆ domain_id_

DDS::DomainId_t OpenDDS::DCPS::SubscriberImpl::domain_id_
private

Definition at line 198 of file SubscriberImpl.h.

Referenced by delete_datareader(), and set_qos().

◆ dp_id_

GUID_t OpenDDS::DCPS::SubscriberImpl::dp_id_
private

Definition at line 199 of file SubscriberImpl.h.

Referenced by enable(), and set_qos().

◆ dr_set_lock_

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::SubscriberImpl::dr_set_lock_
private

This lock protects datareader_set_. When datareader_set_ is the only member being accessed, this is the only lock that needs to be acquired.

Definition at line 209 of file SubscriberImpl.h.

Referenced by begin_access(), coherent_change_received(), data_received(), delete_datareader(), end_access(), get_datareaders(), and remove_from_datareader_set().

◆ handle_

DDS::InstanceHandle_t OpenDDS::DCPS::SubscriberImpl::handle_
private

Definition at line 178 of file SubscriberImpl.h.

Referenced by get_instance_handle(), and ~SubscriberImpl().

◆ listener_

DDS::SubscriberListener_var OpenDDS::DCPS::SubscriberImpl::listener_
private

Definition at line 185 of file SubscriberImpl.h.

Referenced by get_listener(), listener_for(), set_listener(), and SubscriberImpl().

◆ listener_mask_

DDS::StatusMask OpenDDS::DCPS::SubscriberImpl::listener_mask_
private

Definition at line 184 of file SubscriberImpl.h.

Referenced by listener_for(), and set_listener().

◆ listener_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::SubscriberImpl::listener_mutex_
private

Definition at line 183 of file SubscriberImpl.h.

Referenced by get_listener(), listener_for(), and set_listener().

◆ monitor_

unique_ptr<Monitor> OpenDDS::DCPS::SubscriberImpl::monitor_
private

Monitor object for this entity.

Definition at line 217 of file SubscriberImpl.h.

Referenced by delete_datareader(), enable(), reader_enabled(), and SubscriberImpl().

◆ multitopic_reader_map_

MultitopicReaderMap OpenDDS::DCPS::SubscriberImpl::multitopic_reader_map_
private

◆ participant_

WeakRcHandle<DomainParticipantImpl> OpenDDS::DCPS::SubscriberImpl::participant_
private

◆ qos_

DDS::SubscriberQos OpenDDS::DCPS::SubscriberImpl::qos_
private

◆ raw_latency_buffer_size_

unsigned int OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_size_
private

Bound (or initial reservation) of raw latency buffers.

Definition at line 202 of file SubscriberImpl.h.

Referenced by create_datareader(), and raw_latency_buffer_size().

◆ raw_latency_buffer_type_

DataCollector<double>::OnFull OpenDDS::DCPS::SubscriberImpl::raw_latency_buffer_type_
private

Type of raw latency data buffers.

Definition at line 205 of file SubscriberImpl.h.

Referenced by create_datareader(), and raw_latency_buffer_type().

◆ readers_not_enabled_

DataReaderSet OpenDDS::DCPS::SubscriberImpl::readers_not_enabled_
private

Definition at line 187 of file SubscriberImpl.h.

Referenced by create_datareader(), enable(), and reader_enabled().

◆ si_lock_

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::SubscriberImpl::si_lock_
private

General lock that protects the data structures in this class. If datareader_set_ is accessed together with other data members, acquire dr_set_lock_ while holding this lock.

Definition at line 214 of file SubscriberImpl.h.

Referenced by begin_access(), contains_reader(), create_datareader(), delete_contained_entities(), delete_datareader(), enable(), end_access(), get_subscription_ids(), lookup_datareader(), notify_datareaders(), reader_enabled(), set_qos(), and update_ownership_strength().


The documentation for this class was generated from the following files: