OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Types | Public Member Functions | Protected Types | Protected Member Functions | Static Protected Member Functions | Protected Attributes | Static Protected Attributes | Private Types | Private Member Functions | Private Attributes | Friends | List of all members
OpenDDS::DCPS::DataReaderImpl Class Referenceabstract

Implements the DDS::DataReader interface. More...

#include <DataReaderImpl.h>

Inheritance diagram for OpenDDS::DCPS::DataReaderImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataReaderImpl:
Collaboration graph
[legend]

Classes

struct  GenericBundle
 
class  LivelinessTimer
 
class  OnDataAvailable
 
class  OnDataOnReaders
 
class  OwnershipManagerPtr
 
struct  OwnershipManagerScopedAccess
 

Public Types

typedef std::pair< GUID_t, WriterInfo::WriterStateWriterStatePair
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DataReaderEx >
typedef DataReaderEx ::_ptr_type _ptr_type
 
typedef DataReaderEx ::_var_type _var_type
 
- Public Types inherited from CORBA::LocalObject
typedef LocalObject_ptr _ptr_type
 
typedef LocalObject_var _var_type
 
typedef LocalObject_out _out_type
 
- Public Types inherited from CORBA::Object
typedef Object_ptr _ptr_type
 
typedef Object_var _var_type
 
typedef Object_out _out_type
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
typedef DDS::Entity ::_ptr_type _ptr_type
 
typedef DDS::Entity ::_var_type _var_type
 
- Public Types inherited from OpenDDS::DCPS::TransportClient
enum  { ASSOC_OK = 1, ASSOC_ACTIVE = 2 }
 

Public Member Functions

typedef OPENDDS_MAP (DDS::InstanceHandle_t, SubscriptionInstance_rch) SubscriptionInstanceMapType
 
typedef OPENDDS_SET (DDS::InstanceHandle_t) InstanceSet
 
typedef OPENDDS_SET (SubscriptionInstance_rch) SubscriptionInstanceSet
 
typedef OPENDDS_MAP_CMP (GUID_t, WriterStats, GUID_tKeyLessThan) StatsMapType
 Type of collection of statistics for writers to this reader. More...
 
 DataReaderImpl ()
 
virtual ~DataReaderImpl ()
 
virtual DDS::InstanceHandle_t get_instance_handle ()
 
virtual void add_association (const GUID_t &yourId, const WriterAssociation &writer, bool active)
 
virtual void transport_assoc_done (int flags, const GUID_t &remote_id)
 
virtual void remove_associations (const WriterIdSeq &writers, bool callback)
 
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
 
virtual void signal_liveliness (const GUID_t &remote_participant)
 
DDS::DataReaderListener_ptr listener_for (DDS::StatusKind kind)
 
void writer_became_alive (WriterInfo &info, const MonotonicTimePoint &when)
 
void writer_became_dead (WriterInfo &info)
 
void writer_removed (WriterInfo &info)
 
virtual void cleanup ()
 
void init (TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber)
 
virtual DDS::ReadCondition_ptr create_readcondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::QueryCondition_ptr create_querycondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const char *query_expression, const DDS::StringSeq &query_parameters)
 
virtual DDS::ReturnCode_t delete_readcondition (DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t delete_contained_entities ()
 
virtual DDS::ReturnCode_t set_qos (const DDS::DataReaderQos &qos)
 
virtual DDS::ReturnCode_t get_qos (DDS::DataReaderQos &qos)
 
virtual DDS::ReturnCode_t set_listener (DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
 
virtual DDS::DataReaderListener_ptr get_listener ()
 
virtual DDS::TopicDescription_ptr get_topicdescription ()
 
virtual DDS::Subscriber_ptr get_subscriber ()
 
virtual DDS::ReturnCode_t get_sample_rejected_status (DDS::SampleRejectedStatus &status)
 
virtual DDS::ReturnCode_t get_liveliness_changed_status (DDS::LivelinessChangedStatus &status)
 
virtual DDS::ReturnCode_t get_requested_deadline_missed_status (DDS::RequestedDeadlineMissedStatus &status)
 
virtual DDS::ReturnCode_t get_requested_incompatible_qos_status (DDS::RequestedIncompatibleQosStatus &status)
 
virtual DDS::ReturnCode_t get_subscription_matched_status (DDS::SubscriptionMatchedStatus &status)
 
virtual DDS::ReturnCode_t get_sample_lost_status (DDS::SampleLostStatus &status)
 
virtual DDS::ReturnCode_t wait_for_historical_data (const DDS::Duration_t &max_wait)
 
virtual DDS::ReturnCode_t get_matched_publications (DDS::InstanceHandleSeq &publication_handles)
 
virtual DDS::ReturnCode_t get_matched_publication_data (DDS::PublicationBuiltinTopicData &publication_data, DDS::InstanceHandle_t publication_handle)
 
virtual DDS::ReturnCode_t enable ()
 
virtual void get_latency_stats (LatencyStatisticsSeq &stats)
 
virtual void reset_latency_stats ()
 Clear any intermediate statistical values. More...
 
virtual CORBA::Boolean statistics_enabled ()
 
virtual void statistics_enabled (CORBA::Boolean statistics_enabled)
 
void writer_activity (const DataSampleHeader &header)
 update liveliness info for this writer. More...
 
virtual void data_received (const ReceivedDataSample &sample)
 process a message that has been received - could be control or a data sample. More...
 
void transport_discovery_change ()
 
virtual bool check_transport_qos (const TransportInst &inst)
 
bool have_sample_states (DDS::SampleStateMask sample_states) const
 
bool have_view_states (DDS::ViewStateMask view_states) const
 
bool have_instance_states (DDS::InstanceStateMask instance_states) const
 
bool contains_sample (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual bool contains_sample_filtered (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const FilterEvaluator &evaluator, const DDS::StringSeq &params)=0
 
virtual RcHandle< MessageHolderdds_demarshal (const ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, SubscriptionInstance_rch &instance, bool &is_new_instance, bool &filtered, MarshalingType marshaling_type, bool full_copy)=0
 
virtual void dispose_unregister (const ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, SubscriptionInstance_rch &instance)
 
void process_latency (const ReceivedDataSample &sample)
 
void notify_latency (GUID_t writer)
 
size_t get_depth () const
 
size_t get_n_chunks () const
 
void liveliness_lost ()
 
void remove_all_associations ()
 
void notify_subscription_disconnected (const WriterIdSeq &pubids)
 
void notify_subscription_reconnected (const WriterIdSeq &pubids)
 
void notify_subscription_lost (const WriterIdSeq &pubids)
 
void notify_liveliness_change ()
 
bool is_bit () const
 
bool has_zero_copies ()
 
void release_instance (DDS::InstanceHandle_t handle)
 Release the instance with the handle. More...
 
void state_updated (DDS::InstanceHandle_t handle)
 
virtual void release_all_instances ()=0
 Release all instances held by the reader. More...
 
ACE_Reactor_Timer_Interfaceget_reactor ()
 
GUID_t get_topic_id ()
 
GUID_t get_dp_id ()
 
typedef OPENDDS_VECTOR (DDS::InstanceHandle_t) InstanceHandleVec
 
void get_instance_handles (InstanceHandleVec &instance_handles)
 
typedef OPENDDS_VECTOR (WriterStatePair) WriterStatePairVec
 
void get_writer_states (WriterStatePairVec &writer_states)
 
void update_ownership_strength (const GUID_t &pub_id, const CORBA::Long &ownership_strength)
 
OwnershipManagerPtr ownership_manager ()
 
virtual void lookup_instance (const ReceivedDataSample &sample, SubscriptionInstance_rch &instance)=0
 
void enable_filtering (ContentFilteredTopicImpl *cft)
 
DDS::ContentFilteredTopic_ptr get_cf_topic () const
 
void enable_multi_topic (MultiTopicImpl *mt)
 
void update_subscription_params (const DDS::StringSeq &params) const
 
typedef OPENDDS_VECTOR (void *) GenericSeq
 
virtual DDS::ReturnCode_t read_generic (GenericBundle &gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count)=0
 
virtual DDS::ReturnCode_t take (AbstractSamples &samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
 
virtual DDS::InstanceHandle_t lookup_instance_generic (const void *data)=0
 
virtual DDS::ReturnCode_t read_instance_generic (void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
 
virtual DDS::ReturnCode_t read_next_instance_generic (void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t previous_instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)=0
 
void set_instance_state (DDS::InstanceHandle_t instance, DDS::InstanceStateKind state, const SystemTimePoint &timestamp=SystemTimePoint::now(), const GUID_t &guid=GUID_UNKNOWN)
 
void begin_access ()
 
void end_access ()
 
void get_ordered_data (GroupRakeData &data, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
void accept_coherent (const GUID_t &writer_id, const GUID_t &publisher_id)
 
void reject_coherent (const GUID_t &writer_id, const GUID_t &publisher_id)
 
void coherent_change_received (const GUID_t &publisher_id, Coherent_State &result)
 
void coherent_changes_completed (DataReaderImpl *reader)
 
void reset_coherent_info (const GUID_t &writer_id, const GUID_t &publisher_id)
 
void set_subscriber_qos (const DDS::SubscriberQos &qos)
 
void reset_ownership (DDS::InstanceHandle_t instance)
 
virtual RcHandle< EntityImplparent () const
 
void disable_transport ()
 
virtual void register_for_writer (const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
 
virtual void unregister_for_writer (const GUID_t &, const GUID_t &, const GUID_t &)
 
virtual void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
virtual DCPS::WeakRcHandle< ICE::Endpointget_ice_endpoint ()
 
GUID_t get_guid () const
 
void return_handle (DDS::InstanceHandle_t handle)
 
const ValueDispatcherget_value_dispatcher () const
 
Raw Latency Statistics Interfaces
const StatsMapType & raw_latency_statistics () const
 Expose the statistics container. More...
 
unsigned int & raw_latency_buffer_size ()
 Configure the size of the raw data collection buffer. More...
 
DataCollector< double >::OnFull & raw_latency_buffer_type ()
 Configure the type of the raw data collection buffer. More...
 
- Public Member Functions inherited from OpenDDS::DCPS::DataReaderEx
void get_latency_stats (inout LatencyStatisticsSeq stats)
 Obtain a sequence of statistics summaries. More...
 
- Public Member Functions inherited from DDS::DataReader
ReadCondition create_readcondition (in SampleStateMask sample_states, in ViewStateMask view_states, in InstanceStateMask instance_states)
 
QueryCondition create_querycondition (in SampleStateMask sample_states, in ViewStateMask view_states, in InstanceStateMask instance_states, in string query_expression, in StringSeq query_parameters)
 
ReturnCode_t delete_readcondition (in ReadCondition a_condition)
 
ReturnCode_t set_qos (in DataReaderQos qos)
 
ReturnCode_t get_qos (inout DataReaderQos qos)
 
ReturnCode_t set_listener (in DataReaderListener a_listener, in StatusMask mask)
 
ReturnCode_t get_sample_rejected_status (inout SampleRejectedStatus status)
 
ReturnCode_t get_liveliness_changed_status (inout LivelinessChangedStatus status)
 
ReturnCode_t get_requested_deadline_missed_status (inout RequestedDeadlineMissedStatus status)
 
ReturnCode_t get_requested_incompatible_qos_status (inout RequestedIncompatibleQosStatus status)
 
ReturnCode_t get_subscription_matched_status (inout SubscriptionMatchedStatus status)
 
ReturnCode_t get_sample_lost_status (inout SampleLostStatus status)
 
ReturnCode_t wait_for_historical_data (in Duration_t max_wait)
 
ReturnCode_t get_matched_publications (inout InstanceHandleSeq publication_handles)
 
ReturnCode_t get_matched_publication_data (inout PublicationBuiltinTopicData publication_data, in InstanceHandle_t publication_handle)
 
- Public Member Functions inherited from OpenDDS::DCPS::LocalObjectBase
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
virtual CORBA::ULong _refcount_value () const
 
- Public Member Functions inherited from CORBA::LocalObject
virtual ~LocalObject (void)
 
virtual CORBA::Boolean _non_existent (void)
 
virtual char * _repository_id (void)
 
virtual CORBA::InterfaceDef_ptr _get_interface (void)
 
virtual CORBA::Object_ptr _get_component (void)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::ExceptionList_ptr exclist, CORBA::ContextList_ptr ctxtlist, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual CORBA::Request_ptr _request (const char *operation)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList_get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
virtual CORBA::ULong _hash (CORBA::ULong maximum)
 
virtual CORBA::Boolean _is_equivalent (CORBA::Object_ptr other_obj)
 
virtual CORBA::ORB_ptr _get_orb (void)
 
virtual TAO::ObjectKey_key (void)
 
- Public Member Functions inherited from CORBA::Object
virtual ~Object (void)
 
virtual TAO_Abstract_ServantBase_servant (void) const
 
virtual CORBA::Boolean _is_collocated (void) const
 
virtual CORBA::Boolean _is_local (void) const
 
 Object (TAO_Stub *p, CORBA::Boolean collocated=false, TAO_Abstract_ServantBase *servant=0, TAO_ORB_Core *orb_core=0)
 
 Object (IOP::IOR *ior, TAO_ORB_Core *orb_core)
 
virtual TAO_Stub_stubobj (void) const
 
virtual TAO_Stub_stubobj (void)
 
virtual void _proxy_broker (TAO::Object_Proxy_Broker *proxy_broker)
 
virtual CORBA::Boolean marshal (TAO_OutputCDR &cdr)
 
CORBA::Boolean is_evaluated (void) const
 
TAO_ORB_Coreorb_core (void) const
 
IOP::IORsteal_ior (void)
 
const IOP::IORior (void) const
 
virtual bool can_convert_to_ior (void) const
 
virtual char * convert_to_ior (bool use_omg_ior_format, const char *ior_prefix) const
 
void _decr_refcount (void)
 
virtual CORBA::Boolean _is_a (const char *logical_type_id)
 
virtual const char * _interface_repository_id (void) const
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList_get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::DataReaderCallbacks
 DataReaderCallbacks ()
 
virtual ~DataReaderCallbacks ()
 
- Public Member Functions inherited from OpenDDS::DCPS::EntityImpl
 EntityImpl ()
 
virtual ~EntityImpl ()
 
bool is_enabled () const
 
virtual DDS::StatusCondition_ptr get_statuscondition ()
 
virtual DDS::StatusMask get_status_changes ()
 
virtual DDS::DomainId_t get_domain_id ()
 
virtual GUID_t get_id () const
 
void set_status_changed_flag (DDS::StatusKind status, bool status_changed_flag)
 
void notify_status_condition ()
 
virtual void transport_config (const TransportConfig_rch &cfg)
 
TransportConfig_rch transport_config () const
 
void set_observer (Observer_rch observer, Observer::Event e)
 
Observer_rch get_observer (Observer::Event e)
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportClient
void use_datalink (const GUID_t &remote_id, const DataLink_rch &link)
 
 TransportClient ()
 
virtual ~TransportClient ()
 
void enable_transport (bool reliable, bool durable)
 
void enable_transport_using_config (bool reliable, bool durable, const TransportConfig_rch &tc)
 
bool swap_bytes () const
 
bool cdr_encapsulation () const
 
const TransportLocatorSeqconnection_info () const
 
void populate_connection_info ()
 
bool is_reliable () const
 
bool associate (const AssociationData &peer, bool active)
 
void disassociate (const GUID_t &peerId)
 
void stop_associating ()
 
void stop_associating (const GUID_t *repos, CORBA::ULong length)
 
void send_final_acks ()
 
void transport_stop ()
 
void register_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
 
void unregister_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
 
void register_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
 
void unregister_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
 
void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
WeakRcHandle< ICE::Endpointget_ice_endpoint ()
 
bool send_response (const GUID_t &peer, const DataSampleHeader &header, Message_Block_Ptr payload)
 
void send (SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
 
SendControlStatus send_w_control (SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr msg)
 
SendControlStatus send_control_to (const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
bool remove_sample (const DataSampleElement *sample)
 
bool remove_all_msgs ()
 
void terminate_send_if_suspended ()
 
bool associated_with (const GUID_t &remote) const
 
bool pending_association_with (const GUID_t &remote) const
 
GUID_t repo_id () const
 
void data_acked (const GUID_t &remote)
 
bool is_leading (const GUID_t &reader_id) const
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportReceiveListener
virtual ~TransportReceiveListener ()
 

Protected Types

typedef ACE_Reverse_Lock< ACE_Recursive_Thread_MutexReverse_Lock_t
 

Protected Member Functions

typedef OPENDDS_SET (DDS::InstanceHandle_t) HandleSet
 
typedef OPENDDS_MAP (CORBA::ULong, HandleSet) LookupMap
 
void initialize_lookup_maps ()
 
void update_lookup_maps (const SubscriptionInstanceMapType::iterator &input)
 
void remove_from_lookup_maps (DDS::InstanceHandle_t handle)
 
const HandleSet & lookup_matching_instances (CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
 
DataReaderListener_ptr get_ext_listener ()
 
virtual void remove_associations_i (const WriterIdSeq &writers, bool callback)
 
void prepare_to_delete ()
 
DDS::ReturnCode_t setup_deserialization ()
 Setup deserialization options. More...
 
RcHandle< SubscriberImplget_subscriber_servant ()
 
void post_read_or_take ()
 
virtual DDS::ReturnCode_t enable_specific ()=0
 
void sample_info (DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
 
CORBA::Long total_samples () const
 
void set_sample_lost_status (const DDS::SampleLostStatus &status)
 
void set_sample_rejected_status (const DDS::SampleRejectedStatus &status)
 
SubscriptionInstance_rch get_handle_instance (DDS::InstanceHandle_t handle)
 
DDS::InstanceHandle_t get_next_handle (const DDS::BuiltinTopicKey_t &key)
 
virtual void purge_data (SubscriptionInstance_rch instance)=0
 
virtual void release_instance_i (DDS::InstanceHandle_t handle)=0
 
virtual void state_updated_i (DDS::InstanceHandle_t handle)=0
 
bool has_readcondition (DDS::ReadCondition_ptr a_condition)
 
bool filter_sample (const DataSampleHeader &header)
 
bool ownership_filter_instance (const SubscriptionInstance_rch &instance, const GUID_t &pubid)
 
bool time_based_filter_instance (const SubscriptionInstance_rch &instance, MonotonicTimePoint &now, MonotonicTimePoint &deadline)
 
void accept_sample_processing (const SubscriptionInstance_rch &instance, const DataSampleHeader &header, bool is_new_instance)
 
virtual void qos_change (const DDS::DataReaderQos &qos)
 
void notify_read_conditions ()
 Data has arrived into the cache, unblock waiting ReadConditions. More...
 
virtual void add_link (const DataLink_rch &link, const GUID_t &peer)
 
typedef OPENDDS_SET (Encoding::Kind) EncodingKinds
 
- Protected Member Functions inherited from CORBA::LocalObject
 LocalObject (void)
 
- Protected Member Functions inherited from CORBA::Object
 Object (int dummy=0)
 
TAO::Object_Proxy_Brokerproxy_broker () const
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::EntityImpl
DDS::ReturnCode_t set_enabled ()
 
void set_deleted (bool state)
 
bool get_deleted () const
 
DDS::InstanceHandle_t get_entity_instance_handle (const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportClient
void cdr_encapsulation (bool encap)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportReceiveListener
 TransportReceiveListener ()
 

Static Protected Member Functions

static CORBA::ULong to_combined_states (CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states)
 
static void split_combined_states (CORBA::ULong combined, CORBA::ULong &sample_states, CORBA::ULong &view_states, CORBA::ULong &instance_states)
 

Protected Attributes

LookupMap combined_state_lookup_
 
SubscriptionInstanceMapType instances_
 : document why the instances_ container is mutable. More...
 
ACE_Recursive_Thread_Mutex instances_lock_
 
bool has_subscription_id_
 
ACE_Thread_Mutex subscription_id_mutex_
 
ConditionVariable< ACE_Thread_Mutexsubscription_id_condition_
 
unique_ptr< ReceivedDataAllocatorrd_allocator_
 
DDS::DataReaderQos qos_
 
DDS::DataReaderQos passed_qos_
 
DDS::SampleRejectedStatus sample_rejected_status_
 
DDS::SampleLostStatus sample_lost_status_
 
ACE_Recursive_Thread_Mutex sample_lock_
 lock protecting sample container as well as statuses. More...
 
Reverse_Lock_t reverse_sample_lock_
 
WeakRcHandle< DomainParticipantImplparticipant_servant_
 
TopicDescriptionPtr< TopicImpltopic_servant_
 
TypeSupportImpltype_support_
 
GUID_t topic_id_
 
bool is_exclusive_ownership_
 
ACE_Thread_Mutex content_filtered_topic_mutex_
 
TopicDescriptionPtr< ContentFilteredTopicImplcontent_filtered_topic_
 
TopicDescriptionPtr< MultiTopicImplmulti_topic_
 
bool coherent_
 Is accessing to Group coherent changes ? More...
 
GroupRakeData group_coherent_ordered_data_
 Ordered group samples. More...
 
DDS::SubscriberQos subqos_
 
EncodingKinds decoding_modes_
 
Security::SecurityConfig_rch security_config_
 
DDS::DynamicType_var dynamic_type_
 
TransportMessageBlockAllocator mb_alloc_
 
- Protected Attributes inherited from CORBA::Object
ACE_Atomic_Op< TAO_SYNCH_MUTEX, unsigned long > refcount_
 
- Protected Attributes inherited from OpenDDS::DCPS::EntityImpl
AtomicBool enabled_
 The flag indicates the entity is enabled. More...
 
AtomicBool entity_deleted_
 The flag indicates the entity is being deleted. More...
 

Static Protected Attributes

static const CORBA::ULong MAX_SAMPLE_STATE_FLAG = DDS::NOT_READ_SAMPLE_STATE
 
static const CORBA::ULong MAX_SAMPLE_STATE_MASK = (MAX_SAMPLE_STATE_FLAG << 1) - 1
 
static const CORBA::ULong MAX_SAMPLE_STATE_BITS = 2u
 
static const CORBA::ULong MAX_VIEW_STATE_FLAG = DDS::NOT_NEW_VIEW_STATE
 
static const CORBA::ULong MAX_VIEW_STATE_MASK = (MAX_VIEW_STATE_FLAG << 1) - 1
 
static const CORBA::ULong MAX_VIEW_STATE_BITS = 2u
 
static const CORBA::ULong MAX_INSTANCE_STATE_FLAG = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
 
static const CORBA::ULong MAX_INSTANCE_STATE_MASK = (MAX_INSTANCE_STATE_FLAG << 1) - 1
 
static const CORBA::ULong MAX_INSTANCE_STATE_BITS = 3u
 
static const CORBA::ULong COMBINED_VIEW_STATE_SHIFT = MAX_INSTANCE_STATE_BITS
 
static const CORBA::ULong COMBINED_SAMPLE_STATE_SHIFT = COMBINED_VIEW_STATE_SHIFT + MAX_VIEW_STATE_BITS
 

Private Types

typedef PmfSporadicTask< DataReaderImplDRISporadicTask
 
typedef VarLess< DDS::ReadConditionRCCompLess
 

Private Member Functions

virtual void install_type_support (TypeSupportImpl *)
 
virtual void set_instance_state_i (DDS::InstanceHandle_t instance, DDS::InstanceHandle_t publication_handle, DDS::InstanceStateKind state, const SystemTimePoint &timestamp, const GUID_t &guid)=0
 
void notify_subscription_lost (const DDS::InstanceHandleSeq &handles)
 
void lookup_instance_handles (const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
 Lookup the instance handles by the publication repo ids. More...
 
void instances_liveliness_update (const GUID_t &writer, DDS::InstanceHandle_t publication_handle)
 
bool verify_coherent_changes_completion (WriterInfo *writer)
 
bool coherent_change_received (WriterInfo *writer)
 
RcHandle< BitSubscriberget_builtin_subscriber_proxy () const
 
DDS::DomainId_t domain_id () const
 
Priority get_priority_value (const AssociationData &data) const
 
DDS::Security::ParticipantCryptoHandle get_crypto_handle () const
 
void resume_sample_processing (const GUID_t &pub_id)
 when done handling historic samples, resume More...
 
bool check_historic (const ReceivedDataSample &sample)
 
void deliver_historic (OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&samples)
 deliver samples that were held by check_historic() More...
 
typedef OPENDDS_MAP_CMP (GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap
 
typedef OPENDDS_MULTIMAP (MonotonicTimePoint, SubscriptionInstance_rch) DeadlineQueue
 
void schedule_deadline (SubscriptionInstance_rch instance, bool timer_called)
 
void reset_deadline_period (const TimeDuration &deadline_period)
 
void reschedule_deadline (SubscriptionInstance_rch instance, const MonotonicTimePoint &now)
 
void cancel_deadline (SubscriptionInstance_rch instance)
 
void cancel_all_deadlines ()
 
void deadline_task (const MonotonicTimePoint &now)
 
void process_deadline (SubscriptionInstance_rch instance, const MonotonicTimePoint &now, bool timer_called)
 
typedef OPENDDS_MAP_CMP (GUID_t, WriterInfo_rch, GUID_tKeyLessThan) WriterMapType
 publications writing to this reader. More...
 
typedef OPENDDS_SET_CMP (DDS::ReadCondition_var, RCCompLess) ReadConditionSet
 
- Private Member Functions inherited from OpenDDS::DCPS::WriterInfoListener
 WriterInfoListener ()
 
virtual ~WriterInfoListener ()
 
- Private Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 
 RcObject ()
 

Private Attributes

DDS::TopicDescription_var topic_desc_
 
ACE_Thread_Mutex listener_mutex_
 
DDS::StatusMask listener_mask_
 
DDS::DataReaderListener_var listener_
 
DDS::DomainId_t domain_id_
 
GUID_t dp_id_
 
WeakRcHandle< SubscriberImplsubscriber_servant_
 
RcHandle< EndHistoricSamplesMissedSweeperend_historic_sweeper_
 
CORBA::Long depth_
 
size_t n_chunks_
 
ACE_Recursive_Thread_Mutex publication_handle_lock_
 
RepoIdToHandleMap publication_id_to_handle_map_
 
DDS::LivelinessChangedStatus liveliness_changed_status_
 
DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_
 
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
 
DDS::SubscriptionMatchedStatus subscription_match_status_
 
BudgetExceededStatus budget_exceeded_status_
 
SubscriptionLostStatus subscription_lost_status_
 
ACE_Reactor_Timer_Interfacereactor_
 
RcHandle< LivelinessTimerliveliness_timer_
 
CORBA::Long last_deadline_missed_total_count_
 
TimeDuration deadline_period_
 
DeadlineQueue deadline_queue_
 
bool deadline_queue_enabled_
 
RcHandle< DRISporadicTaskdeadline_task_
 
bool is_bit_
 
bool always_get_history_
 
AtomicBool statistics_enabled_
 Flag indicating status of statistics gathering. More...
 
WriterMapType writers_
 
ACE_RW_Thread_Mutex writers_lock_
 RW lock for reading/writing publications. More...
 
StatsMapType statistics_
 Statistics for this reader, collected for each writer. More...
 
ACE_Recursive_Thread_Mutex statistics_lock_
 
unsigned int raw_latency_buffer_size_
 Bound (or initial reservation) of raw latency buffer. More...
 
DataCollector< double >::OnFull raw_latency_buffer_type_
 Type of raw latency data buffer. More...
 
ReadConditionSet read_conditions_
 
unique_ptr< Monitormonitor_
 Monitor object for this entity. More...
 
unique_ptr< Monitorperiodic_monitor_
 Periodic Monitor object for this entity. More...
 
bool transport_disabled_
 
- Private Attributes inherited from OpenDDS::DCPS::WriterInfoListener
GUID_t subscription_id_
 
TimeDuration liveliness_lease_duration_
 

Friends

class RequestedDeadlineWatchdog
 
class QueryConditionImpl
 
class SubscriberImpl
 
class OwnershipManagerPtr
 
class InstanceState
 
class EndHistoricSamplesMissedSweeper
 
class ::DDS_TEST
 

Additional Inherited Members

- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DataReaderEx >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::LocalObject
static LocalObject_ptr _duplicate (LocalObject_ptr obj)
 
static LocalObject_ptr _nil (void)
 
static LocalObject_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::Object
static CORBA::Boolean marshal (const Object_ptr x, TAO_OutputCDR &cdr)
 
static void _tao_any_destructor (void *)
 
static CORBA::Boolean is_nil_i (CORBA::Object_ptr obj)
 
static void tao_object_initialize (Object *)
 
static CORBA::Object_ptr _duplicate (CORBA::Object_ptr obj)
 
static CORBA::Object_ptr _nil (void)
 
static CORBA::Object_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Public Attributes inherited from OpenDDS::DCPS::DataReaderEx
attribute boolean statistics_enabled
 Statistics gathering enable state. More...
 

Detailed Description

Implements the DDS::DataReader interface.

See the DDS specification, OMG formal/2015-04-10, for a description of the interface this class is implementing.

This class must be inherited by the type-specific datareader which is specific to the data-type associated with the topic.

Definition at line 207 of file DataReaderImpl.h.

Member Typedef Documentation

◆ DRISporadicTask

Definition at line 994 of file DataReaderImpl.h.

◆ RCCompLess

Definition at line 1037 of file DataReaderImpl.h.

◆ Reverse_Lock_t

Definition at line 782 of file DataReaderImpl.h.

◆ WriterStatePair

Definition at line 455 of file DataReaderImpl.h.

Constructor & Destructor Documentation

◆ DataReaderImpl()

OpenDDS::DCPS::DataReaderImpl::DataReaderImpl ( )

Definition at line 60 of file DataReaderImpl.cpp.

References DDS::LivelinessChangedStatus::alive_count, DDS::LivelinessChangedStatus::alive_count_change, budget_exceeded_status_, DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DDS::HANDLE_NIL, OpenDDS::DCPS::BudgetExceededStatus::last_instance_handle, DDS::SampleRejectedStatus::last_instance_handle, DDS::RequestedDeadlineMissedStatus::last_instance_handle, DDS::RequestedIncompatibleQosStatus::last_policy_id, DDS::LivelinessChangedStatus::last_publication_handle, DDS::SubscriptionMatchedStatus::last_publication_handle, DDS::SampleRejectedStatus::last_reason, liveliness_changed_status_, monitor_, DDS::LivelinessChangedStatus::not_alive_count, DDS::LivelinessChangedStatus::not_alive_count_change, DDS::NOT_REJECTED, periodic_monitor_, DDS::RequestedIncompatibleQosStatus::policies, reactor_, requested_deadline_missed_status_, requested_incompatible_qos_status_, sample_lost_status_, sample_rejected_status_, subscription_match_status_, TheServiceParticipant, OpenDDS::DCPS::BudgetExceededStatus::total_count, DDS::SampleLostStatus::total_count, DDS::SampleRejectedStatus::total_count, DDS::RequestedDeadlineMissedStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, DDS::SubscriptionMatchedStatus::total_count, OpenDDS::DCPS::BudgetExceededStatus::total_count_change, DDS::SampleLostStatus::total_count_change, DDS::SampleRejectedStatus::total_count_change, DDS::RequestedDeadlineMissedStatus::total_count_change, DDS::RequestedIncompatibleQosStatus::total_count_change, and DDS::SubscriptionMatchedStatus::total_count_change.

61  : has_subscription_id_(false)
64  , qos_(TheServiceParticipant->initial_DataReaderQos())
66  , topic_servant_(0)
67  , type_support_(0)
69 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
71 #endif
72  , coherent_(false)
73  , subqos_(TheServiceParticipant->initial_SubscriberQos())
74  , topic_desc_(0)
76  , domain_id_(0)
77  , end_historic_sweeper_(make_rch<EndHistoricSamplesMissedSweeper>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this))
78  , n_chunks_(TheServiceParticipant->n_chunks())
79  , reactor_(0)
80  , liveliness_timer_(make_rch<LivelinessTimer>(TheServiceParticipant->reactor(), TheServiceParticipant->reactor_owner(), this))
83  , deadline_task_(make_rch<DRISporadicTask>(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &DataReaderImpl::deadline_task))
84  , is_bit_(false)
85  , always_get_history_(false)
86  , statistics_enabled_(false)
89  , transport_disabled_(false)
91 {
93 
100 
105 
110 
117 
120 
125 
129 
130  monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_reader_monitor(this));
131  periodic_monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_reader_periodic_monitor(this));
132 }
AtomicBool statistics_enabled_
Flag indicating status of statistics gathering.
RcHandle< LivelinessTimer > liveliness_timer_
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffer.
unique_ptr< Monitor > periodic_monitor_
Periodic Monitor object for this entity.
BudgetExceededStatus budget_exceeded_status_
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
ConditionVariable< ACE_Thread_Mutex > subscription_id_condition_
RcHandle< DRISporadicTask > deadline_task_
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffer.
const InstanceHandle_t HANDLE_NIL
DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
const DDS::StatusMask DEFAULT_STATUS_MASK
CORBA::Long last_deadline_missed_total_count_
DDS::LivelinessChangedStatus liveliness_changed_status_
ACE_Thread_Mutex subscription_id_mutex_
void deadline_task(const MonotonicTimePoint &now)
TopicDescriptionPtr< TopicImpl > topic_servant_
unique_ptr< Monitor > monitor_
Monitor object for this entity.
TypeSupportImpl * type_support_
DDS::SampleRejectedStatus sample_rejected_status_
RcHandle< EndHistoricSamplesMissedSweeper > end_historic_sweeper_
DDS::SubscriptionMatchedStatus subscription_match_status_
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DDS::SampleLostStatus sample_lost_status_
DDS::TopicDescription_var topic_desc_
bool coherent_
Is accessing to Group coherent changes ?
#define TheServiceParticipant
ACE_Reactor_Timer_Interface * reactor_
SampleRejectedStatusKind last_reason
TransportMessageBlockAllocator mb_alloc_

◆ ~DataReaderImpl()

OpenDDS::DCPS::DataReaderImpl::~DataReaderImpl ( )
virtual

Definition at line 136 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, deadline_task_, participant_servant_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.

137 {
138  DBG_ENTRY_LVL("DataReaderImpl", "~DataReaderImpl", 6);
139 
140  deadline_task_->cancel();
141 
142 #ifndef OPENDDS_SAFETY_PROFILE
143  RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
144  if (participant) {
145  XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
146  if (type_lookup_service) {
147  type_lookup_service->remove_guid_from_dynamic_map(subscription_id_);
148  }
149  }
150 #endif
151 }
RcHandle< DRISporadicTask > deadline_task_
DCPS::RcHandle< TypeLookupService > TypeLookupService_rch
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
WeakRcHandle< DomainParticipantImpl > participant_servant_

Member Function Documentation

◆ accept_coherent()

void OpenDDS::DCPS::DataReaderImpl::accept_coherent ( const GUID_t writer_id,
const GUID_t publisher_id 
)

Definition at line 2914 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.

2916 {
2917  if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
2918  ACE_DEBUG((LM_DEBUG,
2919  ACE_TEXT("(%P|%t) DataReaderImpl::accept_coherent()")
2920  ACE_TEXT(" reader %C writer %C publisher %C\n"),
2921  LogGuid(get_guid()).c_str(),
2922  LogGuid(writer_id).c_str(),
2923  LogGuid(publisher_id).c_str()));
2924  }
2925  SubscriptionInstanceSet localsubs;
2926  {
2927  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
2928  for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
2929  iter != this->instances_.end(); ++iter) {
2930  localsubs.insert(iter->second);
2931  }
2932  }
2934  for (SubscriptionInstanceSet::iterator iter = localsubs.begin();
2935  iter != localsubs.end(); iter++) {
2936  (*iter)->rcvd_strategy_->accept_coherent(writer_id, publisher_id);
2937  }
2938 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
ACE_Recursive_Thread_Mutex instances_lock_
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")

◆ accept_sample_processing()

void OpenDDS::DCPS::DataReaderImpl::accept_sample_processing ( const SubscriptionInstance_rch instance,
const DataSampleHeader header,
bool  is_new_instance 
)
protected

Definition at line 3345 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_READ_GUARD, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::SubscriptionInstance::cur_sample_tv_, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::EntityId_t::entityKind, OpenDDS::DCPS::ENTITYKIND_OPENDDS_NIL_WRITER, OpenDDS::DCPS::DataSampleHeader::group_coherent_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::SubscriptionInstance::last_sample_tv_, LM_WARNING, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, and OpenDDS::DCPS::TimePoint_T< AceClock >::set_to_now().

Referenced by data_received().

3348 {
3349  bool accepted = true;
3350 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
3351  bool verify_coherent = false;
3352 #endif
3353  WriterInfo_rch writer;
3354 
3355  if (header.publication_id_.entityId.entityKind != ENTITYKIND_OPENDDS_NIL_WRITER) {
3357 
3358  WriterMapType::iterator where = writers_.find(header.publication_id_);
3359 
3360  if (where != writers_.end()) {
3361  if (header.coherent_change_) {
3362 
3363 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
3364  // Received coherent change
3365  where->second->coherent_change(header.group_coherent_, header.publisher_id_);
3366  verify_coherent = true;
3367 #endif
3368  writer = where->second;
3369  }
3370  }
3371  else {
3372  ACE_DEBUG((LM_WARNING,
3373  ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::accept_sample_processing - ")
3374  ACE_TEXT("subscription %C failed to find ")
3375  ACE_TEXT("publication data for %C.\n"),
3376  LogGuid(get_guid()).c_str(),
3377  LogGuid(header.publication_id_).c_str()));
3378  }
3379  }
3380 
3381 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
3382  if (verify_coherent) {
3383  accepted = verify_coherent_changes_completion(writer.in());
3384  }
3385 #endif
3386 
3387  if (instance && deadline_queue_enabled_) {
3388  instance->last_sample_tv_ = instance->cur_sample_tv_;
3389  instance->cur_sample_tv_.set_to_now();
3390 
3391  if (is_new_instance) {
3392  schedule_deadline(instance, false);
3393  } else {
3394  process_deadline(instance, MonotonicTimePoint::now(), false);
3395  }
3396  }
3397 
3398  if (accepted) {
3400  }
3401 }
#define ACE_DEBUG(X)
void notify_read_conditions()
Data has arrived into the cache, unblock waiting ReadConditions.
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
void process_deadline(SubscriptionInstance_rch instance, const MonotonicTimePoint &now, bool timer_called)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
const octet ENTITYKIND_OPENDDS_NIL_WRITER
Definition: DdsDcpsGuid.idl:53
RcHandle< WriterInfo > WriterInfo_rch
Definition: WriterInfo.h:275
bool verify_coherent_changes_completion(WriterInfo *writer)
ACE_TEXT("TCP_Factory")
void schedule_deadline(SubscriptionInstance_rch instance, bool timer_called)

◆ add_association()

void OpenDDS::DCPS::DataReaderImpl::add_association ( const GUID_t yourId,
const WriterAssociation writer,
bool  active 
)
virtual

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 237 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), ACE_WRITE_GUARD, OpenDDS::DCPS::TransportClient::associate(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::AssociationData::discovery_locator_, DDS::DataWriterQos::durability, DDS::DataReaderQos::durability, OpenDDS::DCPS::Observer::e_ASSOCIATED, OpenDDS::DCPS::EntityImpl::get_deleted(), get_guid(), OpenDDS::DCPS::EntityImpl::get_observer(), OpenDDS::DCPS::GUID_UNKNOWN, has_subscription_id_, is_bit_, DDS::DurabilityQosPolicy::kind, DDS::ReliabilityQosPolicy::kind, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), OpenDDS::DCPS::Observer::on_associated(), OpenDDS::DCPS::AssociationData::participant_discovered_at_, OpenDDS::DCPS::WriterAssociation::participantDiscoveredAt, OpenDDS::DCPS::AssociationData::publication_transport_priority_, qos_, raw_latency_buffer_size_, raw_latency_buffer_type_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, OpenDDS::DCPS::AssociationData::remote_transport_context_, statistics_, statistics_lock_, OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_id_condition_, subscription_id_mutex_, DDS::DataWriterQos::transport_priority, OpenDDS::DCPS::WriterAssociation::transportContext, DDS::TransportPriorityQosPolicy::value, DDS::VOLATILE_DURABILITY_QOS, OpenDDS::DCPS::WriterInfo::waiting_for_end_historic_samples(), OpenDDS::DCPS::WriterAssociation::writerDiscInfo, OpenDDS::DCPS::WriterAssociation::writerId, OpenDDS::DCPS::WriterAssociation::writerQos, writers_, writers_lock_, and OpenDDS::DCPS::WriterAssociation::writerTransInfo.

240 {
241  if (DCPS_debug_level) {
242  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association - ")
243  ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
244  LogGuid(yourId).c_str(),
245  LogGuid(writer.writerId).c_str()));
246  }
247 
248  if (get_deleted()) {
249  if (DCPS_debug_level) {
250  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::add_association")
251  ACE_TEXT(" This is a deleted datareader, ignoring add.\n")));
252  }
253  return;
254  }
255 
256  // We are being called back from the repository before we are done
257  // processing after our call to the repository that caused this call
258  // (from the repository) to be made.
259  {
262  subscription_id_ = yourId;
263  has_subscription_id_ = true;
265  }
266  }
267 
268  // For each writer in the list of writers to associate with, we
269  // create a WriterInfo and a WriterStats object and store them in
270  // our internal maps.
271  //
272  {
273 
275 
276  const GUID_t& writer_id = writer.writerId;
277  WriterInfo_rch info = make_rch<WriterInfo>(rchandle_from<WriterInfoListener>(this), writer_id, writer.writerQos);
278  std::pair<WriterMapType::iterator, bool> bpair = writers_.insert(
279  // This insertion is idempotent.
280  WriterMapType::value_type(
281  writer_id,
282  info));
283 
284  // Schedule timer if necessary
285  // - only need to check reader qos - we know the writer must be >= reader
287  info->waiting_for_end_historic_samples(true);
288  }
289 
290  {
292  statistics_.insert(
293  StatsMapType::value_type(
294  writer_id,
296  }
297 
298  // If this is a durable reader
300  // TODO schedule timer for removing flag from writers
301  }
302 
303  if (DCPS_debug_level > 4) {
304  ACE_DEBUG((LM_DEBUG,
305  "(%P|%t) DataReaderImpl::add_association: "
306  "inserted writer %C.return %d\n",
307  LogGuid(writer_id).c_str(), bpair.second));
308 
309  WriterMapType::iterator iter = writers_.find(writer_id);
310  if (iter != writers_.end()) {
311  // This may not be an error since it could happen that the sample
312  // is delivered to the datareader after the write is dis-associated
313  // with this datareader.
314  ACE_DEBUG((LM_DEBUG,
315  ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
316  ACE_TEXT("reader %C is associated with writer %C.\n"),
317  LogGuid(get_guid()).c_str(),
318  LogGuid(writer_id).c_str()));
319  }
320  }
321  }
322 
323  // Propagate the add_associations processing down into the Transport
324  // layer here. This will establish the transport support and reserve
325  // usage of an existing connection or initiate creation of a new
326  // connection if no suitable connection is available.
327  AssociationData data;
328  data.remote_id_ = writer.writerId;
329  data.remote_data_ = writer.writerTransInfo;
330  data.discovery_locator_ = writer.writerDiscInfo;
331  data.participant_discovered_at_ = writer.participantDiscoveredAt;
332  data.remote_transport_context_ = writer.transportContext;
333  data.publication_transport_priority_ =
334  writer.writerQos.transport_priority.value;
335  data.remote_reliable_ =
336  (writer.writerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
337  data.remote_durable_ =
338  (writer.writerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
339 
340  if (associate(data, active)) {
342  if (observer) {
343  observer->on_associated(this, data.remote_id_);
344  }
345  } else {
346  if (DCPS_debug_level) {
347  ACE_ERROR((LM_ERROR,
348  ACE_TEXT("(%P|%t) DataReaderImpl::add_association: ")
349  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
350  }
351  }
352 }
#define ACE_DEBUG(X)
StatsMapType statistics_
Statistics for this reader, collected for each writer.
#define ACE_ERROR(X)
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffer.
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
ConditionVariable< ACE_Thread_Mutex > subscription_id_condition_
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffer.
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_Thread_Mutex subscription_id_mutex_
bool notify_all()
Unblock all of the threads waiting on this condition.
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
RcHandle< Observer > Observer_rch
Definition: Observer.h:101
DurabilityQosPolicy durability
DurabilityQosPolicyKind kind
RcHandle< WriterInfo > WriterInfo_rch
Definition: WriterInfo.h:275
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
ACE_Recursive_Thread_Mutex statistics_lock_
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
bool associate(const AssociationData &peer, bool active)

◆ add_link()

void OpenDDS::DCPS::DataReaderImpl::add_link ( const DataLink_rch link,
const GUID_t peer 
)
protectedvirtual

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 3226 of file DataReaderImpl.cpp.

References ACE_WRITE_GUARD, OpenDDS::DCPS::TransportClient::add_link(), OpenDDS::DCPS::DataLink::impl(), OPENDDS_STRING, OpenDDS::DCPS::TransportImpl::transport_type(), and DDS::VOLATILE_DURABILITY_QOS.

3227 {
3229 
3231 
3232  WriterMapType::iterator it = writers_.find(peer);
3233  if (it != writers_.end()) {
3234  // Schedule timer if necessary
3235  // - only need to check reader qos - we know the writer must be >= reader
3236  end_historic_sweeper_->schedule_timer(it->second);
3237  }
3238  }
3239  TransportClient::add_link(link, peer);
3240  OPENDDS_STRING type;
3241  {
3242  TransportImpl_rch impl = link->impl();
3243  if (impl) {
3244  type = impl->transport_type();
3245  }
3246  }
3247 
3248  if (type == "rtps_udp" || type == "multicast") {
3250  }
3251 }
RcHandle< TransportImpl > TransportImpl_rch
The type definition for the smart-pointer to the underlying type.
virtual void add_link(const DataLink_rch &link, const GUID_t &peer)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
#define OPENDDS_STRING
DurabilityQosPolicy durability
DurabilityQosPolicyKind kind
RcHandle< EndHistoricSamplesMissedSweeper > end_historic_sweeper_
void resume_sample_processing(const GUID_t &pub_id)
when done handling historic samples, resume
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
virtual OPENDDS_STRING transport_type() const =0

◆ begin_access()

void OpenDDS::DCPS::DataReaderImpl::begin_access ( )

Definition at line 3068 of file DataReaderImpl.cpp.

References ACE_GUARD.

3069 {
3071  this->coherent_ = true;
3072 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
bool coherent_
Is accessing to Group coherent changes ?

◆ cancel_all_deadlines()

void OpenDDS::DCPS::DataReaderImpl::cancel_all_deadlines ( )
private

Definition at line 3683 of file DataReaderImpl.cpp.

References ACE_GUARD.

Referenced by qos_change().

3684 {
3686  deadline_queue_.clear();
3687  deadline_task_->cancel();
3688 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
RcHandle< DRISporadicTask > deadline_task_

◆ cancel_deadline()

void OpenDDS::DCPS::DataReaderImpl::cancel_deadline ( SubscriptionInstance_rch  instance)
private

Definition at line 3597 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::SubscriptionInstance::deadline_, and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::zero_value.

Referenced by data_received().

3598 {
3599  // Should be called with sample_lock_.
3600  if (instance->deadline_ != MonotonicTimePoint::zero_value) {
3601  for (DeadlineQueue::iterator pos = deadline_queue_.lower_bound(instance->deadline_), limit = deadline_queue_.upper_bound(instance->deadline_); pos != limit; ++pos) {
3602  if (pos->second == instance) {
3603  deadline_queue_.erase(pos);
3604  break;
3605  }
3606  }
3607  instance->deadline_ = MonotonicTimePoint::zero_value;
3608  }
3609 }
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40

◆ check_historic()

bool OpenDDS::DCPS::DataReaderImpl::check_historic ( const ReceivedDataSample sample)
private

collect samples received before END_HISTORIC_SAMPLES returns false if normal processing of this sample should be skipped

Definition at line 3195 of file DataReaderImpl.cpp.

References ACE_WRITE_GUARD_RETURN, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::sequence_, and OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN().

Referenced by data_received().

3196 {
3198  WriterMapType::iterator iter = writers_.find(sample.header_.publication_id_);
3199  if (iter != writers_.end()) {
3200  const SequenceNumber& seq = sample.header_.sequence_;
3201  SequenceNumber last_historic_seq;
3202  if (iter->second->check_historic(seq, sample, last_historic_seq)) {
3203  return false;
3204  }
3205  if (last_historic_seq != SequenceNumber::SEQUENCENUMBER_UNKNOWN()
3206  && !sample.header_.historic_sample_
3207  && seq <= last_historic_seq) {
3208  // this sample must have been seen before the END_HISTORIC_SAMPLES control msg
3209  return false;
3210  }
3211  }
3212  return true;
3213 }
#define ACE_WRITE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
static SequenceNumber SEQUENCENUMBER_UNKNOWN()

◆ check_transport_qos()

bool OpenDDS::DCPS::DataReaderImpl::check_transport_qos ( const TransportInst inst)
virtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 1684 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportInst::is_reliable(), DDS::ReliabilityQosPolicy::kind, qos_, DDS::DataReaderQos::reliability, and DDS::RELIABLE_RELIABILITY_QOS.

1685 {
1687  return ti.is_reliable();
1688  }
1689  return true;
1690 }
ReliabilityQosPolicyKind kind
ReliabilityQosPolicy reliability

◆ cleanup()

void OpenDDS::DCPS::DataReaderImpl::cleanup ( void  )
virtual

Definition at line 155 of file DataReaderImpl.cpp.

References content_filtered_topic_, content_filtered_topic_mutex_, multi_topic_, OpenDDS::DCPS::NO_STATUS_MASK, ownership_manager(), set_listener(), topic_servant_, and OpenDDS::DCPS::OwnershipManager::unregister_reader().

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::cleanup().

156 {
157  // As first step set our listener to nill which will prevent us from calling
158  // back onto the listener at the moment the related DDS entity has been
159  // deleted
161 
162 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
163  OwnershipManagerPtr owner_manager = this->ownership_manager();
164  if (owner_manager) {
165  owner_manager->unregister_reader(topic_servant_->type_name(), this);
166  }
167 #endif
168 
169  topic_servant_ = 0;
170 
171 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
172  {
175  }
176 #endif
177 
178 #ifndef OPENDDS_NO_MULTI_TOPIC
179  multi_topic_ = 0;
180 #endif
181 
182 }
OwnershipManagerPtr ownership_manager()
TopicDescriptionPtr< ContentFilteredTopicImpl > content_filtered_topic_
virtual DDS::ReturnCode_t set_listener(DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
TopicDescriptionPtr< TopicImpl > topic_servant_
ACE_Thread_Mutex content_filtered_topic_mutex_
TopicDescriptionPtr< MultiTopicImpl > multi_topic_
const DDS::StatusMask NO_STATUS_MASK

◆ coherent_change_received() [1/2]

void OpenDDS::DCPS::DataReaderImpl::coherent_change_received ( const GUID_t publisher_id,
Coherent_State result 
)

Definition at line 2987 of file DataReaderImpl.cpp.

References ACE_READ_GUARD, OpenDDS::DCPS::COMPLETED, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::REJECTED, and state.

2988 {
2989  ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
2990 
2991  result = COMPLETED;
2992  for (WriterMapType::iterator iter = writers_.begin();
2993  iter != writers_.end();
2994  ++iter) {
2995 
2996  if (iter->second->publisher_id() == publisher_id) {
2997  const Coherent_State state = iter->second->coherent_change_received();
2998  if (state == NOT_COMPLETED_YET) {
2999  result = NOT_COMPLETED_YET;
3000  break;
3001  }
3002  else if (state == REJECTED) {
3003  result = REJECTED;
3004  }
3005  }
3006  }
3007 }
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
Definition: COPYING.txt:14

◆ coherent_change_received() [2/2]

bool OpenDDS::DCPS::DataReaderImpl::coherent_change_received ( WriterInfo writer)
private

◆ coherent_changes_completed()

void OpenDDS::DCPS::DataReaderImpl::coherent_changes_completed ( DataReaderImpl reader)

Definition at line 3011 of file DataReaderImpl.cpp.

References ACE_GUARD, DDS::DATA_AVAILABLE_STATUS, DDS::DATA_ON_READERS_STATUS, OpenDDS::DCPS::RcHandle< T >::in(), CORBA::is_nil(), OpenDDS::DCPS::rchandle_from(), and TheServiceParticipant.

3012 {
3013  RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
3014  if (!subscriber) {
3015  return;
3016  }
3017 
3018  subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, true);
3020 
3021  ::DDS::SubscriberListener_var sub_listener =
3022  subscriber->listener_for(::DDS::DATA_ON_READERS_STATUS);
3023  if (!CORBA::is_nil(sub_listener.in()))
3024  {
3025  if (!is_bit()) {
3027  subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
3028  if (reader == this) {
3029  // Release the sample_lock before listener callback.
3031  sub_listener->on_data_on_readers(subscriber.in());
3032  }
3033  } else {
3034  TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataOnReaders>(subscriber, sub_listener, rchandle_from(this), reader == this, true));
3035  }
3036  }
3037  else
3038  {
3039  subscriber->notify_status_condition();
3040 
3041  ::DDS::DataReaderListener_var listener =
3043 
3044  if (!CORBA::is_nil(listener.in()))
3045  {
3046  if (!is_bit()) {
3048  subscriber->set_status_changed_flag(::DDS::DATA_ON_READERS_STATUS, false);
3049  if (reader == this) {
3050  // Release the sample_lock before listener callback.
3052  listener->on_data_available(this);
3053  } else {
3054  listener->on_data_available(this);
3055  }
3056  } else {
3057  TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataAvailable>(listener, rchandle_from(this), reader == this, true, true));
3058  }
3059  }
3060  else
3061  {
3062  this->notify_status_condition();
3063  }
3064  }
3065 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const StatusKind DATA_ON_READERS_STATUS
const StatusKind DATA_AVAILABLE_STATUS
DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind)
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
ACE_Reverse_Lock< ACE_Recursive_Thread_Mutex > Reverse_Lock_t
RcHandle< SubscriberImpl > get_subscriber_servant()
#define TheServiceParticipant
Boolean is_nil(T x)

◆ contains_sample()

bool OpenDDS::DCPS::DataReaderImpl::contains_sample ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)

Fold-in the three separate loops of have_sample_states(), have_view_states(), and have_instance_states(). Takes the sample_lock_.

Definition at line 1743 of file DataReaderImpl.cpp.

References instances_lock_, lookup_matching_instances(), and sample_lock_.

Referenced by OpenDDS::DCPS::ReadConditionImpl::get_trigger_value().

1745 {
1748 
1750 }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
ACE_Recursive_Thread_Mutex instances_lock_
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66

◆ contains_sample_filtered()

virtual bool OpenDDS::DCPS::DataReaderImpl::contains_sample_filtered ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
const FilterEvaluator evaluator,
const DDS::StringSeq params 
)
pure virtual

◆ create_querycondition()

DDS::QueryCondition_ptr OpenDDS::DCPS::DataReaderImpl::create_querycondition ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
const char *  query_expression,
const DDS::StringSeq query_parameters 
)
virtual

Definition at line 771 of file DataReaderImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, QueryConditionImpl, read_conditions_, DDS::RETCODE_OK, and sample_lock_.

777 {
779  try {
780  DDS::QueryCondition_var qc = new QueryConditionImpl(this, sample_states,
781  view_states, instance_states, query_expression);
782  if (qc->set_query_parameters(query_parameters) != DDS::RETCODE_OK) {
783  return 0;
784  }
785  DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(qc);
786  read_conditions_.insert(rc);
787  return qc._retn();
788  } catch (const std::exception& e) {
789  if (DCPS_debug_level) {
790  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ")
791  ACE_TEXT("DataReaderImpl::create_querycondition - %C\n"),
792  e.what()));
793  }
794  }
795  return 0;
796 }
#define ACE_ERROR(X)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66

◆ create_readcondition()

DDS::ReadCondition_ptr OpenDDS::DCPS::DataReaderImpl::create_readcondition ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
virtual

Definition at line 758 of file DataReaderImpl.cpp.

References ACE_GUARD_RETURN, read_conditions_, and sample_lock_.

762 {
764  DDS::ReadCondition_var rc = new ReadConditionImpl(this, sample_states,
766  read_conditions_.insert(rc);
767  return rc._retn();
768 }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66

◆ data_received()

void OpenDDS::DCPS::DataReaderImpl::data_received ( const ReceivedDataSample sample)
virtual

process a message that has been received - could be control or a data sample.

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 1371 of file DataReaderImpl.cpp.

References accept_sample_processing(), ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_READ_GUARD, ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::LogGuid::c_str(), cancel_deadline(), check_historic(), OpenDDS::DCPS::ReceivedDataSample::data(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, dds_demarshal(), deadline_queue_enabled_, OpenDDS::DCPS::DISPOSE_INSTANCE, dispose_unregister(), OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, dup(), OpenDDS::DCPS::Observer::e_SAMPLE_RECEIVED, OpenDDS::DCPS::END_COHERENT_CHANGES, OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::ENDIAN_BIG, OpenDDS::DCPS::ENDIAN_LITTLE, filter_sample(), OpenDDS::DCPS::FULL_MARSHALING, OpenDDS::DCPS::RcHandle< T >::get(), OpenDDS::DCPS::EntityImpl::get_deleted(), get_guid(), OpenDDS::DCPS::EntityImpl::get_observer(), get_subscriber_servant(), get_value_dispatcher(), OpenDDS::DCPS::GUID_UNKNOWN, DDS::HANDLE_NIL, header, OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::SubscriptionInstance::instance_handle_, OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::DataSampleHeader::instance_state(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, instances_, instances_lock_, is_exclusive_ownership_, OpenDDS::DCPS::InstanceState::is_last(), OpenDDS::DCPS::OwnershipManager::is_owner(), OpenDDS::DCPS::DataSampleHeader::key_fields_only_, OpenDDS::DCPS::KEY_ONLY_MARSHALING, OpenDDS::DCPS::Encoding::KIND_UNALIGNED_CDR, LM_DEBUG, LM_ERROR, LM_INFO, LM_WARNING, lookup_instance(), mb_alloc_, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, notify_read_conditions(), OpenDDS::DCPS::Observer::on_sample_received(), ownership_manager(), process_latency(), publication_handle_lock_, OpenDDS::DCPS::DataSampleHeader::publication_id_, publication_id_to_handle_map_, OpenDDS::DCPS::TransportClient::repo_id(), OpenDDS::DCPS::RcHandle< T >::reset(), resume_sample_processing(), OpenDDS::DCPS::SAMPLE_DATA, sample_lock_, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::WriterInfo::set_group_info(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, timestamp(), OpenDDS::DCPS::to_string(), OpenDDS::DCPS::UNREGISTER_INSTANCE, verify_coherent_changes_completion(), writer_activity(), writers_, and writers_lock_.

1372 {
1373  DBG_ENTRY_LVL("DataReaderImpl","data_received",6);
1374 
1375  DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
1376  {
1378  RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(sample.header_.publication_id_);
1379  if (pos != publication_id_to_handle_map_.end()) {
1380  publication_handle = pos->second;
1381  }
1382  }
1383 
1384  // ensure some other thread is not changing the sample container
1385  // or statuses related to samples.
1387 
1388  if (get_deleted()) return;
1389 
1390  if (DCPS_debug_level > 9) {
1391  ACE_DEBUG((LM_DEBUG,
1392  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
1393  ACE_TEXT("%C received sample: %C.\n"),
1394  LogGuid(get_guid()).c_str(),
1395  to_string(sample.header_).c_str()));
1396  }
1397 
1398  const ValueDispatcher* vd = get_value_dispatcher();
1400 
1401  RcHandle<MessageHolder> real_data;
1402  SubscriptionInstance_rch instance;
1403  switch (sample.header_.message_id_) {
1404  case SAMPLE_DATA:
1405  case INSTANCE_REGISTRATION: {
1406  if (!check_historic(sample)) break;
1407 
1408  DataSampleHeader const & header = sample.header_;
1409 
1410  this->writer_activity(header);
1411 
1412  // Verify data has not exceeded its lifespan.
1413  if (this->filter_sample(header)) break;
1414 
1415  // This adds the reader to the set/list of readers with data.
1416  RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
1417  if (subscriber) {
1418  subscriber->data_received(this);
1419  }
1420 
1421  // Only gather statistics about real samples, not registration data, etc.
1422  if (header.message_id_ == SAMPLE_DATA) {
1423  this->process_latency(sample);
1424  }
1425 
1426  // This also adds to the sample container and makes any callbacks
1427  // and condition modifications.
1428 
1429  bool is_new_instance = false;
1430  bool filtered = false;
1431  if (sample.header_.key_fields_only_) {
1432  dds_demarshal(sample, publication_handle, instance, is_new_instance, filtered, KEY_ONLY_MARSHALING, false);
1433  } else {
1434  real_data = dds_demarshal(sample, publication_handle, instance, is_new_instance, filtered, FULL_MARSHALING, observer && vd);
1435  }
1436 
1437  // Per sample logging
1438  if (DCPS_debug_level >= 8) {
1439  ACE_DEBUG((LM_DEBUG,
1440  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: reader %C writer %C ")
1441  ACE_TEXT("instance %d is_new_instance %d filtered %d\n"),
1442  LogGuid(get_guid()).c_str(),
1443  LogGuid(header.publication_id_).c_str(),
1444  instance ? instance->instance_handle_ : 0,
1445  is_new_instance, filtered));
1446  }
1447 
1448  if (filtered) break; // sample filtered from instance
1449 
1450  if (instance) accept_sample_processing(instance, header, is_new_instance);
1451  }
1452  break;
1453 
1454 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1455  case END_COHERENT_CHANGES: {
1456  CoherentChangeControl control;
1457 
1458  this->writer_activity(sample.header_);
1459 
1460  Message_Block_Ptr payload(sample.data(&mb_alloc_));
1461  Serializer serializer(
1462  payload.get(), Encoding::KIND_UNALIGNED_CDR,
1463  sample.header_.byte_order_ ? ENDIAN_LITTLE : ENDIAN_BIG);
1464  if (!(serializer >> control)) {
1465  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
1466  ACE_TEXT("deserialization coherent change control failed.\n")));
1467  return;
1468  }
1469 
1470  if (DCPS_debug_level > 0) {
1471  std::stringstream buffer;
1472  buffer << control << std::endl;
1473 
1474  ACE_DEBUG((LM_DEBUG,
1475  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
1476  ACE_TEXT("END_COHERENT_CHANGES %C\n"),
1477  buffer.str().c_str()));
1478  }
1479 
1480  WriterInfo_rch writer;
1481  {
1482  ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
1483 
1484  WriterMapType::iterator it =
1485  this->writers_.find(sample.header_.publication_id_);
1486 
1487  if (it == this->writers_.end()) {
1488  ACE_DEBUG((LM_WARNING,
1489  ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::data_received() - ")
1490  ACE_TEXT(" subscription %C failed to find ")
1491  ACE_TEXT(" publication data for %C!\n"),
1492  LogGuid(get_guid()).c_str(),
1493  LogGuid(sample.header_.publication_id_).c_str()));
1494  return;
1495  }
1496  else {
1497  writer = it->second;
1498  }
1499  it->second->set_group_info(control);
1500  }
1501 
1502  if (this->verify_coherent_changes_completion(writer.in())) {
1503  this->notify_read_conditions();
1504  }
1505  }
1506  break;
1507 #endif // OPENDDS_NO_OBJECT_MODEL_PROFILE
1508 
1509  case DATAWRITER_LIVELINESS: {
1510  if (DCPS_debug_level >= 4) {
1511  ACE_DEBUG((LM_DEBUG,
1512  ACE_TEXT("(%P|%t) DataReaderImpl::data_received: ")
1513  ACE_TEXT("reader %C got datawriter liveliness from writer %C\n"),
1514  LogGuid(get_guid()).c_str(),
1515  LogGuid(sample.header_.publication_id_).c_str()));
1516  }
1517  this->writer_activity(sample.header_);
1518 
1519  // tell all instances they got a liveliness message
1520  {
1522  for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
1523  iter != instances_.end();
1524  ++iter) {
1525  if (iter->second->instance_state_->writes_instance(sample.header_.publication_id_)) {
1526  iter->second->instance_state_->lively(sample.header_.publication_id_);
1527  }
1528  }
1529  }
1530 
1531  }
1532  break;
1533 
1534  case DISPOSE_INSTANCE: {
1535  if (!check_historic(sample)) break;
1536  this->writer_activity(sample.header_);
1537  SubscriptionInstance_rch instance;
1538 
1540  // Find the instance first for timer cancellation since
1541  // the instance may be deleted during dispose and can
1542  // not be accessed.
1543  ReceivedDataSample dup(sample);
1544  this->lookup_instance(dup, instance);
1545 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1546  OwnershipManagerPtr owner_manager = this->ownership_manager();
1547 
1548  if (! this->is_exclusive_ownership_
1549  || (owner_manager
1550  && (instance)
1551  && (owner_manager->is_owner(instance->instance_handle_,
1552  sample.header_.publication_id_)))) {
1553 #endif
1554  cancel_deadline(instance);
1555 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1556  }
1557 #endif
1558  }
1559  instance.reset();
1560  this->dispose_unregister(sample, publication_handle, instance);
1561  }
1562  this->notify_read_conditions();
1563  break;
1564 
1565  case UNREGISTER_INSTANCE: {
1566  if (!check_historic(sample)) break;
1567  this->writer_activity(sample.header_);
1568  SubscriptionInstance_rch instance;
1569 
1571  // Find the instance first for timer cancellation since
1572  // the instance may be deleted during dispose and can
1573  // not be accessed.
1574  ReceivedDataSample dup(sample);
1575  this->lookup_instance(dup, instance);
1576  if (instance) {
1577 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1578  if (! this->is_exclusive_ownership_
1579  || (this->is_exclusive_ownership_
1580  && instance->instance_state_->is_last(sample.header_.publication_id_))) {
1581 #endif
1582  cancel_deadline(instance);
1583 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1584  }
1585 #endif
1586  }
1587  }
1588  instance.reset();
1589  this->dispose_unregister(sample, publication_handle, instance);
1590  }
1591  this->notify_read_conditions();
1592  break;
1593 
1595  if (!check_historic(sample)) break;
1596  this->writer_activity(sample.header_);
1597  SubscriptionInstance_rch instance;
1598 
1600  // Find the instance first for timer cancellation since
1601  // the instance may be deleted during dispose and can
1602  // not be accessed.
1603  ReceivedDataSample dup(sample);
1604  this->lookup_instance(dup, instance);
1605 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1606  OwnershipManagerPtr owner_manager = this->ownership_manager();
1607  if (! this->is_exclusive_ownership_
1608  || (owner_manager
1609  && (instance)
1610  && (owner_manager->is_owner (instance->instance_handle_,
1611  sample.header_.publication_id_)))
1613  && (instance)
1614  && instance->instance_state_->is_last(sample.header_.publication_id_))) {
1615 #endif
1616  if (instance) {
1617  cancel_deadline(instance);
1618  }
1619 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1620  }
1621 #endif
1622  }
1623  instance.reset();
1624  this->dispose_unregister(sample, publication_handle, instance);
1625  }
1626  this->notify_read_conditions();
1627  break;
1628 
1629  case END_HISTORIC_SAMPLES: {
1630  if (sample.header_.message_length_ >= sizeof(GUID_t)) {
1631  Message_Block_Ptr payload(sample.data(&mb_alloc_));
1632  Serializer ser(payload.get(), Encoding::KIND_UNALIGNED_CDR);
1633  GUID_t readerId = GUID_UNKNOWN;
1634  if (!(ser >> readerId)) {
1635  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) DataReaderImpl::data_received ")
1636  ACE_TEXT("deserialization reader failed.\n")));
1637  return;
1638  }
1639  const GUID_t repo_id(get_guid());
1640  if (readerId != GUID_UNKNOWN && readerId != repo_id) {
1641  break; // not our message
1642  }
1643  }
1644  if (DCPS_debug_level > 4) {
1645  ACE_DEBUG((LM_INFO, "(%P|%t) Received END_HISTORIC_SAMPLES control message\n"));
1646  }
1647  // Going to acquire writers lock, release samples lock
1648  guard.release();
1649  resume_sample_processing(sample.header_.publication_id_);
1650  if (DCPS_debug_level > 4) {
1651  ACE_DEBUG((
1652  LM_INFO,
1653  "(%P|%t) Resumed sample processing for durable writer %C\n",
1654  LogGuid(sample.header_.publication_id_).c_str()));
1655  }
1656  break;
1657  }
1658 
1659  default:
1660  ACE_ERROR((LM_ERROR,
1661  "(%P|%t) ERROR: DataReaderImpl::data_received"
1662  "unexpected message_id = %d\n",
1663  sample.header_.message_id_));
1664  break;
1665  }
1666 
1667  if (observer && real_data && vd) {
1668  const DDS::Time_t timestamp = {
1669  sample.header_.source_timestamp_sec_,
1670  sample.header_.source_timestamp_nanosec_
1671  };
1672  Observer::Sample s(instance ? instance->instance_handle_ : DDS::HANDLE_NIL, sample.header_.instance_state(), timestamp, sample.header_.sequence_, real_data->get(), *vd);
1673  observer->on_sample_received(this, s);
1674  }
1675 }
OwnershipManagerPtr ownership_manager()
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void notify_read_conditions()
Data has arrived into the cache, unblock waiting ReadConditions.
virtual void dispose_unregister(const ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, SubscriptionInstance_rch &instance)
RcHandle< SubscriptionInstance > SubscriptionInstance_rch
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
RepoIdToHandleMap publication_id_to_handle_map_
const InstanceHandle_t HANDLE_NIL
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
bool filter_sample(const DataSampleHeader &header)
ACE_Recursive_Thread_Mutex instances_lock_
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
RcHandle< Observer > Observer_rch
Definition: Observer.h:101
virtual void lookup_instance(const ReceivedDataSample &sample, SubscriptionInstance_rch &instance)=0
bool check_historic(const ReceivedDataSample &sample)
const char * to_string(MessageId value)
void process_latency(const ReceivedDataSample &sample)
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
RcHandle< WriterInfo > WriterInfo_rch
Definition: WriterInfo.h:275
bool verify_coherent_changes_completion(WriterInfo *writer)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void cancel_deadline(SubscriptionInstance_rch instance)
const ValueDispatcher * get_value_dispatcher() const
virtual RcHandle< MessageHolder > dds_demarshal(const ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, SubscriptionInstance_rch &instance, bool &is_new_instance, bool &filtered, MarshalingType marshaling_type, bool full_copy)=0
ACE_TEXT("TCP_Factory")
ACE_Recursive_Thread_Mutex publication_handle_lock_
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
void resume_sample_processing(const GUID_t &pub_id)
when done handling historic samples, resume
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
RcHandle< SubscriberImpl > get_subscriber_servant()
void accept_sample_processing(const SubscriptionInstance_rch &instance, const DataSampleHeader &header, bool is_new_instance)
ACE_HANDLE dup(ACE_HANDLE handle)
TransportMessageBlockAllocator mb_alloc_
void writer_activity(const DataSampleHeader &header)
update liveliness info for this writer.
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)

◆ dds_demarshal()

virtual RcHandle<MessageHolder> OpenDDS::DCPS::DataReaderImpl::dds_demarshal ( const ReceivedDataSample sample,
DDS::InstanceHandle_t  publication_handle,
SubscriptionInstance_rch instance,
bool &  is_new_instance,
bool &  filtered,
MarshalingType  marshaling_type,
bool  full_copy 
)
pure virtual

◆ deadline_task()

void OpenDDS::DCPS::DataReaderImpl::deadline_task ( const MonotonicTimePoint now)
private

Definition at line 3739 of file DataReaderImpl.cpp.

References ACE_GUARD, OPENDDS_END_VERSIONED_NAMESPACE_DECL, and TheServiceParticipant.

3740 {
3741  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
3742 
3744  for (DeadlineQueue::iterator pos = deadline_queue_.begin(), limit = deadline_queue_.end(); pos != limit && pos->first <= now;) {
3745  SubscriptionInstance_rch instance = pos->second;
3746  deadline_queue_.erase(pos++);
3747  // pos is no longer valid.
3748  process_deadline(instance, now, true);
3749  }
3750 
3751  if (!deadline_queue_.empty()) {
3752  deadline_task_->schedule(deadline_queue_.begin()->first - now);
3753  }
3754 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< SubscriptionInstance > SubscriptionInstance_rch
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
RcHandle< DRISporadicTask > deadline_task_
void process_deadline(SubscriptionInstance_rch instance, const MonotonicTimePoint &now, bool timer_called)
#define TheServiceParticipant

◆ delete_contained_entities()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::delete_contained_entities ( )
virtual

Implements DDS::DataReader.

Definition at line 816 of file DataReaderImpl.cpp.

References ACE_GUARD_RETURN, read_conditions_, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, and sample_lock_.

817 {
820  read_conditions_.clear();
821  return DDS::RETCODE_OK;
822 }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_OUT_OF_RESOURCES

◆ delete_readcondition()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::delete_readcondition ( DDS::ReadCondition_ptr  a_condition)
virtual

Definition at line 806 of file DataReaderImpl.cpp.

References ACE_GUARD_RETURN, read_conditions_, DDS::RETCODE_OK, DDS::RETCODE_OUT_OF_RESOURCES, DDS::RETCODE_PRECONDITION_NOT_MET, and sample_lock_.

808 {
811  DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
812  return read_conditions_.erase(rc)
814 }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_OUT_OF_RESOURCES

◆ deliver_historic()

void OpenDDS::DCPS::DataReaderImpl::deliver_historic ( OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&  samples)
private

deliver samples that were held by check_historic()

Definition at line 3215 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::ReceivedDataSample::header_, OpenDDS::DCPS::DataSampleHeader::historic_sample_, and OpenDDS::DCPS::OPENDDS_MAP().

3216 {
3217  typedef OPENDDS_MAP(SequenceNumber, ReceivedDataSample)::iterator iter_t;
3218  const iter_t end = samples.end();
3219  for (iter_t iter = samples.begin(); iter != end; ++iter) {
3220  iter->second.header_.historic_sample_ = true;
3221  data_received(iter->second);
3222  }
3223 }
virtual void data_received(const ReceivedDataSample &sample)
process a message that has been received - could be control or a data sample.
typedef OPENDDS_MAP(DDS::InstanceHandle_t, SubscriptionInstance_rch) SubscriptionInstanceMapType

◆ disable_transport()

ACE_INLINE void OpenDDS::DCPS::DataReaderImpl::disable_transport ( )

◆ dispose_unregister()

void OpenDDS::DCPS::DataReaderImpl::dispose_unregister ( const ReceivedDataSample sample,
DDS::InstanceHandle_t  publication_handle,
SubscriptionInstance_rch instance 
)
virtual

Reimplemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >, and OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >.

Definition at line 2275 of file DataReaderImpl.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.

Referenced by data_received().

2278 {
2279  if (DCPS_debug_level > 0) {
2280  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::dispose_unregister()\n"));
2281  }
2282 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ domain_id()

DDS::DomainId_t OpenDDS::DCPS::DataReaderImpl::domain_id ( ) const
inlineprivatevirtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 847 of file DataReaderImpl.h.

847 { return this->domain_id_; }

◆ enable()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::enable ( )
virtual

Implements DDS::Entity.

Definition at line 1115 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_INT32_MAX, ACE_TEXT(), OpenDDS::DCPS::TypeSupportImpl::add_types(), OpenDDS::DCPS::TransportClient::connection_info(), content_filtered_topic_, content_filtered_topic_mutex_, OpenDDS::DCPS::DCPS_debug_level, DDS::DataReaderQos::deadline, deadline_period_, deadline_queue_enabled_, DDS::HistoryQosPolicy::depth, depth_, domain_id_, dp_id_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, dynamic_type_, OpenDDS::DCPS::Observer::e_ENABLED, enable_specific(), OpenDDS::DCPS::TransportClient::enable_transport(), get_guid(), OpenDDS::DCPS::EntityImpl::get_observer(), get_subscriber_servant(), OpenDDS::DCPS::TypeSupportImpl::get_type(), OpenDDS::DCPS::GUID_UNKNOWN, has_subscription_id_, DDS::DataReaderQos::history, TAO::String_var< charT >::in(), install_type_support(), OpenDDS::DCPS::EntityImpl::is_enabled(), DDS::KEEP_ALL_HISTORY_QOS, DDS::ReliabilityQosPolicy::kind, DDS::HistoryQosPolicy::kind, DDS::LivelinessQosPolicy::lease_duration, DDS::LENGTH_UNLIMITED, DDS::DataReaderQos::liveliness, OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, LM_DEBUG, LM_ERROR, LM_WARNING, DDS::ResourceLimitsQosPolicy::max_samples, DDS::ResourceLimitsQosPolicy::max_samples_per_instance, monitor_, n_chunks_, name, DDS::Duration_t::nanosec, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), OpenDDS::DCPS::Observer::on_enabled(), participant_servant_, DDS::DeadlineQosPolicy::period, qos_, OpenDDS::DCPS::rchandle_from(), rd_allocator_, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::DataReaderQos::representation, DDS::DataReaderQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, sample_lock_, DDS::Duration_t::sec, security_config_, OpenDDS::DCPS::EntityImpl::set_enabled(), OpenDDS::DCPS::set_reader_effective_data_rep_qos(), setup_deserialization(), OpenDDS::DCPS::WriterInfoListener::subscription_id_, subscription_id_condition_, subscription_id_mutex_, TheServiceParticipant, OpenDDS::DCPS::TypeSupportImpl::to_type_info(), topic_servant_, transport_disabled_, DDS::DataRepresentationQosPolicy::value, and DDS::VOLATILE_DURABILITY_QOS.

Referenced by OpenDDS::RTPS::RtpsDiscovery::create_bit_dr(), OpenDDS::DCPS::StaticDiscovery::create_bit_dr(), and OpenDDS::DCPS::SubscriberImpl::create_datareader().

1116 {
1117  // According to spec:
1118  // - Calling enable on an already enabled Entity has no effect and returns OK.
1119  // - Calling enable on an Entity whose factory is not enabled will fail
1120  // and return PRECONDITION_NOT_MET.
1121 
1122  if (this->is_enabled()) {
1123  return DDS::RETCODE_OK;
1124  }
1125 
1126  RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
1127  if (!subscriber) {
1128  return DDS::RETCODE_ERROR;
1129  }
1130 
1131  if (!subscriber->is_enabled()) {
1133  }
1134 
1135  if (topic_servant_ && !topic_servant_->is_enabled()) {
1137  }
1138 
1139  RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
1140  if (participant) {
1141  dp_id_ = participant->get_id();
1142  }
1143 
1144  if (topic_servant_) {
1146  if (!topic_servant_->check_data_representation(qos_.representation.value, false)) {
1147  return DDS::RETCODE_ERROR;
1148  }
1149  }
1150 
1152  // The spec says qos_.history.depth is "has no effect"
1153  // when history.kind = KEEP_ALL so use max_samples_per_instance
1155 
1156  } else { // qos_.history.kind == DDS::KEEP_LAST_HISTORY_QOS
1158  }
1159 
1160  if (depth_ == DDS::LENGTH_UNLIMITED) {
1161  // DDS::LENGTH_UNLIMITED is negative so make it a positive
1162  // value that is, for all intents and purposes, unlimited
1163  // and we can use it for comparisons.
1164  // WARNING: The client risks running out of memory in this case.
1166  }
1167 
1170  }
1171 
1172  //else using value from Service_Participant
1173 
1174  // enable the type specific part of this DataReader
1175  this->enable_specific();
1176 
1177  //Note: the QoS used to set n_chunks_ is Changeable=No so
1178  // it is OK that we cannot change the size of our allocators.
1180 
1181  if (DCPS_debug_level >= 2)
1182  ACE_DEBUG((LM_DEBUG,"(%P|%t) DataReaderImpl::enable"
1183  " Cached_Allocator_With_Overflow %x with %d chunks\n",
1184  rd_allocator_.get(), n_chunks_));
1185 
1191  }
1192 
1193  // Setup the requested deadline watchdog if the configured deadline
1194  // period is not the default (infinite).
1195  DDS::Duration_t const deadline_period = this->qos_.deadline.period;
1196 
1198  && (deadline_period.sec != DDS::DURATION_INFINITE_SEC
1199  || deadline_period.nanosec != DDS::DURATION_INFINITE_NSEC)) {
1200  deadline_period_ = TimeDuration(qos_.deadline.period);
1201  deadline_queue_enabled_ = true;
1202  }
1203 
1204  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
1205  disco->pre_reader(this);
1206 
1207  this->set_enabled();
1208 
1210  try {
1212  this->qos_.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
1213  } catch (const Transport::Exception&) {
1214  ACE_ERROR((LM_ERROR,
1215  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::enable, ")
1216  ACE_TEXT("Transport Exception.\n")));
1217  return DDS::RETCODE_ERROR;
1218  }
1219 
1220  const DDS::ReturnCode_t setup_deserialization_result = setup_deserialization();
1221  if (setup_deserialization_result != DDS::RETCODE_OK) {
1222  return setup_deserialization_result;
1223  }
1224 
1225  const TransportLocatorSeq& trans_conf_info = connection_info();
1226 
1227  CORBA::String_var filterClassName = "";
1228  CORBA::String_var filterExpression = "";
1229  DDS::StringSeq exprParams;
1230 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1231  {
1234  filterClassName = content_filtered_topic_->get_filter_class_name();
1235  filterExpression = content_filtered_topic_->get_filter_expression();
1236  content_filtered_topic_->get_expression_parameters(exprParams);
1237  }
1238  }
1239 #endif
1240 
1241  DDS::SubscriberQos sub_qos;
1242  subscriber->get_qos(sub_qos);
1243 
1244  TypeSupportImpl* const typesupport =
1245  dynamic_cast<TypeSupportImpl*>(topic_servant_->get_type_support());
1246  if (!typesupport) {
1247  return DDS::RETCODE_ERROR;
1248  }
1249 
1250  XTypes::TypeInformation type_info;
1251  typesupport->to_type_info(type_info);
1252 
1253  XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
1254  typesupport->add_types(type_lookup_service);
1255 
1256  install_type_support(typesupport);
1257 
1258  const GUID_t subscription_id =
1259  disco->add_subscription(domain_id_,
1260  dp_id_,
1261  topic_servant_->get_id(),
1262  rchandle_from(this),
1263  qos_,
1264  trans_conf_info,
1265  sub_qos,
1266  filterClassName,
1267  filterExpression,
1268  exprParams,
1269  type_info);
1270 
1271 #if defined(OPENDDS_SECURITY)
1272  {
1274  security_config_ = participant->get_security_config();
1275  dynamic_type_ = typesupport->get_type();
1276  }
1277 #endif
1278 
1279  {
1281  subscription_id_ = subscription_id;
1282  has_subscription_id_ = true;
1284  }
1285 
1286  if (subscription_id == GUID_UNKNOWN) {
1287  if (DCPS_debug_level >= 1) {
1288  ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: DataReaderImpl::enable: "
1289  "add_subscription failed\n"));
1290  }
1291  return DDS::RETCODE_ERROR;
1292  }
1293 
1294  if (DCPS_debug_level >= 2) {
1295  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::enable: "
1296  "got GUID %C, subscribed to topic name \"%C\" type \"%C\"\n",
1297  LogGuid(get_guid()).c_str(),
1298  topic_servant_->topic_name(), topic_servant_->type_name()));
1299  }
1300  }
1301 
1302  DDS::ReturnCode_t return_value = DDS::RETCODE_OK;
1303  if (topic_servant_) {
1304  const CORBA::String_var name = topic_servant_->get_name();
1305  return_value = subscriber->reader_enabled(name.in(), this);
1306 
1307  if (this->monitor_) {
1308  this->monitor_->report();
1309  }
1310  }
1311 
1312  if (return_value == DDS::RETCODE_OK) {
1313  const Observer_rch observer = get_observer(Observer::e_ENABLED);
1314  if (observer) {
1315  observer->on_enabled(this);
1316  }
1317  }
1318 
1319  return return_value;
1320 }
void enable_transport(bool reliable, bool durable)
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
Security::SecurityConfig_rch security_config_
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
ReliabilityQosPolicyKind kind
ConditionVariable< ACE_Thread_Mutex > subscription_id_condition_
LivelinessQosPolicy liveliness
Cached_Allocator_With_Overflow< ReceivedDataElementMemoryBlock, ACE_Thread_Mutex > ReceivedDataAllocator
sequence< TransportLocator > TransportLocatorSeq
TopicDescriptionPtr< ContentFilteredTopicImpl > content_filtered_topic_
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ResourceLimitsQosPolicy resource_limits
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
const TransportLocatorSeq & connection_info() const
ACE_Thread_Mutex subscription_id_mutex_
bool notify_all()
Unblock all of the threads waiting on this condition.
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
unique_ptr< ReceivedDataAllocator > rd_allocator_
const long LENGTH_UNLIMITED
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
DataRepresentationQosPolicy representation
RcHandle< Observer > Observer_rch
Definition: Observer.h:101
DCPS::RcHandle< TypeLookupService > TypeLookupService_rch
ReliabilityQosPolicy reliability
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
HistoryQosPolicyKind kind
TopicDescriptionPtr< TopicImpl > topic_servant_
DataRepresentationIdSeq value
unique_ptr< Monitor > monitor_
Monitor object for this entity.
HistoryQosPolicy history
DDS::ReturnCode_t setup_deserialization()
Setup deserialization options.
DDS::DynamicType_var dynamic_type_
void set_reader_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos)
Definition: DCPS_Utils.cpp:517
ACE_Thread_Mutex content_filtered_topic_mutex_
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const char *const name
Definition: debug.cpp:60
ACE_TEXT("TCP_Factory")
virtual void install_type_support(TypeSupportImpl *)
virtual DDS::ReturnCode_t enable_specific()=0
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
WeakRcHandle< DomainParticipantImpl > participant_servant_
RcHandle< SubscriberImpl > get_subscriber_servant()
DeadlineQosPolicy deadline
const character_type * in(void) const
TimeDuration liveliness_lease_duration_
Definition: WriterInfo.h:52
#define TheServiceParticipant
#define ACE_INT32_MAX

◆ enable_filtering()

void OpenDDS::DCPS::DataReaderImpl::enable_filtering ( ContentFilteredTopicImpl cft)

Definition at line 3124 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::ContentFilteredTopicImpl::add_reader().

Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader().

3125 {
3126  cft->add_reader(*this);
3127  {
3130  }
3131 }
TopicDescriptionPtr< ContentFilteredTopicImpl > content_filtered_topic_
ACE_Thread_Mutex content_filtered_topic_mutex_

◆ enable_multi_topic()

void OpenDDS::DCPS::DataReaderImpl::enable_multi_topic ( MultiTopicImpl mt)

Definition at line 3143 of file DataReaderImpl.cpp.

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::init().

3144 {
3145  multi_topic_ = mt;
3146 }
TopicDescriptionPtr< MultiTopicImpl > multi_topic_

◆ enable_specific()

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::enable_specific ( )
protectedpure virtual

◆ end_access()

void OpenDDS::DCPS::DataReaderImpl::end_access ( )

Definition at line 3075 of file DataReaderImpl.cpp.

References ACE_GUARD.

3076 {
3078  this->coherent_ = false;
3080  this->post_read_or_take();
3081 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
GroupRakeData group_coherent_ordered_data_
Ordered group samples.
bool coherent_
Is accessing to Group coherent changes ?

◆ filter_sample()

bool OpenDDS::DCPS::DataReaderImpl::filter_sample ( const DataSampleHeader header)
protected

Check if the received data sample or instance should be filtered.

Note
Filtering will only occur if the application configured a finite duration in the Topic's LIFESPAN QoS policy or DataReader's TIME_BASED_FILTER QoS policy.

Definition at line 2583 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataSampleHeader::historic_sample_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, LM_DEBUG, OpenDDS::DCPS::TimePoint_T< SystemClock >::now(), ACE_Time_Value::sec(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, ACE_Time_Value::usec(), OpenDDS::DCPS::TimeDuration::value(), and DDS::VOLATILE_DURABILITY_QOS.

Referenced by data_received().

2584 {
2585  const SystemTimePoint now = SystemTimePoint::now();
2586 
2587  // Expire historic data if QoS indicates VOLATILE.
2588  if (!always_get_history_ && header.historic_sample_
2590  if (DCPS_debug_level >= 8) {
2591  ACE_DEBUG((LM_DEBUG,
2592  ACE_TEXT("(%P|%t) DataReaderImpl::filter_sample: ")
2593  ACE_TEXT("Discarded historic data.\n")));
2594  }
2595 
2596  return true; // Data filtered.
2597  }
2598 
2599  // The LIFESPAN_DURATION_FLAG is set when sample data is sent
2600  // with a non-default LIFESPAN duration value.
2601  if (header.lifespan_duration_) {
2602  // Finite lifespan. Check if data has expired.
2603 
2604  const DDS::Time_t expiration_dds_time = {
2605  header.source_timestamp_sec_ + header.lifespan_duration_sec_,
2606  header.source_timestamp_nanosec_ + header.lifespan_duration_nanosec_
2607  };
2608  const SystemTimePoint expiration_time(expiration_dds_time);
2609 
2610  // We assume that the publisher host's clock and subcriber host's
2611  // clock are synchronized (allowed by the spec).
2612  if (now >= expiration_time) {
2613  if (DCPS_debug_level >= 8) {
2614  const TimeDuration diff(now - expiration_time);
2615  ACE_DEBUG((LM_DEBUG,
2616  ACE_TEXT("(%P|%t) Received data ")
2617  ACE_TEXT("expired by %d seconds, %d microseconds.\n"),
2618  diff.value().sec(),
2619  diff.value().usec()));
2620  }
2621 
2622  return true; // Data filtered.
2623  }
2624  }
2625 
2626  return false;
2627 }
#define ACE_DEBUG(X)
TimePoint_T< SystemClock > SystemTimePoint
Definition: TimeTypes.h:32
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
DurabilityQosPolicy durability
DurabilityQosPolicyKind kind
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")

◆ get_builtin_subscriber_proxy()

RcHandle<BitSubscriber> OpenDDS::DCPS::DataReaderImpl::get_builtin_subscriber_proxy ( ) const
inlineprivatevirtual

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 837 of file DataReaderImpl.h.

References OpenDDS::DCPS::WeakRcHandle< T >::lock().

838  {
839  RcHandle<DomainParticipantImpl> participant_servant = participant_servant_.lock();
840  if (participant_servant) {
841  return participant_servant->get_builtin_subscriber_proxy();
842  }
843 
844  return RcHandle<BitSubscriber>();
845  }
WeakRcHandle< DomainParticipantImpl > participant_servant_

◆ get_cf_topic()

DDS::ContentFilteredTopic_ptr OpenDDS::DCPS::DataReaderImpl::get_cf_topic ( ) const

Definition at line 3134 of file DataReaderImpl.cpp.

3135 {
3137  return DDS::ContentFilteredTopic::_duplicate(content_filtered_topic_.get());
3138 }
TopicDescriptionPtr< ContentFilteredTopicImpl > content_filtered_topic_
ACE_Thread_Mutex content_filtered_topic_mutex_

◆ get_crypto_handle()

DDS::Security::ParticipantCryptoHandle OpenDDS::DCPS::DataReaderImpl::get_crypto_handle ( ) const
privatevirtual

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 3404 of file DataReaderImpl.cpp.

References DDS::HANDLE_NIL.

3405 {
3406  RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
3407  return participant ? participant->crypto_handle() : DDS::HANDLE_NIL;
3408 }
const InstanceHandle_t HANDLE_NIL
WeakRcHandle< DomainParticipantImpl > participant_servant_

◆ get_depth()

size_t OpenDDS::DCPS::DataReaderImpl::get_depth ( ) const
inline

Definition at line 410 of file DataReaderImpl.h.

411  {
412  return static_cast<size_t>(depth_);
413  }

◆ get_dp_id()

OpenDDS::DCPS::GUID_t OpenDDS::DCPS::DataReaderImpl::get_dp_id ( )

Definition at line 2816 of file DataReaderImpl.cpp.

Referenced by OpenDDS::DCPS::DRMonitorImpl::report().

2817 {
2818  return dp_id_;
2819 }

◆ get_ext_listener()

DataReaderListener_ptr OpenDDS::DCPS::DataReaderImpl::get_ext_listener ( )
protected

Definition at line 926 of file DataReaderImpl.cpp.

References listener_, and listener_mutex_.

927 {
929  return DataReaderListener::_narrow(listener_.in());
930 }
ACE_Thread_Mutex listener_mutex_
DDS::DataReaderListener_var listener_

◆ get_guid()

GUID_t OpenDDS::DCPS::DataReaderImpl::get_guid ( ) const
inlinevirtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 628 of file DataReaderImpl.h.

References TheServiceParticipant.

Referenced by add_association(), data_received(), enable(), OpenDDS::Federator::ManagerImpl::initialize(), remove_associations(), remove_associations_i(), OpenDDS::DCPS::DRPeriodicMonitorImpl::report(), OpenDDS::DCPS::DRMonitorImpl::report(), transport_assoc_done(), and writer_activity().

629  {
631  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
632  while (!has_subscription_id_ && !get_deleted()) {
633  subscription_id_condition_.wait(thread_status_manager);
634  }
635  return subscription_id_;
636  }
ConditionVariable< ACE_Thread_Mutex > subscription_id_condition_
ACE_Thread_Mutex subscription_id_mutex_
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
#define TheServiceParticipant

◆ get_handle_instance()

SubscriptionInstance_rch OpenDDS::DCPS::DataReaderImpl::get_handle_instance ( DDS::InstanceHandle_t  handle)
protected

Definition at line 2424 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), and LM_WARNING.

Referenced by release_instance().

2425 {
2427 
2428  SubscriptionInstanceMapType::iterator iter = instances_.find(handle);
2429  if (iter == instances_.end()) {
2430  ACE_DEBUG((LM_WARNING,
2431  ACE_TEXT("(%P|%t) WARNING: ")
2432  ACE_TEXT("DataReaderImpl::get_handle_instance: ")
2433  ACE_TEXT("lookup for 0x%x failed\n"),
2434  handle));
2435  return SubscriptionInstance_rch();
2436  } // if (0 != instances_.find(handle, instance))
2437 
2438  return iter->second;
2439 }
#define ACE_DEBUG(X)
RcHandle< SubscriptionInstance > SubscriptionInstance_rch
ACE_Recursive_Thread_Mutex instances_lock_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
ACE_TEXT("TCP_Factory")

◆ get_ice_endpoint()

WeakRcHandle< ICE::Endpoint > OpenDDS::DCPS::DataReaderImpl::get_ice_endpoint ( )
virtual

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 3286 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportClient::get_ice_endpoint().

3287 {
3289 }
WeakRcHandle< ICE::Endpoint > get_ice_endpoint()

◆ get_instance_handle()

DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::get_instance_handle ( )
virtual

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 230 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::EntityImpl::get_entity_instance_handle(), participant_servant_, and OpenDDS::DCPS::WriterInfoListener::subscription_id_.

231 {
232  const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
233  return get_entity_instance_handle(subscription_id_, participant);
234 }
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
Definition: EntityImpl.cpp:142
WeakRcHandle< DomainParticipantImpl > participant_servant_

◆ get_instance_handles()

void OpenDDS::DCPS::DataReaderImpl::get_instance_handles ( InstanceHandleVec &  instance_handles)

Definition at line 2822 of file DataReaderImpl.cpp.

References ACE_GUARD.

Referenced by OpenDDS::DCPS::DRMonitorImpl::report().

2823 {
2825  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
2826 
2827  for (SubscriptionInstanceMapType::iterator iter = instances_.begin(),
2828  end = instances_.end(); iter != end; ++iter) {
2829  instance_handles.push_back(iter->first);
2830  }
2831 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
ACE_Recursive_Thread_Mutex instances_lock_
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.

◆ get_latency_stats()

void OpenDDS::DCPS::DataReaderImpl::get_latency_stats ( LatencyStatisticsSeq stats)
virtual

Definition at line 2369 of file DataReaderImpl.cpp.

2371 {
2373  stats.length(static_cast<CORBA::ULong>(this->statistics_.size()));
2374  int index = 0;
2375 
2376  for (StatsMapType::const_iterator current = this->statistics_.begin();
2377  current != this->statistics_.end();
2378  ++current, ++index) {
2379  stats[ index] = current->second.get_stats();
2380  stats[ index].publication = current->first;
2381  }
2382 }
StatsMapType statistics_
Statistics for this reader, collected for each writer.
ACE_Recursive_Thread_Mutex statistics_lock_

◆ get_listener()

DDS::DataReaderListener_ptr OpenDDS::DCPS::DataReaderImpl::get_listener ( )
virtual

Implements DDS::DataReader.

Definition at line 920 of file DataReaderImpl.cpp.

References listener_, and listener_mutex_.

921 {
923  return DDS::DataReaderListener::_duplicate(listener_.in());
924 }
ACE_Thread_Mutex listener_mutex_
DDS::DataReaderListener_var listener_

◆ get_liveliness_changed_status()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_liveliness_changed_status ( DDS::LivelinessChangedStatus status)
virtual

Definition at line 963 of file DataReaderImpl.cpp.

References DDS::LivelinessChangedStatus::alive_count_change, DDS::LIVELINESS_CHANGED_STATUS, liveliness_changed_status_, DDS::LivelinessChangedStatus::not_alive_count_change, DDS::RETCODE_OK, sample_lock_, and OpenDDS::DCPS::EntityImpl::set_status_changed_flag().

965 {
967 
969  false);
971 
974 
975  return DDS::RETCODE_OK;
976 }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
DDS::LivelinessChangedStatus liveliness_changed_status_
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
const StatusKind LIVELINESS_CHANGED_STATUS

◆ get_matched_publication_data()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_matched_publication_data ( DDS::PublicationBuiltinTopicData publication_data,
DDS::InstanceHandle_t  publication_handle 
)
virtual

Definition at line 1082 of file DataReaderImpl.cpp.

References ACE_ERROR_RETURN, ACE_TEXT(), OpenDDS::DCPS::BUILT_IN_PUBLICATION_TOPIC, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::RcHandle< T >::in(), LM_ERROR, participant_servant_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

1085 {
1086  if (!enabled_) {
1087  ACE_ERROR_RETURN((LM_ERROR,
1088  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::")
1089  ACE_TEXT("get_matched_publication_data: ")
1090  ACE_TEXT("Entity is not enabled.\n")),
1092  }
1093 
1094  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
1095 
1096  if (!participant)
1097  return DDS::RETCODE_ERROR;
1098 
1099  DDS::PublicationBuiltinTopicDataSeq data;
1100  const DDS::ReturnCode_t ret = instance_handle_to_bit_data<DDS::PublicationBuiltinTopicDataDataReader_var>(
1101  participant.in(),
1103  publication_handle,
1104  data);
1105 
1106  if (ret == DDS::RETCODE_OK) {
1107  publication_data = data[0];
1108  }
1109 
1110  return ret;
1111 }
const ReturnCode_t RETCODE_OK
const char *const BUILT_IN_PUBLICATION_TOPIC
const ReturnCode_t RETCODE_ERROR
ACE_TEXT("TCP_Factory")
WeakRcHandle< DomainParticipantImpl > participant_servant_
#define ACE_ERROR_RETURN(X, Y)
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
const ReturnCode_t RETCODE_NOT_ENABLED

◆ get_matched_publications()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_matched_publications ( DDS::InstanceHandleSeq publication_handles)
virtual

Definition at line 1051 of file DataReaderImpl.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, publication_handle_lock_, publication_id_to_handle_map_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

1053 {
1054  if (!enabled_) {
1055  ACE_ERROR_RETURN((LM_ERROR,
1056  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::get_matched_publications: ")
1057  ACE_TEXT(" Entity is not enabled.\n")),
1059  }
1060 
1062  guard,
1065 
1066  // Copy out the handles for the current set of publications.
1067  int index = 0;
1068  publication_handles.length(static_cast<CORBA::ULong>(this->publication_id_to_handle_map_.size()));
1069 
1070  for (RepoIdToHandleMap::iterator
1071  current = this->publication_id_to_handle_map_.begin();
1072  current != this->publication_id_to_handle_map_.end();
1073  ++current, ++index) {
1074  publication_handles[index] = current->second;
1075  }
1076 
1077  return DDS::RETCODE_OK;
1078 }
const ReturnCode_t RETCODE_OK
RepoIdToHandleMap publication_id_to_handle_map_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
ACE_TEXT("TCP_Factory")
ACE_Recursive_Thread_Mutex publication_handle_lock_
#define ACE_ERROR_RETURN(X, Y)
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
const ReturnCode_t RETCODE_NOT_ENABLED

◆ get_n_chunks()

size_t OpenDDS::DCPS::DataReaderImpl::get_n_chunks ( ) const
inline

Definition at line 414 of file DataReaderImpl.h.

References OpenDDS::DCPS::OPENDDS_VECTOR().

415  {
416  return n_chunks_;
417  }

◆ get_next_handle()

DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::get_next_handle ( const DDS::BuiltinTopicKey_t key)
protected

Get an instance handle for a new instance.

Definition at line 2442 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::bit_key_to_guid(), and DDS::HANDLE_NIL.

2443 {
2444  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
2445  if (!participant)
2446  return DDS::HANDLE_NIL;
2447 
2448  if (is_bit()) {
2449  const GUID_t id = bit_key_to_guid(key);
2450  return participant->assign_handle(id);
2451 
2452  } else {
2453  return participant->assign_handle();
2454  }
2455 }
const InstanceHandle_t HANDLE_NIL
OpenDDS_Dcps_Export GUID_t bit_key_to_guid(const DDS::BuiltinTopicKey_t &key)
Definition: GuidUtils.h:251
WeakRcHandle< DomainParticipantImpl > participant_servant_

◆ get_ordered_data()

void OpenDDS::DCPS::DataReaderImpl::get_ordered_data ( GroupRakeData data,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)

Definition at line 3084 of file DataReaderImpl.cpp.

References ACE_GUARD, OpenDDS::DCPS::ReceivedDataElementList::get_next_match(), OpenDDS::DCPS::GroupRakeData::insert_sample(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::InstanceState::match(), and OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_.

3088 {
3089  SubscriptionInstanceSet localsubs;
3090  {
3092  for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
3093  iter != instances_.end(); ++iter) {
3094  localsubs.insert(iter->second);
3095  }
3096  }
3097 
3099 
3100  for (SubscriptionInstanceSet::iterator iter = localsubs.begin(); iter != localsubs.end(); ++iter) {
3101  const SubscriptionInstance_rch inst = *iter;
3102  if (inst->instance_state_->match(view_states, instance_states)) {
3103  size_t i(0);
3104  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
3105  item; item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
3106  data.insert_sample(item, &inst->rcvd_samples_, *iter, ++i);
3107  group_coherent_ordered_data_.insert_sample(item, &inst->rcvd_samples_, *iter, ++i);
3108  }
3109  }
3110  }
3111 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
bool insert_sample(ReceivedDataElement *sample, ReceivedDataElementList *rdel, SubscriptionInstance_rch i, size_t index_in_instance)
RcHandle< SubscriptionInstance > SubscriptionInstance_rch
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
ACE_Recursive_Thread_Mutex instances_lock_
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
GroupRakeData group_coherent_ordered_data_
Ordered group samples.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66

◆ get_priority_value()

Priority OpenDDS::DCPS::DataReaderImpl::get_priority_value ( const AssociationData data) const
inlineprivatevirtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 849 of file DataReaderImpl.h.

References OpenDDS::DCPS::OPENDDS_MAP(), and OpenDDS::DCPS::AssociationData::publication_transport_priority_.

849  {
850  return data.publication_transport_priority_;
851  }

◆ get_qos()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_qos ( DDS::DataReaderQos qos)
virtual

Definition at line 902 of file DataReaderImpl.cpp.

References passed_qos_, and DDS::RETCODE_OK.

Referenced by OpenDDS::DCPS::StaticDiscovery::pre_reader().

904 {
905  qos = passed_qos_;
906  return DDS::RETCODE_OK;
907 }
const ReturnCode_t RETCODE_OK
DDS::DataReaderQos passed_qos_

◆ get_reactor()

ACE_Reactor_Timer_Interface * OpenDDS::DCPS::DataReaderImpl::get_reactor ( void  )

Definition at line 2804 of file DataReaderImpl.cpp.

2805 {
2806  return this->reactor_;
2807 }
ACE_Reactor_Timer_Interface * reactor_

◆ get_requested_deadline_missed_status()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_requested_deadline_missed_status ( DDS::RequestedDeadlineMissedStatus status)
virtual

Definition at line 979 of file DataReaderImpl.cpp.

References last_deadline_missed_total_count_, DDS::REQUESTED_DEADLINE_MISSED_STATUS, requested_deadline_missed_status_, DDS::RETCODE_OK, sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::RequestedDeadlineMissedStatus::total_count, and DDS::RequestedDeadlineMissedStatus::total_count_change.

981 {
983 
985  false);
986 
990 
991  // DDS::RequestedDeadlineMissedStatus::last_instance_handle field
992  // is updated by the RequestedDeadlineWatchdog.
993 
994  // Update for next status check.
997 
999 
1000  return DDS::RETCODE_OK;
1001 }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_
CORBA::Long last_deadline_missed_total_count_
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
const StatusKind REQUESTED_DEADLINE_MISSED_STATUS

◆ get_requested_incompatible_qos_status()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_requested_incompatible_qos_status ( DDS::RequestedIncompatibleQosStatus status)
virtual

◆ get_sample_lost_status()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_sample_lost_status ( DDS::SampleLostStatus status)
virtual

Definition at line 1031 of file DataReaderImpl.cpp.

References DDS::RETCODE_OK, sample_lock_, DDS::SAMPLE_LOST_STATUS, sample_lost_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::SampleLostStatus::total_count_change.

1033 {
1035 
1037  status = sample_lost_status_;
1039  return DDS::RETCODE_OK;
1040 }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
DDS::SampleLostStatus sample_lost_status_
const StatusKind SAMPLE_LOST_STATUS

◆ get_sample_rejected_status()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_sample_rejected_status ( DDS::SampleRejectedStatus status)
virtual

Definition at line 951 of file DataReaderImpl.cpp.

References DDS::RETCODE_OK, sample_lock_, DDS::SAMPLE_REJECTED_STATUS, sample_rejected_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::SampleRejectedStatus::total_count_change.

953 {
955 
957  status = sample_rejected_status_;
959  return DDS::RETCODE_OK;
960 }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
DDS::SampleRejectedStatus sample_rejected_status_
const StatusKind SAMPLE_REJECTED_STATUS

◆ get_subscriber()

DDS::Subscriber_ptr OpenDDS::DCPS::DataReaderImpl::get_subscriber ( )
virtual

Implements DDS::DataReader.

Definition at line 945 of file DataReaderImpl.cpp.

References get_subscriber_servant().

Referenced by OpenDDS::DCPS::StaticDiscovery::pre_reader(), and OpenDDS::DCPS::DRMonitorImpl::report().

946 {
947  return get_subscriber_servant()._retn();
948 }
RcHandle< SubscriberImpl > get_subscriber_servant()

◆ get_subscriber_servant()

RcHandle< SubscriberImpl > OpenDDS::DCPS::DataReaderImpl::get_subscriber_servant ( )
protected

Definition at line 1712 of file DataReaderImpl.cpp.

References subscriber_servant_.

Referenced by data_received(), enable(), get_subscriber(), listener_for(), and set_qos().

1713 {
1714  return subscriber_servant_.lock();
1715 }
WeakRcHandle< SubscriberImpl > subscriber_servant_

◆ get_subscription_matched_status()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::get_subscription_matched_status ( DDS::SubscriptionMatchedStatus status)
virtual

Definition at line 1017 of file DataReaderImpl.cpp.

References DDS::SubscriptionMatchedStatus::current_count_change, publication_handle_lock_, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, and DDS::SubscriptionMatchedStatus::total_count_change.

1019 {
1021 
1023  status = subscription_match_status_;
1026 
1027  return DDS::RETCODE_OK;
1028 }
const ReturnCode_t RETCODE_OK
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
const StatusKind SUBSCRIPTION_MATCHED_STATUS
ACE_Recursive_Thread_Mutex publication_handle_lock_
DDS::SubscriptionMatchedStatus subscription_match_status_

◆ get_topic_id()

OpenDDS::DCPS::GUID_t OpenDDS::DCPS::DataReaderImpl::get_topic_id ( )

Definition at line 2810 of file DataReaderImpl.cpp.

Referenced by OpenDDS::DCPS::DRMonitorImpl::report().

2811 {
2812  return topic_id_;
2813 }

◆ get_topicdescription()

DDS::TopicDescription_ptr OpenDDS::DCPS::DataReaderImpl::get_topicdescription ( )
virtual

Implements DDS::DataReader.

Definition at line 932 of file DataReaderImpl.cpp.

References content_filtered_topic_, content_filtered_topic_mutex_, and topic_desc_.

Referenced by OpenDDS::DCPS::QueryConditionImpl::get_type_support(), and OpenDDS::DCPS::MultiTopicDataReader_T< Sample, TypedDataReader >::join().

933 {
934 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
935  {
938  return DDS::TopicDescription::_duplicate(content_filtered_topic_.get());
939  }
940  }
941 #endif
942  return DDS::TopicDescription::_duplicate(topic_desc_.in());
943 }
TopicDescriptionPtr< ContentFilteredTopicImpl > content_filtered_topic_
ACE_Thread_Mutex content_filtered_topic_mutex_
DDS::TopicDescription_var topic_desc_

◆ get_value_dispatcher()

const ValueDispatcher* OpenDDS::DCPS::DataReaderImpl::get_value_dispatcher ( ) const
inline

Definition at line 640 of file DataReaderImpl.h.

Referenced by data_received().

641  {
642  TopicDescriptionPtr<TopicImpl> temp(topic_servant_);
643  return temp ? dynamic_cast<const ValueDispatcher*>(temp->get_type_support()) : 0;
644  }
TopicDescriptionPtr< TopicImpl > topic_servant_

◆ get_writer_states()

void OpenDDS::DCPS::DataReaderImpl::get_writer_states ( WriterStatePairVec &  writer_states)

Definition at line 2834 of file DataReaderImpl.cpp.

References ACE_READ_GUARD.

Referenced by OpenDDS::DCPS::DRMonitorImpl::report().

2835 {
2837  read_guard,
2838  this->writers_lock_);
2839  for (WriterMapType::iterator iter = writers_.begin();
2840  iter != writers_.end();
2841  ++iter) {
2842  writer_states.push_back(WriterStatePair(iter->first,
2843  iter->second->state()));
2844  }
2845 }
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
std::pair< GUID_t, WriterInfo::WriterState > WriterStatePair

◆ has_readcondition()

bool OpenDDS::DCPS::DataReaderImpl::has_readcondition ( DDS::ReadCondition_ptr  a_condition)
protected

Definition at line 799 of file DataReaderImpl.cpp.

References read_conditions_.

800 {
801  //sample lock already held
802  DDS::ReadCondition_var rc = DDS::ReadCondition::_duplicate(a_condition);
803  return read_conditions_.find(rc) != read_conditions_.end();
804 }

◆ has_zero_copies()

bool OpenDDS::DCPS::DataReaderImpl::has_zero_copies ( )

This method is used for a precondition check of delete_datareader.

Return values
trueWe have zero-copy samples loaned out
falseWe have no zero-copy samples loaned out

Definition at line 2726 of file DataReaderImpl.cpp.

References ACE_GUARD_RETURN, OpenDDS::DCPS::ReceivedDataElementList::has_zero_copies(), and OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_.

2727 {
2729  guard,
2730  this->sample_lock_,
2731  true /* assume we have loans */);
2732  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_, true);
2733 
2734  for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
2735  iter != instances_.end();
2736  ++iter) {
2737  SubscriptionInstance_rch ptr = iter->second;
2738 
2739  if (ptr->rcvd_samples_.has_zero_copies()) {
2740  return true;
2741  }
2742  }
2743 
2744  return false;
2745 }
RcHandle< SubscriptionInstance > SubscriptionInstance_rch
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
ACE_Recursive_Thread_Mutex instances_lock_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.

◆ have_instance_states()

bool OpenDDS::DCPS::DataReaderImpl::have_instance_states ( DDS::InstanceStateMask  instance_states) const

!!caller should have acquired sample_lock_

Definition at line 1733 of file DataReaderImpl.cpp.

References DDS::ANY_SAMPLE_STATE, DDS::ANY_VIEW_STATE, instances_lock_, and lookup_matching_instances().

1735 {
1736  //!!!caller should have acquired sample_lock_
1739 }
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
ACE_Recursive_Thread_Mutex instances_lock_
const SampleStateMask ANY_SAMPLE_STATE
const ViewStateMask ANY_VIEW_STATE
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66

◆ have_sample_states()

bool OpenDDS::DCPS::DataReaderImpl::have_sample_states ( DDS::SampleStateMask  sample_states) const

!!caller should have acquired sample_lock_

Definition at line 1717 of file DataReaderImpl.cpp.

References DDS::ANY_INSTANCE_STATE, DDS::ANY_VIEW_STATE, instances_lock_, and lookup_matching_instances().

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::have_sample_states().

1719 {
1720  //!!!caller should have acquired sample_lock_
1723 }
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
ACE_Recursive_Thread_Mutex instances_lock_
const InstanceStateMask ANY_INSTANCE_STATE
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
const ViewStateMask ANY_VIEW_STATE

◆ have_view_states()

bool OpenDDS::DCPS::DataReaderImpl::have_view_states ( DDS::ViewStateMask  view_states) const

!!caller should have acquired sample_lock_

Definition at line 1726 of file DataReaderImpl.cpp.

References DDS::ANY_INSTANCE_STATE, DDS::ANY_SAMPLE_STATE, instances_lock_, and lookup_matching_instances().

1727 {
1728  //!!!caller should have acquired sample_lock_
1731 }
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
ACE_Recursive_Thread_Mutex instances_lock_
const InstanceStateMask ANY_INSTANCE_STATE
const SampleStateMask ANY_SAMPLE_STATE
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66

◆ init()

void OpenDDS::DCPS::DataReaderImpl::init ( TopicDescriptionImpl a_topic_desc,
const DDS::DataReaderQos qos,
DDS::DataReaderListener_ptr  a_listener,
const DDS::StatusMask mask,
DomainParticipantImpl participant,
SubscriberImpl subscriber 
)

Definition at line 184 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), domain_id_, DDS::EXCLUSIVE_OWNERSHIP_QOS, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::SubscriberImpl::get_qos(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_name(), is_bit_, is_exclusive_ownership_, DDS::OwnershipQosPolicy::kind, LM_WARNING, DDS::DataReaderQos::ownership, participant_servant_, passed_qos_, qos_, OpenDDS::DCPS::rchandle_from(), DDS::RETCODE_OK, set_listener(), subscriber_servant_, topic_desc_, topic_id_, topic_servant_, OpenDDS::DCPS::topicIsBIT(), and type_support_.

Referenced by OpenDDS::RTPS::RtpsDiscovery::create_bit_dr(), OpenDDS::DCPS::StaticDiscovery::create_bit_dr(), OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().

191 {
192  topic_desc_ = DDS::TopicDescription::_duplicate(topic_desc);
193  if (TopicImpl* topic = dynamic_cast<TopicImpl*>(topic_desc)) {
194  topic_servant_ = topic;
195  type_support_ = dynamic_cast<TypeSupportImpl*>(topic->get_type_support());
196  topic_id_ = topic->get_id();
197  }
198 
199 #ifndef DDS_HAS_MINIMUM_BIT
200  CORBA::String_var topic_name = topic_desc->get_name();
201  CORBA::String_var topic_type_name = topic_desc->get_type_name();
202  is_bit_ = topicIsBIT(topic_name, topic_type_name);
203 #endif // !defined (DDS_HAS_MINIMUM_BIT)
204 
205  qos_ = qos;
206  passed_qos_ = qos;
207 
208 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
210 #endif
211 
212  set_listener(listener, mask);
213 
214  // Only store the participant pointer, since it is our "grand"
215  // parent, we will exist as long as it does
216  participant_servant_ = *participant;
217 
218  domain_id_ = participant->get_domain_id();
219 
220  subscriber_servant_ = rchandle_from(subscriber);
221 
222  if (subscriber->get_qos(this->subqos_) != ::DDS::RETCODE_OK) {
223  ACE_DEBUG((LM_WARNING,
224  ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::init() - ")
225  ACE_TEXT("failed to get SubscriberQos\n")));
226  }
227 }
#define ACE_DEBUG(X)
const ReturnCode_t RETCODE_OK
DDS::DataReaderQos passed_qos_
virtual DDS::ReturnCode_t set_listener(DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
OwnershipQosPolicy ownership
TopicDescriptionPtr< TopicImpl > topic_servant_
bool topicIsBIT(const char *name, const char *type)
TypeSupportImpl * type_support_
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DDS::TopicDescription_var topic_desc_
WeakRcHandle< DomainParticipantImpl > participant_servant_
WeakRcHandle< SubscriberImpl > subscriber_servant_
OwnershipQosPolicyKind kind

◆ initialize_lookup_maps()

void OpenDDS::DCPS::DataReaderImpl::initialize_lookup_maps ( )
protected

Definition at line 3533 of file DataReaderImpl.cpp.

3534 {
3535  // These all start at 1 (0 mask is bogus) and include the full mask (any)
3536  for (CORBA::ULong is = 1; is <= MAX_SAMPLE_STATE_MASK; ++is) {
3537  for (CORBA::ULong iv = 1; iv <= MAX_VIEW_STATE_MASK; ++iv) {
3538  for (CORBA::ULong ii = 1; ii <= MAX_INSTANCE_STATE_MASK; ++ii) {
3539  combined_state_lookup_[to_combined_states(is, iv, ii)] = HandleSet();
3540  }
3541  }
3542  }
3543  // catch-all for "bogus" lookups
3544  combined_state_lookup_[0] = HandleSet();
3545 }
static CORBA::ULong to_combined_states(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states)
ACE_CDR::ULong ULong
static const CORBA::ULong MAX_VIEW_STATE_MASK
static const CORBA::ULong MAX_INSTANCE_STATE_MASK
static const CORBA::ULong MAX_SAMPLE_STATE_MASK

◆ install_type_support()

virtual void OpenDDS::DCPS::DataReaderImpl::install_type_support ( TypeSupportImpl )
inlineprivatevirtual

Reimplemented in OpenDDS::XTypes::DynamicDataReaderImpl.

Definition at line 815 of file DataReaderImpl.h.

References timestamp().

Referenced by enable().

815 {}

◆ instances_liveliness_update()

void OpenDDS::DCPS::DataReaderImpl::instances_liveliness_update ( const GUID_t writer,
DDS::InstanceHandle_t  publication_handle 
)
private

Definition at line 2235 of file DataReaderImpl.cpp.

References ACE_GUARD, DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE, and OpenDDS::DCPS::TimePoint_T< SystemClock >::now().

2237 {
2238  // sample_lock_ must be held.
2239  InstanceSet localinsts;
2240  {
2242  if (instances_.size() == 0) {
2243  return;
2244  }
2245  for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
2246  iter != instances_.end(); ++iter) {
2247  if (iter->second->instance_state_->writes_instance(writer)) {
2248  localinsts.insert(iter->first);
2249  }
2250  }
2251  }
2252 
2253  for (InstanceSet::iterator iter = localinsts.begin(); iter != localinsts.end(); ++iter) {
2255  }
2256 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
ACE_Recursive_Thread_Mutex instances_lock_
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
virtual void set_instance_state_i(DDS::InstanceHandle_t instance, DDS::InstanceHandle_t publication_handle, DDS::InstanceStateKind state, const SystemTimePoint &timestamp, const GUID_t &guid)=0
const InstanceStateKind NOT_ALIVE_NO_WRITERS_INSTANCE_STATE

◆ is_bit()

bool OpenDDS::DCPS::DataReaderImpl::is_bit ( ) const

Definition at line 2720 of file DataReaderImpl.cpp.

2721 {
2722  return this->is_bit_;
2723 }

◆ listener_for()

DDS::DataReaderListener_ptr OpenDDS::DCPS::DataReaderImpl::listener_for ( DDS::StatusKind  kind)

This is used to retrieve the listener for a certain status change. If this datareader has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for the listener is propagated up to the factory/subscriber.

Definition at line 1753 of file DataReaderImpl.cpp.

References get_subscriber_servant(), CORBA::is_nil(), listener_, listener_mask_, listener_mutex_, and ACE_Guard< ACE_LOCK >::release().

Referenced by remove_associations_i(), transport_assoc_done(), and update_incompatible_qos().

1754 {
1755  // per 2.1.4.3.1 Listener Access to Plain Communication Status
1756  // use this entities factory if listener is mask not enabled
1757  // for this kind.
1758  RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
1760  if (subscriber && (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0)) {
1761  g.release();
1762  return subscriber->listener_for(kind);
1763 
1764  } else {
1765  return DDS::DataReaderListener::_duplicate(listener_.in());
1766  }
1767 }
ACE_Thread_Mutex listener_mutex_
DDS::DataReaderListener_var listener_
RcHandle< SubscriberImpl > get_subscriber_servant()
Boolean is_nil(T x)

◆ liveliness_lost()

void OpenDDS::DCPS::DataReaderImpl::liveliness_lost ( )

◆ lookup_instance()

virtual void OpenDDS::DCPS::DataReaderImpl::lookup_instance ( const ReceivedDataSample sample,
SubscriptionInstance_rch instance 
)
pure virtual

◆ lookup_instance_generic()

virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl::lookup_instance_generic ( const void *  data)
pure virtual

◆ lookup_instance_handles()

void OpenDDS::DCPS::DataReaderImpl::lookup_instance_handles ( const WriterIdSeq ids,
DDS::InstanceHandleSeq hdls 
)
private

Lookup the instance handles by the publication repo ids.

Definition at line 2551 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::LogGuid::conv_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and OPENDDS_STRING.

Referenced by remove_associations_i().

2553 {
2554  CORBA::ULong const num_wrts = ids.length();
2555 
2556  if (DCPS_debug_level > 9) {
2557  const char* separator = "";
2558  OPENDDS_STRING guids;
2559 
2560  for (CORBA::ULong i = 0; i < num_wrts; ++i) {
2561  guids += separator;
2562  guids += LogGuid(ids[i]).conv_;
2563  separator = ", ";
2564  }
2565 
2566  ACE_DEBUG((LM_DEBUG,
2567  ACE_TEXT("(%P|%t) DataReaderImpl::lookup_instance_handles: ")
2568  ACE_TEXT("searching for handles for writer Ids: %C.\n"),
2569  guids.c_str()));
2570  }
2571 
2572  hdls.length(num_wrts);
2573 
2574  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
2575  if (participant) {
2576  for (CORBA::ULong i = 0; i < num_wrts; ++i) {
2577  hdls[i] = participant->lookup_handle(ids[i]);
2578  }
2579  }
2580 }
#define ACE_DEBUG(X)
#define OPENDDS_STRING
ACE_CDR::ULong ULong
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
WeakRcHandle< DomainParticipantImpl > participant_servant_

◆ lookup_matching_instances()

const DataReaderImpl::HandleSet & OpenDDS::DCPS::DataReaderImpl::lookup_matching_instances ( CORBA::ULong  sample_states,
CORBA::ULong  view_states,
CORBA::ULong  instance_states 
) const
protected

Definition at line 3569 of file DataReaderImpl.cpp.

References OPENDDS_ASSERT.

Referenced by contains_sample(), have_instance_states(), have_sample_states(), and have_view_states().

3570 {
3572  LookupMap::const_iterator ci = combined_state_lookup_.find(combined_states);
3574  return ci->second;
3575 }
static CORBA::ULong to_combined_states(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states)
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
ACE_CDR::ULong ULong
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66

◆ notify_latency()

void OpenDDS::DCPS::DataReaderImpl::notify_latency ( GUID_t  writer)

Definition at line 2337 of file DataReaderImpl.cpp.

References CORBA::is_nil().

2338 {
2339  // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2340  // is given to this DataReader then narrow() fails.
2341  DataReaderListener_var listener = get_ext_listener();
2342 
2343  if (!CORBA::is_nil(listener.in())) {
2344  WriterIdSeq writerIds;
2345  writerIds.length(1);
2346  writerIds[ 0] = writer;
2347 
2348  DDS::InstanceHandleSeq handles;
2349  this->lookup_instance_handles(writerIds, handles);
2350 
2351  if (handles.length() >= 1) {
2352  this->budget_exceeded_status_.last_instance_handle = handles[ 0];
2353 
2354  } else {
2356  }
2357 
2360 
2361  listener->on_budget_exceeded(this, this->budget_exceeded_status_);
2362 
2364  }
2365 }
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
BudgetExceededStatus budget_exceeded_status_
DataReaderListener_ptr get_ext_listener()
sequence< GUID_t > WriterIdSeq
void lookup_instance_handles(const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the publication repo ids.
Boolean is_nil(T x)

◆ notify_liveliness_change()

void OpenDDS::DCPS::DataReaderImpl::notify_liveliness_change ( )

Definition at line 2747 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), DDS::LivelinessChangedStatus::alive_count_change, OpenDDS::DCPS::LogGuid::conv_, OpenDDS::DCPS::DCPS_debug_level, CORBA::is_nil(), DDS::LIVELINESS_CHANGED_STATUS, LM_DEBUG, OPENDDS_STRING, and OpenDDS::DCPS::to_dds_string().

2748 {
2749  // sample_lock_ must be held.
2750  // N.B. writers_lock_ should already be acquired when
2751  // this method is called.
2752 
2753  DDS::DataReaderListener_var listener
2755 
2756  if (!CORBA::is_nil(listener.in())) {
2761  listener->on_liveliness_changed(this, status);
2762  }
2764 
2765  if (DCPS_debug_level > 9) {
2767  OPENDDS_STRING output_str;
2768  output_str += "subscription ";
2769  output_str += LogGuid(get_guid()).conv_;
2770  output_str += ", listener at: 0x";
2771  output_str += to_dds_string(this->listener_.in());
2772 
2773  for (WriterMapType::iterator current = this->writers_.begin();
2774  current != this->writers_.end();
2775  ++current) {
2776  const GUID_t id = current->first;
2777  output_str += "\n\tNOTIFY: writer[ ";
2778  output_str += LogGuid(id).conv_;
2779  output_str += "] == ";
2780  output_str += current->second->get_state_str();
2781  }
2782 
2783  ACE_DEBUG((LM_DEBUG,
2784  ACE_TEXT("(%P|%t) DataReaderImpl::notify_liveliness_change: ")
2785  ACE_TEXT("listener at 0x%x, mask 0x%x.\n")
2786  ACE_TEXT("\tNOTIFY: %C\n"),
2787  listener.in(),
2789  output_str.c_str()));
2790  }
2791 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Thread_Mutex listener_mutex_
DDS::LivelinessChangedStatus liveliness_changed_status_
#define OPENDDS_STRING
DDS::DataReaderListener_var listener_
DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
ACE_Reverse_Lock< ACE_Recursive_Thread_Mutex > Reverse_Lock_t
const StatusKind LIVELINESS_CHANGED_STATUS
String to_dds_string(unsigned short to_convert)
Boolean is_nil(T x)

◆ notify_read_conditions()

void OpenDDS::DCPS::DataReaderImpl::notify_read_conditions ( )
protected

Data has arrived into the cache, unblock waiting ReadConditions.

Definition at line 1692 of file DataReaderImpl.cpp.

References ACE_ERROR, ACE_GUARD, ACE_TEXT(), LM_ERROR, read_conditions_, reverse_sample_lock_, and OpenDDS::DCPS::ConditionImpl::signal_all().

Referenced by data_received().

1693 {
1694  //sample lock is already held
1695  ReadConditionSet local_read_conditions = read_conditions_;
1697 
1698  for (ReadConditionSet::iterator it = local_read_conditions.begin(),
1699  end = local_read_conditions.end(); it != end; ++it) {
1700  ConditionImpl* ci = dynamic_cast<ConditionImpl*>(it->in());
1701  if (ci) {
1702  ci->signal_all();
1703  } else {
1704  ACE_ERROR((LM_ERROR,
1705  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::notify_read_conditions: ")
1706  ACE_TEXT("Failed to obtain ConditionImpl - can't notify.\n")));
1707  }
1708  }
1709 }
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_TEXT("TCP_Factory")
ACE_Reverse_Lock< ACE_Recursive_Thread_Mutex > Reverse_Lock_t

◆ notify_subscription_disconnected()

void OpenDDS::DCPS::DataReaderImpl::notify_subscription_disconnected ( const WriterIdSeq pubids)
virtual

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 2466 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, CORBA::is_nil(), and OpenDDS::DCPS::SubscriptionLostStatus::publication_handles.

2467 {
2468  DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_disconnected",6);
2469 
2470  // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2471  // is given to this DataReader then narrow() fails.
2472  DataReaderListener_var the_listener = get_ext_listener();
2473 
2474  if (!CORBA::is_nil(the_listener.in())) {
2475  SubscriptionLostStatus status;
2476 
2477  // Since this callback may come after remove_association which removes
2478  // the writer from id_to_handle map, we can ignore this error.
2479  this->lookup_instance_handles(pubids, status.publication_handles);
2480  the_listener->on_subscription_disconnected(this, status);
2481  }
2482 }
DataReaderListener_ptr get_ext_listener()
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
void lookup_instance_handles(const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the publication repo ids.
Boolean is_nil(T x)

◆ notify_subscription_lost() [1/2]

void OpenDDS::DCPS::DataReaderImpl::notify_subscription_lost ( const WriterIdSeq pubids)
virtual

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 2531 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, CORBA::is_nil(), and OpenDDS::DCPS::SubscriptionLostStatus::publication_handles.

Referenced by remove_associations_i().

2532 {
2533  DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
2534 
2535  // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2536  // is given to this DataReader then narrow() fails.
2537  DataReaderListener_var the_listener = get_ext_listener();
2538 
2539  if (!CORBA::is_nil(the_listener.in())) {
2540  SubscriptionLostStatus status;
2541 
2542  // Since this callback may come after remove_association which removes
2543  // the writer from id_to_handle map, we can ignore this error.
2544  this->lookup_instance_handles(pubids, status.publication_handles);
2545  the_listener->on_subscription_lost(this, status);
2546  }
2547 }
DataReaderListener_ptr get_ext_listener()
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
void lookup_instance_handles(const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the publication repo ids.
Boolean is_nil(T x)

◆ notify_subscription_lost() [2/2]

void OpenDDS::DCPS::DataReaderImpl::notify_subscription_lost ( const DDS::InstanceHandleSeq handles)
private

Definition at line 2506 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, CORBA::is_nil(), and OpenDDS::DCPS::SubscriptionLostStatus::publication_handles.

2507 {
2508  DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_lost",6);
2509 
2510  if (!this->is_bit_) {
2511  // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2512  // is given to this DataReader then narrow() fails.
2513  DataReaderListener_var the_listener = get_ext_listener();
2514 
2515  if (!CORBA::is_nil(the_listener.in())) {
2516  SubscriptionLostStatus status;
2517 
2518  CORBA::ULong len = handles.length();
2519  status.publication_handles.length(len);
2520 
2521  for (CORBA::ULong i = 0; i < len; ++ i) {
2522  status.publication_handles[i] = handles[i];
2523  }
2524 
2525  the_listener->on_subscription_lost(this, status);
2526  }
2527  }
2528 }
ACE_CDR::ULong ULong
DataReaderListener_ptr get_ext_listener()
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
Boolean is_nil(T x)

◆ notify_subscription_reconnected()

void OpenDDS::DCPS::DataReaderImpl::notify_subscription_reconnected ( const WriterIdSeq pubids)
virtual

Implements OpenDDS::DCPS::TransportReceiveListener.

Definition at line 2485 of file DataReaderImpl.cpp.

References DBG_ENTRY_LVL, CORBA::is_nil(), and OpenDDS::DCPS::SubscriptionLostStatus::publication_handles.

2486 {
2487  DBG_ENTRY_LVL("DataReaderImpl","notify_subscription_reconnected",6);
2488 
2489  if (!this->is_bit_) {
2490  // Narrow to DDS::DCPS::DataReaderListener. If a DDS::DataReaderListener
2491  // is given to this DataReader then narrow() fails.
2492  DataReaderListener_var the_listener = get_ext_listener();
2493 
2494  if (!CORBA::is_nil(the_listener.in())) {
2495  SubscriptionLostStatus status;
2496 
2497  // If it's reconnected then the reader should be in id_to_handle
2498  this->lookup_instance_handles(pubids, status.publication_handles);
2499 
2500  the_listener->on_subscription_reconnected(this, status);
2501  }
2502  }
2503 }
DataReaderListener_ptr get_ext_listener()
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
void lookup_instance_handles(const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the publication repo ids.
Boolean is_nil(T x)

◆ OPENDDS_MAP() [1/2]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP ( DDS::InstanceHandle_t  ,
SubscriptionInstance_rch   
)

◆ OPENDDS_MAP() [2/2]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP ( CORBA::ULong  ,
HandleSet   
)
protected

◆ OPENDDS_MAP_CMP() [1/3]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP ( GUID_t  ,
WriterStats  ,
GUID_tKeyLessThan   
)

Type of collection of statistics for writers to this reader.

◆ OPENDDS_MAP_CMP() [2/3]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP ( GUID_t  ,
DDS::InstanceHandle_t  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [3/3]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MAP_CMP ( GUID_t  ,
WriterInfo_rch  ,
GUID_tKeyLessThan   
)
private

publications writing to this reader.

◆ OPENDDS_MULTIMAP()

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_MULTIMAP ( MonotonicTimePoint  ,
SubscriptionInstance_rch   
)
private

◆ OPENDDS_SET() [1/4]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_SET ( DDS::InstanceHandle_t  )

◆ OPENDDS_SET() [2/4]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_SET ( SubscriptionInstance_rch  )

◆ OPENDDS_SET() [3/4]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_SET ( DDS::InstanceHandle_t  )
protected

◆ OPENDDS_SET() [4/4]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_SET ( Encoding::Kind  )
protected

◆ OPENDDS_SET_CMP()

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_SET_CMP ( DDS::ReadCondition_var  ,
RCCompLess   
)
private

◆ OPENDDS_VECTOR() [1/3]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR ( DDS::InstanceHandle_t  )

Referenced by signal_liveliness().

◆ OPENDDS_VECTOR() [2/3]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR ( WriterStatePair  )

◆ OPENDDS_VECTOR() [3/3]

typedef OpenDDS::DCPS::DataReaderImpl::OPENDDS_VECTOR ( void *  )

◆ ownership_filter_instance()

bool OpenDDS::DCPS::DataReaderImpl::ownership_filter_instance ( const SubscriptionInstance_rch instance,
const GUID_t pubid 
)
protected

Definition at line 2630 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), ACE_WRITE_GUARD_RETURN, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::InstanceState::get_owner(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::SubscriptionInstance::instance_handle_, OpenDDS::DCPS::SubscriptionInstance::instance_state_, LM_DEBUG, and OpenDDS::DCPS::OwnershipManager::select_owner().

2632 {
2633 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2634  if (this->is_exclusive_ownership_) {
2635 
2637  WriterMapType::iterator iter = writers_.find(pubid);
2638 
2639  if (iter == writers_.end()) {
2640  if (DCPS_debug_level > 4) {
2641  // This may not be an error since it could happen that the sample
2642  // is delivered to the datareader after the write is dis-associated
2643  // with this datareader.
2644  ACE_DEBUG((LM_DEBUG,
2645  ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
2646  ACE_TEXT("reader %C is not associated with writer %C.\n"),
2647  LogGuid(get_guid()).c_str(),
2648  LogGuid(pubid).c_str()));
2649  }
2650  return true;
2651  }
2652 
2653 
2654  // Evaulate the owner of the instance if not selected and filter
2655  // current message if it's not from owner writer.
2656  if ( instance->instance_state_->get_owner() == GUID_UNKNOWN
2657  || ! iter->second->is_owner_evaluated(instance->instance_handle_)) {
2658  OwnershipManagerPtr owner_manager = this->ownership_manager();
2659 
2660  bool is_owner = owner_manager && owner_manager->select_owner (
2661  instance->instance_handle_,
2662  iter->second->writer_id(),
2663  iter->second->writer_qos_ownership_strength(),
2664  instance->instance_state_);
2665  iter->second->set_owner_evaluated(instance->instance_handle_, true);
2666 
2667  if (! is_owner) {
2668  if (DCPS_debug_level >= 1) {
2669  ACE_DEBUG((LM_DEBUG,
2670  ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
2671  ACE_TEXT("reader %C writer %C is not elected as owner %C\n"),
2672  LogGuid(get_guid()).c_str(),
2673  LogGuid(pubid).c_str(),
2674  LogGuid(instance->instance_state_->get_owner()).c_str()));
2675  }
2676  return true;
2677  }
2678  }
2679  else if (! (instance->instance_state_->get_owner() == pubid)) {
2680  if (DCPS_debug_level >= 1) {
2681  ACE_DEBUG((LM_DEBUG,
2682  ACE_TEXT("(%P|%t) DataReaderImpl::ownership_filter_instance: ")
2683  ACE_TEXT("reader %C writer %C is not owner %C\n"),
2684  LogGuid(get_guid()).c_str(),
2685  LogGuid(pubid).c_str(),
2686  LogGuid(instance->instance_state_->get_owner()).c_str()));
2687  }
2688  return true;
2689  }
2690  }
2691 #else
2692  ACE_UNUSED_ARG(pubid);
2693  ACE_UNUSED_ARG(instance);
2694 #endif
2695  return false;
2696 }
OwnershipManagerPtr ownership_manager()
#define ACE_DEBUG(X)
#define ACE_WRITE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")

◆ ownership_manager()

OwnershipManagerPtr OpenDDS::DCPS::DataReaderImpl::ownership_manager ( )
inline

Definition at line 511 of file DataReaderImpl.h.

References lookup_instance(), and OpenDDS::DCPS::OPENDDS_VECTOR().

Referenced by cleanup(), data_received(), and release_instance().

511 { return OwnershipManagerPtr(this); }

◆ parent()

RcHandle< EntityImpl > OpenDDS::DCPS::DataReaderImpl::parent ( void  ) const
virtual

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 1678 of file DataReaderImpl.cpp.

References subscriber_servant_.

1679 {
1680  return subscriber_servant_.lock();
1681 }
WeakRcHandle< SubscriberImpl > subscriber_servant_

◆ post_read_or_take()

void OpenDDS::DCPS::DataReaderImpl::post_read_or_take ( )
protected

Definition at line 2793 of file DataReaderImpl.cpp.

References DDS::DATA_AVAILABLE_STATUS, and DDS::DATA_ON_READERS_STATUS.

2794 {
2796  RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
2797  if (subscriber) {
2798  subscriber->set_status_changed_flag(
2800  }
2801 }
const StatusKind DATA_ON_READERS_STATUS
const StatusKind DATA_AVAILABLE_STATUS
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
RcHandle< SubscriberImpl > get_subscriber_servant()

◆ prepare_to_delete()

void OpenDDS::DCPS::DataReaderImpl::prepare_to_delete ( )
protected

Definition at line 2410 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::Observer::e_DELETED, and OpenDDS::DCPS::Observer::on_deleted().

2411 {
2412  const Observer_rch observer = get_observer(Observer::e_DELETED);
2413  if (observer) {
2414  observer->on_deleted(this);
2415  }
2416 
2417  this->set_deleted(true);
2418  this->stop_associating();
2419  this->send_final_acks();
2421 }
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
ConditionVariable< ACE_Thread_Mutex > subscription_id_condition_
bool notify_all()
Unblock all of the threads waiting on this condition.
RcHandle< Observer > Observer_rch
Definition: Observer.h:101
void set_deleted(bool state)
Definition: EntityImpl.cpp:83

◆ process_deadline()

void OpenDDS::DCPS::DataReaderImpl::process_deadline ( SubscriptionInstance_rch  instance,
const MonotonicTimePoint now,
bool  timer_called 
)
private

Definition at line 3611 of file DataReaderImpl.cpp.

References ACE_GUARD, OpenDDS::DCPS::SubscriptionInstance::cur_sample_tv_, OpenDDS::DCPS::SubscriptionInstance::deadline_, OpenDDS::DCPS::SubscriptionInstance::instance_handle_, OpenDDS::DCPS::SubscriptionInstance::instance_state_, OpenDDS::DCPS::InstanceState::is_exclusive(), CORBA::is_nil(), OpenDDS::DCPS::TimePoint_T< AceClock >::is_zero(), OpenDDS::DCPS::SubscriptionInstance::last_sample_tv_, OpenDDS::DCPS::OwnershipManager::remove_writers(), DDS::REQUESTED_DEADLINE_MISSED_STATUS, and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::zero_value.

3614 {
3615  // Should be called with sample_lock_.
3616 
3617  if (instance->deadline_ != MonotonicTimePoint::zero_value) {
3618  bool missed = false;
3619 
3620  if (instance->cur_sample_tv_.is_zero()) { // not received any sample.
3621  missed = true;
3622 
3623  } else if (timer_called) { // handle_timeout is called
3624  missed = (now - instance->cur_sample_tv_) >= deadline_period_;
3625 
3626  } else { // upon receiving sample.
3627  missed = (instance->cur_sample_tv_ - instance->last_sample_tv_) > deadline_period_;
3628  }
3629 
3630  if (missed) {
3632  // Only update the status upon timer is called and not
3633  // when receiving a sample after the interval.
3634  // Otherwise the counter is doubled.
3635  if (timer_called) {
3639  requested_deadline_missed_status_.last_instance_handle = instance->instance_handle_;
3640 
3642 
3643  DDS::DataReaderListener_var listener = listener_for(DDS::REQUESTED_DEADLINE_MISSED_STATUS);
3644 
3645 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
3646  if (instance->instance_state_->is_exclusive()) {
3648  if (owner_manager)
3649  owner_manager->remove_writers (instance->instance_handle_);
3650  }
3651 #endif
3652 
3653  if (!CORBA::is_nil(listener.in())) {
3654  // Copy before releasing the lock.
3656 
3657  // Release the lock during the upcall.
3659  // @todo Will this operation ever throw? If so we may want to
3660  // catch all exceptions, and act accordingly.
3661  listener->on_requested_deadline_missed(this, status);
3662 
3663  // We need to update the last total count value to our current total
3664  // so that the next time we will calculate the correct total_count_change;
3666  }
3667 
3669  }
3670  }
3671 
3672  // This next part is without status_lock_ held to avoid reactor deadlock.
3673  if (timer_called) {
3674  instance->deadline_ = MonotonicTimePoint::zero_value;
3675  schedule_deadline(instance, timer_called);
3676  } else {
3677  cancel_deadline(instance);
3678  schedule_deadline(instance, timer_called);
3679  }
3680  }
3681 }
OwnershipManagerPtr ownership_manager()
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
DDS::RequestedDeadlineMissedStatus requested_deadline_missed_status_
CORBA::Long last_deadline_missed_total_count_
DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind)
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
void cancel_deadline(SubscriptionInstance_rch instance)
void schedule_deadline(SubscriptionInstance_rch instance, bool timer_called)
ACE_Reverse_Lock< ACE_Recursive_Thread_Mutex > Reverse_Lock_t
const StatusKind REQUESTED_DEADLINE_MISSED_STATUS
Boolean is_nil(T x)
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40

◆ process_latency()

void OpenDDS::DCPS::DataReaderImpl::process_latency ( const ReceivedDataSample sample)

NB: This message is generated contemporaneously with a similar message from writer_activity(). That message is not marked as an error, so we follow that lead and leave this as an informational message, guarded by debug level. This seems to be due to late samples (samples delivered after an association has been torn down). We may want to promote this to a warning if other conditions causing this symptom are discovered.

Definition at line 2284 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::DCPS_debug_level, DDS::DURATION_ZERO_NSEC, DDS::DURATION_ZERO_SEC, OpenDDS::DCPS::ReceivedDataSample::header_, LM_DEBUG, OpenDDS::DCPS::TimePoint_T< SystemClock >::now(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::TimeDuration::str(), and timestamp().

Referenced by data_received().

2285 {
2287  StatsMapType::iterator location = this->statistics_.find(sample.header_.publication_id_);
2288 
2289  if (location != this->statistics_.end()) {
2291 
2292  // Only when the user has specified a latency budget or statistics
2293  // are enabled we need to calculate our latency
2294  if ((this->statistics_enabled()) ||
2295  (this->qos_.latency_budget.duration > zero)) {
2296  const DDS::Time_t timestamp = {
2297  sample.header_.source_timestamp_sec_,
2298  sample.header_.source_timestamp_nanosec_
2299  };
2300  const TimeDuration latency = SystemTimePoint::now() - SystemTimePoint(timestamp);
2301 
2302  if (this->statistics_enabled()) {
2303  location->second.add_stat(latency);
2304  }
2305 
2306  if (DCPS_debug_level > 9) {
2307  ACE_DEBUG((LM_DEBUG,
2308  ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
2309  ACE_TEXT("measured latency of %C for current sample.\n"),
2310  latency.str().c_str()));
2311  }
2312 
2313  if (this->qos_.latency_budget.duration > zero) {
2314  // Check latency against the budget.
2315  if (latency > TimeDuration(this->qos_.latency_budget.duration)) {
2316  this->notify_latency(sample.header_.publication_id_);
2317  }
2318  }
2319  }
2320  } else if (DCPS_debug_level > 0) {
2321  /// NB: This message is generated contemporaneously with a similar
2322  /// message from writer_activity(). That message is not marked
2323  /// as an error, so we follow that lead and leave this as an
2324  /// informational message, guarded by debug level. This seems
2325  /// to be due to late samples (samples delivered after an
2326  /// association has been torn down). We may want to promote this
2327  /// to a warning if other conditions causing this symptom are
2328  /// discovered.
2329  ACE_DEBUG((LM_DEBUG,
2330  ACE_TEXT("(%P|%t) DataReaderImpl::process_latency() - ")
2331  ACE_TEXT("reader %C is not associated with writer %C (late sample?).\n"),
2332  LogGuid(get_guid()).c_str(),
2333  LogGuid(sample.header_.publication_id_).c_str()));
2334  }
2335 }
#define ACE_DEBUG(X)
StatsMapType statistics_
Statistics for this reader, collected for each writer.
TimePoint_T< SystemClock > SystemTimePoint
Definition: TimeTypes.h:32
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
void notify_latency(GUID_t writer)
virtual CORBA::Boolean statistics_enabled()
LatencyBudgetQosPolicy latency_budget
const long DURATION_ZERO_SEC
Definition: DdsDcpsCore.idl:75
const unsigned long DURATION_ZERO_NSEC
Definition: DdsDcpsCore.idl:76
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
ACE_Recursive_Thread_Mutex statistics_lock_
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)

◆ purge_data()

virtual void OpenDDS::DCPS::DataReaderImpl::purge_data ( SubscriptionInstance_rch  instance)
protectedpure virtual

◆ qos_change()

void OpenDDS::DCPS::DataReaderImpl::qos_change ( const DDS::DataReaderQos qos)
protectedvirtual

Reimplemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >, and OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >.

Definition at line 882 of file DataReaderImpl.cpp.

References cancel_all_deadlines(), DDS::DataReaderQos::deadline, deadline_period_, deadline_queue_enabled_, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, DDS::Duration_t::nanosec, DDS::DeadlineQosPolicy::period, qos_, reset_deadline_period(), and DDS::Duration_t::sec.

Referenced by OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >::qos_change(), and set_qos().

883 {
884  // Reset the deadline timer if the period has changed.
885  if (qos_.deadline.period.sec != qos.deadline.period.sec ||
889  deadline_period_ = TimeDuration(qos.deadline.period);
891  } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC &&
894  deadline_queue_enabled_ = false;
895  } else {
896  reset_deadline_period(TimeDuration(qos.deadline.period));
897  }
898  }
899 }
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
void reset_deadline_period(const TimeDuration &deadline_period)
DeadlineQosPolicy deadline

◆ raw_latency_buffer_size()

ACE_INLINE unsigned int & OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size ( )

Configure the size of the raw data collection buffer.

Definition at line 19 of file DataReaderImpl.inl.

References ACE_INLINE, and raw_latency_buffer_size_.

Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().

20 {
21  return this->raw_latency_buffer_size_;
22 }
unsigned int raw_latency_buffer_size_
Bound (or initial reservation) of raw latency buffer.

◆ raw_latency_buffer_type()

ACE_INLINE OpenDDS::DCPS::DataCollector< double >::OnFull & OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type ( )

Configure the type of the raw data collection buffer.

Definition at line 26 of file DataReaderImpl.inl.

References ACE_INLINE, and raw_latency_buffer_type_.

Referenced by OpenDDS::DCPS::SubscriberImpl::create_datareader(), and OpenDDS::DCPS::MultiTopicDataReaderBase::init().

27 {
28  return this->raw_latency_buffer_type_;
29 }
DataCollector< double >::OnFull raw_latency_buffer_type_
Type of raw latency data buffer.

◆ raw_latency_statistics()

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE const OpenDDS::DCPS::DataReaderImpl::StatsMapType & OpenDDS::DCPS::DataReaderImpl::raw_latency_statistics ( ) const

Expose the statistics container.

Definition at line 12 of file DataReaderImpl.inl.

References ACE_INLINE, and statistics_.

13 {
14  return this->statistics_;
15 }
StatsMapType statistics_
Statistics for this reader, collected for each writer.

◆ read_generic()

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_generic ( GenericBundle gen,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
bool  adjust_ref_count 
)
pure virtual

◆ read_instance_generic()

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_instance_generic ( void *&  data,
DDS::SampleInfo info,
DDS::InstanceHandle_t  instance,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
pure virtual

◆ read_next_instance_generic()

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::read_next_instance_generic ( void *&  data,
DDS::SampleInfo info,
DDS::InstanceHandle_t  previous_instance,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
pure virtual

◆ register_for_writer()

void OpenDDS::DCPS::DataReaderImpl::register_for_writer ( const GUID_t participant,
const GUID_t readerid,
const GUID_t writerid,
const TransportLocatorSeq locators,
DiscoveryListener listener 
)
virtual

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 3254 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportClient::register_for_writer().

3259 {
3260  TransportClient::register_for_writer(participant, readerid, writerid, locators, listener);
3261 }
void register_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)

◆ reject_coherent()

void OpenDDS::DCPS::DataReaderImpl::reject_coherent ( const GUID_t writer_id,
const GUID_t publisher_id 
)

Definition at line 2941 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.

2943 {
2944  if (::OpenDDS::DCPS::DCPS_debug_level > 0) {
2945  ACE_DEBUG((LM_DEBUG,
2946  ACE_TEXT("(%P|%t) DataReaderImpl::reject_coherent()")
2947  ACE_TEXT(" reader %C writer %C publisher %C\n"),
2948  LogGuid(get_guid()).c_str(),
2949  LogGuid(writer_id).c_str(),
2950  LogGuid(publisher_id).c_str()));
2951  }
2952 
2953  SubscriptionInstanceSet localsubs;
2954  {
2955  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
2956  for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
2957  iter != this->instances_.end(); ++iter) {
2958  localsubs.insert(iter->second);
2959  }
2960  }
2962  for (SubscriptionInstanceSet::iterator iter = localsubs.begin();
2963  iter != localsubs.end(); iter++) {
2964  (*iter)->rcvd_strategy_->reject_coherent(writer_id, publisher_id);
2965  }
2966  this->reset_coherent_info(writer_id, publisher_id);
2967 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
ACE_Recursive_Thread_Mutex instances_lock_
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
void reset_coherent_info(const GUID_t &writer_id, const GUID_t &publisher_id)

◆ release_all_instances()

virtual void OpenDDS::DCPS::DataReaderImpl::release_all_instances ( )
pure virtual

Release all instances held by the reader.

Implemented in OpenDDS::DCPS::DataReaderImpl_T< MessageType >, and OpenDDS::DCPS::DataReaderImpl_T< DynamicSample >.

◆ release_instance()

void OpenDDS::DCPS::DataReaderImpl::release_instance ( DDS::InstanceHandle_t  handle)

Release the instance with the handle.

Definition at line 1944 of file DataReaderImpl.cpp.

References ACE_ERROR, ACE_GUARD, get_handle_instance(), instances_, instances_lock_, LM_ERROR, monitor_, ownership_manager(), purge_data(), release_instance_i(), OpenDDS::DCPS::OwnershipManager::remove_writers(), and sample_lock_.

1945 {
1946 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1947  OwnershipManagerPtr owner_manager = this->ownership_manager();
1948  if (owner_manager) {
1949  owner_manager->remove_writers(handle);
1950  }
1951 #endif
1952 
1954  SubscriptionInstance_rch instance = this->get_handle_instance(handle);
1955 
1956  if (!instance) {
1957  ACE_ERROR((LM_ERROR, "(%P|%t) DataReaderImpl::release_instance "
1958  "could not find the instance by handle 0x%x\n", handle));
1959  return;
1960  }
1961 
1962  this->purge_data(instance);
1963 
1964  {
1966  instances_.erase(handle);
1967  }
1968 
1969  this->release_instance_i(handle);
1970  if (this->monitor_) {
1971  this->monitor_->report();
1972  }
1973 }
OwnershipManagerPtr ownership_manager()
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< SubscriptionInstance > SubscriptionInstance_rch
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
ACE_Recursive_Thread_Mutex instances_lock_
unique_ptr< Monitor > monitor_
Monitor object for this entity.
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
virtual void purge_data(SubscriptionInstance_rch instance)=0
virtual void release_instance_i(DDS::InstanceHandle_t handle)=0

◆ release_instance_i()

virtual void OpenDDS::DCPS::DataReaderImpl::release_instance_i ( DDS::InstanceHandle_t  handle)
protectedpure virtual

◆ remove_all_associations()

void OpenDDS::DCPS::DataReaderImpl::remove_all_associations ( )

Definition at line 640 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_READ_GUARD, ACE_TEXT(), DBG_ENTRY_LVL, LM_WARNING, remove_associations(), OpenDDS::DCPS::TransportClient::stop_associating(), OpenDDS::DCPS::TransportClient::transport_stop(), writers_, and writers_lock_.

641 {
642  DBG_ENTRY_LVL("DataReaderImpl","remove_all_associations",6);
644 
646  int size;
647 
648  {
650 
651  size = static_cast<int>(writers_.size());
652  writers.length(size);
653 
654  WriterMapType::iterator curr_writer = writers_.begin();
655  WriterMapType::iterator end_writer = writers_.end();
656 
657  int i = 0;
658 
659  while (curr_writer != end_writer) {
660  writers[i++] = curr_writer->first;
661  ++curr_writer;
662  }
663  }
664 
665  try {
666  if (0 < size) {
667  remove_associations(writers, false);
668  }
669  } catch (const CORBA::Exception&) {
670  ACE_DEBUG((LM_WARNING,
671  ACE_TEXT("(%P|%t) WARNING: DataReaderImpl::remove_all_associations() - ")
672  ACE_TEXT("caught exception from remove_associations.\n")));
673  }
674 
675  transport_stop();
676 }
virtual void remove_associations(const WriterIdSeq &writers, bool callback)
#define ACE_DEBUG(X)
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
sequence< GUID_t > WriterIdSeq
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ remove_associations()

void OpenDDS::DCPS::DataReaderImpl::remove_associations ( const WriterIdSeq writers,
bool  callback 
)
virtual

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 452 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), ACE_WRITE_GUARD, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::Observer::e_DISASSOCIATED, OpenDDS::DCPS::EntityImpl::get_deleted(), get_guid(), OpenDDS::DCPS::EntityImpl::get_observer(), is_bit_, LM_DEBUG, OpenDDS::DCPS::Observer::on_disassociated(), remove_associations_i(), statistics_, statistics_lock_, OpenDDS::DCPS::TransportClient::stop_associating(), and writers_lock_.

Referenced by remove_all_associations().

454 {
455  DBG_ENTRY_LVL("DataReaderImpl", "remove_associations", 6);
456 
457  if (writers.length() == 0) {
458  return;
459  }
460 
462  if (observer) {
463  for (CORBA::ULong i = 0; i < writers.length(); ++i) {
464  observer->on_disassociated(this, writers[i]);
465  }
466  }
467 
468  if (DCPS_debug_level >= 1) {
469  ACE_DEBUG((LM_DEBUG,
470  ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations: ")
471  ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
472  is_bit_,
473  LogGuid(get_guid()).c_str(),
474  LogGuid(writers[0]).c_str(),
475  writers.length()));
476  }
477  if (!get_deleted()) {
478  // stop pending associations for these writer ids
479  this->stop_associating(writers.get_buffer(), writers.length());
480 
481  {
482  CORBA::ULong wr_len = writers.length();
484 
485  for (CORBA::ULong i = 0; i < wr_len; i++) {
486  const GUID_t writer_id = writers[i];
487  {
489  statistics_.erase(writer_id);
490  }
491  }
492  }
493  }
494 
495  remove_associations_i(writers, notify_lost);
496 }
#define ACE_DEBUG(X)
StatsMapType statistics_
Statistics for this reader, collected for each writer.
virtual void remove_associations_i(const WriterIdSeq &writers, bool callback)
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
RcHandle< Observer > Observer_rch
Definition: Observer.h:101
ACE_CDR::ULong ULong
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
ACE_Recursive_Thread_Mutex statistics_lock_
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)

◆ remove_associations_i()

void OpenDDS::DCPS::DataReaderImpl::remove_associations_i ( const WriterIdSeq writers,
bool  callback 
)
protectedvirtual

Section 7.1.4.1: total_count will not decrement.

: Reconcile this with the verbiage in section 7.1.4.1

Definition at line 499 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), ACE_WRITE_GUARD, DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::disassociate(), end_historic_sweeper_, get_guid(), is_bit_, CORBA::is_nil(), DDS::SubscriptionMatchedStatus::last_publication_handle, listener_for(), LM_DEBUG, lookup_instance_handles(), monitor_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), notify_subscription_lost(), publication_handle_lock_, publication_id_to_handle_map_, OpenDDS::DCPS::push_back(), resume_sample_processing(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, DDS::SubscriptionMatchedStatus::total_count_change, writers_, and writers_lock_.

Referenced by remove_associations().

501 {
502  DBG_ENTRY_LVL("DataReaderImpl", "remove_associations_i", 6);
503 
504  if (writers.length() == 0) {
505  return;
506  }
507 
508  if (DCPS_debug_level >= 1) {
509  ACE_DEBUG((LM_DEBUG,
510  ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
511  ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
512  is_bit_,
513  LogGuid(get_guid()).c_str(),
514  LogGuid(writers[0]).c_str(),
515  writers.length()));
516  }
517  DDS::InstanceHandleSeq handles;
518 
519  CORBA::ULong wr_len = writers.length();
520 
521  // Flush historic samples and/or allow in-progress delivery of historic samples to complete
522  for (CORBA::ULong i = 0; i < wr_len; i++) {
523  resume_sample_processing(writers[i]);
524  }
525 
526  // This is used to hold the list of writers which were actually
527  // removed, which is a proper subset of the writers which were
528  // requested to be removed.
529  WriterIdSeq updated_writers;
530  WriterMapType removed_writers;
531 
532  //Remove the writers from writer list. If the supplied writer
533  //is not in the cached writers list then it is already removed.
534  //We just need remove the writers in the list that have not been
535  //removed.
536  {
538 
539  for (CORBA::ULong i = 0; i < wr_len; i++) {
540  const GUID_t writer_id = writers[i];
541 
542  WriterMapType::iterator it = this->writers_.find(writer_id);
543 
544  if (it != this->writers_.end()) {
545  removed_writers.insert(*it);
546  end_historic_sweeper_->cancel_timer(it->second);
547  }
548 
549  if (this->writers_.erase(writer_id) == 0) {
550  if (DCPS_debug_level >= 1) {
551  ACE_DEBUG((LM_DEBUG,
552  ACE_TEXT("(%P|%t) DataReaderImpl::remove_associations_i: ")
553  ACE_TEXT("the writer local %C was already removed.\n"),
554  LogGuid(writer_id).c_str()));
555  }
556 
557  } else {
558  push_back(updated_writers, writer_id);
559  }
560  }
561  }
562 
563  for (WriterMapType::iterator it = removed_writers.begin(); it != removed_writers.end(); ++it) {
564  it->second->removed();
565  }
566  removed_writers.clear();
567 
568  wr_len = updated_writers.length();
569 
570  // Return now if the supplied writers have been removed already.
571  if (wr_len == 0) {
572  return;
573  }
574 
575  if (!is_bit_) {
576  // The writer should be in the id_to_handle map at this time.
577  this->lookup_instance_handles(updated_writers, handles);
578 
580 
581  for (CORBA::ULong i = 0; i < wr_len; ++i) {
582  publication_id_to_handle_map_.erase(updated_writers[i]);
583  }
584  }
585 
586  for (CORBA::ULong i = 0; i < updated_writers.length(); ++i) {
587  {
588  this->disassociate(updated_writers[i]);
589  }
590  }
591 
592  // Mirror the add_associations SUBSCRIPTION_MATCHED_STATUS processing.
593  if (!this->is_bit_) {
595 
596  // Derive the change in the number of publications writing to this reader.
597  int matchedPublications = static_cast<int>(this->publication_id_to_handle_map_.size());
599  = matchedPublications - this->subscription_match_status_.current_count;
600 
601  // Only process status if the number of publications has changed.
603  this->subscription_match_status_.current_count = matchedPublications;
604 
605  /// Section 7.1.4.1: total_count will not decrement.
606 
607  /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
609  = handles[ wr_len - 1];
610 
612 
613  DDS::DataReaderListener_var listener
615 
616  if (!CORBA::is_nil(listener.in())) {
617  listener->on_subscription_matched(this, this->subscription_match_status_);
618 
619  // Client will look at it so next time it looks the change should be 0
622  }
624  }
625  }
626 
627  // If this remove_association is invoked when the InfoRepo
628  // detects a lost writer then make a callback to notify
629  // subscription lost.
630  if (notify_lost) {
631  this->notify_subscription_lost(handles);
632  }
633 
634  if (this->monitor_) {
635  this->monitor_->report();
636  }
637 }
#define ACE_DEBUG(X)
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
void disassociate(const GUID_t &peerId)
RepoIdToHandleMap publication_id_to_handle_map_
void notify_subscription_lost(const WriterIdSeq &pubids)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
ACE_CDR::ULong ULong
DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind)
unique_ptr< Monitor > monitor_
Monitor object for this entity.
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
RcHandle< EndHistoricSamplesMissedSweeper > end_historic_sweeper_
sequence< GUID_t > WriterIdSeq
ACE_TEXT("TCP_Factory")
const StatusKind SUBSCRIPTION_MATCHED_STATUS
ACE_Recursive_Thread_Mutex publication_handle_lock_
DDS::SubscriptionMatchedStatus subscription_match_status_
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
void resume_sample_processing(const GUID_t &pub_id)
when done handling historic samples, resume
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
void lookup_instance_handles(const WriterIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the publication repo ids.
Boolean is_nil(T x)

◆ remove_from_lookup_maps()

void OpenDDS::DCPS::DataReaderImpl::remove_from_lookup_maps ( DDS::InstanceHandle_t  handle)
protected

Definition at line 3561 of file DataReaderImpl.cpp.

3562 {
3563  for (LookupMap::iterator it = combined_state_lookup_.begin(), the_end = combined_state_lookup_.end(); it != the_end; ++it) {
3564  if (it->first == 0) continue;
3565  it->second.erase(handle);
3566  }
3567 }

◆ reschedule_deadline()

void OpenDDS::DCPS::DataReaderImpl::reschedule_deadline ( SubscriptionInstance_rch  instance,
const MonotonicTimePoint now 
)
private

Definition at line 3709 of file DataReaderImpl.cpp.

References ACE_GUARD, OpenDDS::DCPS::SubscriptionInstance::deadline_, and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::zero_value.

3711 {
3713 
3714  // So the datareader can call back into us.
3715  if (instance->deadline_ != MonotonicTimePoint::zero_value) {
3716 
3717  // Remove.
3718  for (DeadlineQueue::iterator pos = deadline_queue_.lower_bound(instance->deadline_), limit = deadline_queue_.upper_bound(instance->deadline_); pos != limit; ++pos) {
3719  if (pos->second == instance) {
3720  deadline_queue_.erase(pos);
3721  break;
3722  }
3723  }
3724 
3725  instance->deadline_ = now + (deadline_period_ - (instance->deadline_ - now));
3726 
3727  const bool schedule = deadline_queue_.empty();
3728  deadline_queue_.insert(std::make_pair(instance->deadline_, instance));
3729  if (schedule) {
3730  deadline_task_->schedule(deadline_period_);
3731  } else if (deadline_queue_.begin()->second == instance) {
3732  // Moved to front.
3733  deadline_task_->cancel();
3734  deadline_task_->schedule(deadline_period_);
3735  }
3736  }
3737 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
RcHandle< DRISporadicTask > deadline_task_
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40

◆ reset_coherent_info()

void OpenDDS::DCPS::DataReaderImpl::reset_coherent_info ( const GUID_t writer_id,
const GUID_t publisher_id 
)

Definition at line 2970 of file DataReaderImpl.cpp.

References ACE_READ_GUARD.

2972 {
2973  ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
2974 
2975  WriterMapType::iterator itEnd = this->writers_.end();
2976  for (WriterMapType::iterator it = this->writers_.begin();
2977  it != itEnd; ++it) {
2978  if (it->second->writer_id() == writer_id
2979  && it->second->publisher_id() == publisher_id) {
2980  it->second->reset_coherent_info();
2981  }
2982  }
2983 }
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.

◆ reset_deadline_period()

void OpenDDS::DCPS::DataReaderImpl::reset_deadline_period ( const TimeDuration deadline_period)
private

Definition at line 3690 of file DataReaderImpl.cpp.

References ACE_GUARD, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::zero_value.

Referenced by qos_change().

3691 {
3692  if (deadline_period_ != deadline_period) {
3693  deadline_period_ = deadline_period;
3694 
3696  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
3698  for (SubscriptionInstanceMapType::iterator iter = this->instances_.begin();
3699  iter != this->instances_.end();
3700  ++iter) {
3701  if (iter->second->deadline_ != MonotonicTimePoint::zero_value) {
3702  reschedule_deadline(iter->second, now);
3703  }
3704  }
3705  }
3706  }
3707 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
ACE_Recursive_Thread_Mutex instances_lock_
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
void reschedule_deadline(SubscriptionInstance_rch instance, const MonotonicTimePoint &now)
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40

◆ reset_latency_stats()

void OpenDDS::DCPS::DataReaderImpl::reset_latency_stats ( )
virtual

Clear any intermediate statistical values.

Implements OpenDDS::DCPS::DataReaderEx.

Definition at line 2386 of file DataReaderImpl.cpp.

2387 {
2389  for (StatsMapType::iterator current = this->statistics_.begin();
2390  current != this->statistics_.end();
2391  ++current) {
2392  current->second.reset_stats();
2393  }
2394 }
StatsMapType statistics_
Statistics for this reader, collected for each writer.
ACE_Recursive_Thread_Mutex statistics_lock_

◆ reset_ownership()

void OpenDDS::DCPS::DataReaderImpl::reset_ownership ( DDS::InstanceHandle_t  instance)

Definition at line 3163 of file DataReaderImpl.cpp.

References ACE_WRITE_GUARD.

3164 {
3165  ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
3166  for (WriterMapType::iterator iter = writers_.begin();
3167  iter != writers_.end();
3168  ++iter) {
3169  iter->second->set_owner_evaluated(instance, false);
3170  }
3171 }
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)

◆ resume_sample_processing()

void OpenDDS::DCPS::DataReaderImpl::resume_sample_processing ( const GUID_t pub_id)
private

when done handling historic samples, resume

Definition at line 3174 of file DataReaderImpl.cpp.

References ACE_WRITE_GUARD, OpenDDS::DCPS::WriterInfo::check_end_historic_samples(), OpenDDS::DCPS::WriterInfo::finished_delivering_historic(), and OpenDDS::DCPS::OPENDDS_MAP().

Referenced by data_received(), and remove_associations_i().

3175 {
3176  WriterInfo_rch info;
3177  {
3178  ACE_WRITE_GUARD(ACE_RW_Thread_Mutex, write_guard, this->writers_lock_);
3179  WriterMapType::iterator where = writers_.find(pub_id);
3180  if (writers_.end() != where) {
3181  info = where->second;
3182  }
3183  }
3184 
3185  if (info) {
3186  OPENDDS_MAP(SequenceNumber, ReceivedDataSample) to_deliver;
3187  // Stop filtering these
3188  if (info->check_end_historic_samples(end_historic_sweeper_.in(), to_deliver)) {
3189  deliver_historic(to_deliver);
3190  info->finished_delivering_historic();
3191  }
3192  }
3193 }
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
void deliver_historic(OPENDDS_MAP(SequenceNumber, ReceivedDataSample)&samples)
deliver samples that were held by check_historic()
RcHandle< WriterInfo > WriterInfo_rch
Definition: WriterInfo.h:275
RcHandle< EndHistoricSamplesMissedSweeper > end_historic_sweeper_
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
typedef OPENDDS_MAP(DDS::InstanceHandle_t, SubscriptionInstance_rch) SubscriptionInstanceMapType

◆ return_handle()

void OpenDDS::DCPS::DataReaderImpl::return_handle ( DDS::InstanceHandle_t  handle)

Definition at line 2457 of file DataReaderImpl.cpp.

2458 {
2459  const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
2460  if (participant) {
2461  participant->return_handle(handle);
2462  }
2463 }
WeakRcHandle< DomainParticipantImpl > participant_servant_

◆ sample_info()

void OpenDDS::DCPS::DataReaderImpl::sample_info ( DDS::SampleInfo sample_info,
const ReceivedDataElement ptr 
)
protected

Definition at line 1769 of file DataReaderImpl.cpp.

References DDS::SampleInfo::absolute_generation_rank, DDS::SampleInfo::disposed_generation_count, OpenDDS::DCPS::ReceivedDataElement::disposed_generation_count_, DDS::SampleInfo::generation_rank, OpenDDS::DCPS::SequenceNumber::getValue(), DDS::SampleInfo::no_writers_generation_count, OpenDDS::DCPS::ReceivedDataElement::no_writers_generation_count_, DDS::SampleInfo::opendds_reserved_publication_seq, DDS::SampleInfo::sample_rank, and OpenDDS::DCPS::ReceivedDataElement::sequence_.

1771 {
1772 
1773  sample_info.sample_rank = 0;
1774 
1775  // generation_rank =
1776  // (MRSIC.disposed_generation_count +
1777  // MRSIC.no_writers_generation_count)
1778  // - (S.disposed_generation_count +
1779  // S.no_writers_generation_count)
1780  //
1781  sample_info.generation_rank =
1782  (sample_info.disposed_generation_count +
1783  sample_info.no_writers_generation_count) -
1784  sample_info.generation_rank;
1785 
1786  // absolute_generation_rank =
1787  // (MRS.disposed_generation_count +
1788  // MRS.no_writers_generation_count)
1789  // - (S.disposed_generation_count +
1790  // S.no_writers_generation_count)
1791  //
1792  sample_info.absolute_generation_rank =
1793  (static_cast<CORBA::Long>(ptr->disposed_generation_count_) +
1794  static_cast<CORBA::Long>(ptr->no_writers_generation_count_)) -
1795  sample_info.absolute_generation_rank;
1796 
1797  sample_info.opendds_reserved_publication_seq = ptr->sequence_.getValue();
1798 }
ACE_CDR::Long Long
long absolute_generation_rank
long long opendds_reserved_publication_seq
long no_writers_generation_count
long disposed_generation_count

◆ schedule_deadline()

void OpenDDS::DCPS::DataReaderImpl::schedule_deadline ( SubscriptionInstance_rch  instance,
bool  timer_called 
)
private

Definition at line 3577 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::SubscriptionInstance::deadline_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::zero_value.

3579 {
3580  // Should be called with sample_lock_.
3581  if (instance->deadline_ == MonotonicTimePoint::zero_value) {
3582  instance->deadline_ = MonotonicTimePoint::now() + deadline_period_;
3583  const bool schedule = deadline_queue_.empty();
3584  deadline_queue_.insert(std::make_pair(instance->deadline_, instance));
3585  if (!timer_called) {
3586  if (schedule) {
3587  deadline_task_->schedule(deadline_period_);
3588  } else if (deadline_queue_.begin()->second == instance) {
3589  // Moved to front.
3590  deadline_task_->cancel();
3591  deadline_task_->schedule(deadline_period_);
3592  }
3593  }
3594  }
3595 }
RcHandle< DRISporadicTask > deadline_task_
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
static const TimePoint_T< MonotonicClock > zero_value
Definition: TimePoint_T.h:40

◆ set_instance_state()

void OpenDDS::DCPS::DataReaderImpl::set_instance_state ( DDS::InstanceHandle_t  instance,
DDS::InstanceStateKind  state,
const SystemTimePoint timestamp = SystemTimePoint::now(),
const GUID_t guid = GUID_UNKNOWN 
)
inline

Definition at line 565 of file DataReaderImpl.h.

References ACE_GUARD, DDS::HANDLE_NIL, instance_states, sample_states, timestamp(), and view_states.

Referenced by OpenDDS::DCPS::MultiTopicDataReaderBase::data_available(), OpenDDS::DCPS::BitSubscriber::remove_connection_record(), OpenDDS::DCPS::StaticParticipant::remove_discovered_participant(), OpenDDS::DCPS::BitSubscriber::remove_i(), and OpenDDS::DCPS::BitSubscriber::remove_thread_status().

569  {
570  DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
571  {
573  RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(guid);
574  if (pos != publication_id_to_handle_map_.end()) {
575  publication_handle = pos->second;
576  }
577  }
578 
580  set_instance_state_i(instance, publication_handle, state, timestamp, guid);
581  }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
RepoIdToHandleMap publication_id_to_handle_map_
const InstanceHandle_t HANDLE_NIL
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
Definition: COPYING.txt:14
virtual void set_instance_state_i(DDS::InstanceHandle_t instance, DDS::InstanceHandle_t publication_handle, DDS::InstanceStateKind state, const SystemTimePoint &timestamp, const GUID_t &guid)=0
ACE_Recursive_Thread_Mutex publication_handle_lock_
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51

◆ set_instance_state_i()

virtual void OpenDDS::DCPS::DataReaderImpl::set_instance_state_i ( DDS::InstanceHandle_t  instance,
DDS::InstanceHandle_t  publication_handle,
DDS::InstanceStateKind  state,
const SystemTimePoint timestamp,
const GUID_t guid 
)
privatepure virtual

◆ set_listener()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::set_listener ( DDS::DataReaderListener_ptr  a_listener,
DDS::StatusMask  mask 
)
virtual

Definition at line 909 of file DataReaderImpl.cpp.

References listener_, listener_mask_, listener_mutex_, and DDS::RETCODE_OK.

Referenced by cleanup(), and init().

912 {
914  listener_mask_ = mask;
915  //note: OK to duplicate a nil object ref
916  listener_ = DDS::DataReaderListener::_duplicate(a_listener);
917  return DDS::RETCODE_OK;
918 }
const ReturnCode_t RETCODE_OK
ACE_Thread_Mutex listener_mutex_
DDS::DataReaderListener_var listener_

◆ set_qos()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::set_qos ( const DDS::DataReaderQos qos)
virtual

Definition at line 824 of file DataReaderImpl.cpp.

References ACE_ERROR_RETURN, ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, dp_id_, OpenDDS::DCPS::Observer::e_QOS_CHANGED, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::EntityImpl::get_observer(), get_subscriber_servant(), LM_ERROR, OpenDDS::DCPS::Observer::on_qos_changed(), OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, passed_qos_, qos_, qos_change(), DDS::DataReaderQos::representation, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, OpenDDS::DCPS::WriterInfoListener::subscription_id_, TheServiceParticipant, OpenDDS::DCPS::Qos_Helper::valid(), and DDS::DataRepresentationQosPolicy::value.

825 {
829 
830  DDS::DataReaderQos new_qos = qos;
832  if (Qos_Helper::valid(new_qos) && Qos_Helper::consistent(new_qos)) {
833 
834  if (qos_ == new_qos)
835  return DDS::RETCODE_OK;
836 
837  if (enabled_) {
838  if (!Qos_Helper::changeable(qos_, new_qos)) {
840 
841  } else {
842  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
843  DDS::SubscriberQos subscriberQos;
844 
845  RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
846  bool status = false;
847  if (subscriber) {
848  subscriber->get_qos(subscriberQos);
849  status =
850  disco->update_subscription_qos(
851  domain_id_,
852  dp_id_,
854  new_qos,
855  subscriberQos);
856  }
857  if (!status) {
858  ACE_ERROR_RETURN((LM_ERROR,
859  ACE_TEXT("(%P|%t) DataReaderImpl::set_qos, ")
860  ACE_TEXT("qos not updated.\n")),
862  }
863  }
864  }
865 
866  qos_change(new_qos);
867  qos_ = new_qos;
868  passed_qos_ = qos;
869 
871  if (observer) {
872  observer->on_qos_changed(this);
873  }
874 
875  return DDS::RETCODE_OK;
876 
877  } else {
879  }
880 }
virtual void qos_change(const DDS::DataReaderQos &qos)
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
const ReturnCode_t RETCODE_OK
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
DDS::DataReaderQos passed_qos_
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
DataRepresentationQosPolicy representation
RcHandle< Observer > Observer_rch
Definition: Observer.h:101
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
const ReturnCode_t RETCODE_ERROR
DataRepresentationIdSeq value
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
ACE_TEXT("TCP_Factory")
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
RcHandle< SubscriberImpl > get_subscriber_servant()
#define ACE_ERROR_RETURN(X, Y)
const ReturnCode_t RETCODE_UNSUPPORTED
#define TheServiceParticipant
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)

◆ set_sample_lost_status()

void OpenDDS::DCPS::DataReaderImpl::set_sample_lost_status ( const DDS::SampleLostStatus status)
protected

!!caller should have acquired sample_lock_

Definition at line 2260 of file DataReaderImpl.cpp.

2262 {
2263  //!!!caller should have acquired sample_lock_
2264  sample_lost_status_ = status;
2265 }
DDS::SampleLostStatus sample_lost_status_

◆ set_sample_rejected_status()

void OpenDDS::DCPS::DataReaderImpl::set_sample_rejected_status ( const DDS::SampleRejectedStatus status)
protected

!!caller should have acquired sample_lock_

Definition at line 2268 of file DataReaderImpl.cpp.

2270 {
2271  //!!!caller should have acquired sample_lock_
2272  sample_rejected_status_ = status;
2273 }
DDS::SampleRejectedStatus sample_rejected_status_

◆ set_subscriber_qos()

void OpenDDS::DCPS::DataReaderImpl::set_subscriber_qos ( const DDS::SubscriberQos qos)

Definition at line 3116 of file DataReaderImpl.cpp.

3118 {
3119  this->subqos_ = qos;
3120 }

◆ setup_deserialization()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::setup_deserialization ( )
protected

Setup deserialization options.

Definition at line 3291 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_ERROR, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::Encoding::kind_to_string(), OpenDDS::DCPS::Encoding::KIND_UNALIGNED_CDR, OpenDDS::DCPS::Encoding::KIND_XCDR1, LM_DEBUG, LM_ERROR, LM_WARNING, OpenDDS::DCPS::MUTABLE, OPENDDS_STRING, OpenDDS::DCPS::repr_to_encoding_kind(), OpenDDS::DCPS::repr_to_string(), DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

Referenced by enable().

3292 {
3293  bool xcdr1_mutable = false;
3294  bool illegal_unaligned = false;
3295  for (CORBA::ULong i = 0; i < qos_.representation.value.length(); ++i) {
3296  Encoding::Kind encoding_kind;
3297  if (repr_to_encoding_kind(qos_.representation.value[i], encoding_kind)) {
3298  if (encoding_kind == Encoding::KIND_XCDR1 && type_support_->max_extensibility() == MUTABLE) {
3299  xcdr1_mutable = true;
3300  } else if (encoding_kind == Encoding::KIND_UNALIGNED_CDR && cdr_encapsulation()) {
3301  illegal_unaligned = true;
3302  } else {
3303  decoding_modes_.insert(encoding_kind);
3304  }
3305  } else if (DCPS_debug_level) {
3306  ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: "
3307  "DataReaderImpl::setup_deserialization: "
3308  "Encountered unsupported or unknown data representation: %C\n",
3309  repr_to_string(qos_.representation.value[i]).c_str()));
3310  }
3311  }
3312  if (decoding_modes_.empty()) {
3313  if (DCPS_debug_level) {
3314  DCPS::String error_message;
3315  if (xcdr1_mutable) {
3316  error_message = " Unsupported combination of XCDR1 and mutable";
3317  } else if (illegal_unaligned) {
3318  error_message = " Unaligned CDR is not allowed in rtps_udp transport";
3319  }
3320  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: "
3321  "DataReaderImpl::setup_deserialization: "
3322  "Could not find a valid data representation.%C\n",
3323  error_message.c_str()));
3324  }
3325  return DDS::RETCODE_ERROR;
3326  }
3327  if (DCPS_debug_level >= 2) {
3328  OPENDDS_STRING encodings;
3329  EncodingKinds::iterator it = decoding_modes_.begin();
3330  for (; it != decoding_modes_.end(); ++it) {
3331  if (!encodings.empty()) {
3332  encodings += ", ";
3333  }
3334  encodings += Encoding::kind_to_string(*it);
3335  }
3336  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataReaderImpl::setup_deserialization: "
3337  "Setup successfully with the following data representation%C: %C\n",
3338  encodings.size() != 1 ? "s" : "",
3339  encodings.c_str()));
3340  }
3341 
3342  return DDS::RETCODE_OK;
3343 }
#define ACE_DEBUG(X)
bool repr_to_encoding_kind(DDS::DataRepresentationId_t repr, Encoding::Kind &kind)
Definition: DCPS_Utils.cpp:455
#define ACE_ERROR(X)
static String kind_to_string(Kind value)
Definition: Serializer.cpp:326
const ReturnCode_t RETCODE_OK
#define OPENDDS_STRING
DataRepresentationQosPolicy representation
ACE_CDR::ULong ULong
DCPS::String repr_to_string(const DDS::DataRepresentationId_t &repr)
Definition: DCPS_Utils.cpp:473
const ReturnCode_t RETCODE_ERROR
DataRepresentationIdSeq value
TypeSupportImpl * type_support_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
std::string String
virtual Extensibility max_extensibility() const =0

◆ signal_liveliness()

void OpenDDS::DCPS::DataReaderImpl::signal_liveliness ( const GUID_t remote_participant)
virtual

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 715 of file DataReaderImpl.cpp.

References ACE_GUARD, ACE_READ_GUARD, OpenDDS::DCPS::GUID_t::entityId, OpenDDS::DCPS::equal_guid_prefixes(), OpenDDS::DCPS::SubscriptionInstance::instance_state_, instances_, instances_lock_, OpenDDS::DCPS::InstanceState::lively(), OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OPENDDS_VECTOR(), sample_lock_, writers_, and writers_lock_.

716 {
717  GUID_t prefix = remote_participant;
718  prefix.entityId = EntityId_t();
719 
721 
722  typedef std::pair<GUID_t, WriterInfo_rch> RepoWriterPair;
723  typedef OPENDDS_VECTOR(RepoWriterPair) WriterSet;
724  WriterSet writers;
725 
726  {
728  for (WriterMapType::iterator pos = writers_.lower_bound(prefix),
729  limit = writers_.end();
730  pos != limit && equal_guid_prefixes(pos->first, prefix);
731  ++pos) {
732  writers.push_back(std::make_pair(pos->first, pos->second));
733  }
734  }
735 
737  for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
738  pos != limit;
739  ++pos) {
740  pos->second->received_activity(when);
741  }
742 
743  if (!writers.empty()) {
744  ACE_GUARD(ACE_Recursive_Thread_Mutex, instance_guard, this->instances_lock_);
745  for (WriterSet::iterator pos = writers.begin(), limit = writers.end();
746  pos != limit;
747  ++pos) {
748  for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
749  iter != instances_.end();
750  ++iter) {
751  SubscriptionInstance_rch ptr = iter->second;
752  ptr->instance_state_->lively(pos->first);
753  }
754  }
755  }
756 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RcHandle< SubscriptionInstance > SubscriptionInstance_rch
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex instances_lock_
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
OpenDDS_Dcps_Export bool equal_guid_prefixes(const GuidPrefix_t &lhs, const GuidPrefix_t &rhs)
Definition: GuidUtils.h:132
const InstanceState_rch instance_state_
Instance state for this instance.
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
void lively(const GUID_t &writer_id)
LIVELINESS message received for this DataWriter.
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec

◆ split_combined_states()

static void OpenDDS::DCPS::DataReaderImpl::split_combined_states ( CORBA::ULong  combined,
CORBA::ULong sample_states,
CORBA::ULong view_states,
CORBA::ULong instance_states 
)
inlinestaticprotected

Definition at line 682 of file DataReaderImpl.h.

683  {
687  }
static const CORBA::ULong COMBINED_VIEW_STATE_SHIFT
static const CORBA::ULong MAX_VIEW_STATE_MASK
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
static const CORBA::ULong COMBINED_SAMPLE_STATE_SHIFT
static const CORBA::ULong MAX_INSTANCE_STATE_MASK
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
static const CORBA::ULong MAX_SAMPLE_STATE_MASK

◆ state_updated()

void OpenDDS::DCPS::DataReaderImpl::state_updated ( DDS::InstanceHandle_t  handle)

Definition at line 1976 of file DataReaderImpl.cpp.

References ACE_GUARD, sample_lock_, and state_updated_i().

1977 {
1979  state_updated_i(handle);
1980 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual void state_updated_i(DDS::InstanceHandle_t handle)=0
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.

◆ state_updated_i()

virtual void OpenDDS::DCPS::DataReaderImpl::state_updated_i ( DDS::InstanceHandle_t  handle)
protectedpure virtual

◆ statistics_enabled() [1/2]

CORBA::Boolean OpenDDS::DCPS::DataReaderImpl::statistics_enabled ( )
virtual

Definition at line 2397 of file DataReaderImpl.cpp.

2398 {
2399  return statistics_enabled_;
2400 }
AtomicBool statistics_enabled_
Flag indicating status of statistics gathering.

◆ statistics_enabled() [2/2]

void OpenDDS::DCPS::DataReaderImpl::statistics_enabled ( CORBA::Boolean  statistics_enabled)
virtual

Definition at line 2403 of file DataReaderImpl.cpp.

2405 {
2407 }
AtomicBool statistics_enabled_
Flag indicating status of statistics gathering.
virtual CORBA::Boolean statistics_enabled()

◆ take()

virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::take ( AbstractSamples samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
pure virtual

◆ time_based_filter_instance()

bool OpenDDS::DCPS::DataReaderImpl::time_based_filter_instance ( const SubscriptionInstance_rch instance,
MonotonicTimePoint now,
MonotonicTimePoint deadline 
)
protected

Definition at line 2699 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TimeDuration::is_zero(), OpenDDS::DCPS::SubscriptionInstance::last_accepted_, and OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now().

2702 {
2703  now = MonotonicTimePoint::now();
2704  const TimeDuration minimum_separation(qos_.time_based_filter.minimum_separation);
2705 
2706  // TIME_BASED_FILTER processing; expire data samples
2707  // if minimum separation is not met for instance.
2708  if (!minimum_separation.is_zero()) {
2709  if (now - instance->last_accepted_ < minimum_separation) {
2710  deadline = now + minimum_separation;
2711  return true; // Data filtered.
2712  }
2713  }
2714 
2715  instance->last_accepted_ = now;
2716 
2717  return false;
2718 }
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
TimeBasedFilterQosPolicy time_based_filter

◆ to_combined_states()

static CORBA::ULong OpenDDS::DCPS::DataReaderImpl::to_combined_states ( CORBA::ULong  sample_states,
CORBA::ULong  view_states,
CORBA::ULong  instance_states 
)
inlinestaticprotected

Definition at line 670 of file DataReaderImpl.h.

671  {
676  // catch-all for "bogus" lookups
677  return 0;
678  }
680  }
static const CORBA::ULong COMBINED_VIEW_STATE_SHIFT
static const CORBA::ULong MAX_VIEW_STATE_MASK
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
static const CORBA::ULong COMBINED_SAMPLE_STATE_SHIFT
static const CORBA::ULong MAX_INSTANCE_STATE_MASK
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
static const CORBA::ULong MAX_SAMPLE_STATE_MASK

◆ total_samples()

CORBA::Long OpenDDS::DCPS::DataReaderImpl::total_samples ( ) const
protected

!!caller should have acquired sample_lock_

Definition at line 1800 of file DataReaderImpl.cpp.

References ACE_GUARD_RETURN, instances_, instances_lock_, OpenDDS::DCPS::SubscriptionInstance::rcvd_samples_, and OpenDDS::DCPS::ReceivedDataElementList::size().

1801 {
1802  //!!!caller should have acquired sample_lock_
1804 
1805  CORBA::Long count(0);
1806 
1807  for (SubscriptionInstanceMapType::iterator iter = instances_.begin();
1808  iter != instances_.end();
1809  ++iter) {
1810  SubscriptionInstance_rch ptr = iter->second;
1811 
1812  count += static_cast<CORBA::Long>(ptr->rcvd_samples_.size());
1813  }
1814 
1815  return count;
1816 }
ACE_CDR::Long Long
RcHandle< SubscriptionInstance > SubscriptionInstance_rch
ACE_Recursive_Thread_Mutex instances_lock_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.

◆ transport_assoc_done()

void OpenDDS::DCPS::DataReaderImpl::transport_assoc_done ( int  flags,
const GUID_t remote_id 
)
virtual

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 355 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), ACE_WRITE_GUARD, OpenDDS::DCPS::TransportClient::ASSOC_OK, DDS::SubscriptionMatchedStatus::current_count, DDS::SubscriptionMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, get_guid(), is_bit_, CORBA::is_nil(), OpenDDS::DCPS::TimeDuration::is_zero(), DDS::SubscriptionMatchedStatus::last_publication_handle, listener_for(), OpenDDS::DCPS::WriterInfoListener::liveliness_lease_duration_, liveliness_timer_, LM_DEBUG, LM_ERROR, monitor_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), participant_servant_, publication_handle_lock_, publication_id_to_handle_map_, sample_lock_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), subscription_match_status_, DDS::SUBSCRIPTION_MATCHED_STATUS, DDS::SubscriptionMatchedStatus::total_count, DDS::SubscriptionMatchedStatus::total_count_change, writers_, and writers_lock_.

356 {
357  if (!(flags & ASSOC_OK)) {
358  if (DCPS_debug_level) {
359  ACE_ERROR((LM_ERROR,
360  ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
361  ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
362  LogGuid(remote_id).c_str()));
363  }
364  return;
365  }
366 
367  // LIVELINESS policy timers are managed here.
369  if (DCPS_debug_level >= 5) {
370  ACE_DEBUG((LM_DEBUG,
371  ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
372  ACE_TEXT("starting/resetting liveliness timer for reader %C\n"),
373  LogGuid(get_guid()).c_str()));
374  }
375  // this call will start the timer if it is not already set
376  liveliness_timer_->check_liveliness();
377  }
378 
379  const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
380 
381  if (!participant)
382  return;
383 
384  const DDS::InstanceHandle_t handle = participant->assign_handle(remote_id);
385 
386  if (!is_bit_) {
387  // We acquire the publication_handle_lock_ for the remainder of our
388  // processing.
389  {
391 
392  // This insertion is idempotent.
393  publication_id_to_handle_map_.insert(RepoIdToHandleMap::value_type(remote_id, handle));
394 
395  if (DCPS_debug_level > 4) {
396  ACE_DEBUG((LM_DEBUG,
397  ACE_TEXT("(%P|%t) DataReaderImpl::transport_assoc_done: ")
398  ACE_TEXT("id_to_handle_map_[ %C] = 0x%x.\n"),
399  LogGuid(remote_id).c_str(),
400  handle));
401  }
402 
403  // We need to adjust these after the insertions have all completed
404  // since insertions are not guaranteed to increase the number of
405  // currently matched publications.
406  const int matchedPublications = static_cast<int>(publication_id_to_handle_map_.size());
408  matchedPublications - subscription_match_status_.current_count;
409  subscription_match_status_.current_count = matchedPublications;
410 
413 
415 
417 
418  DDS::DataReaderListener_var listener =
420 
421  if (!CORBA::is_nil(listener)) {
422  listener->on_subscription_matched(this, subscription_match_status_);
423 
424  // TBD - why does the spec say to change this but not change
425  // the ChangeFlagStatus after a listener call?
426 
427  // Client will look at it so next time it looks the change should be 0
430  }
431 
433  }
434 
435  {
438 
439  if (!writers_.count(remote_id)) {
440  return;
441  }
442  writers_[remote_id]->handle(handle);
443  }
444  }
445 
446  if (monitor_) {
447  monitor_->report();
448  }
449 }
#define ACE_DEBUG(X)
RcHandle< LivelinessTimer > liveliness_timer_
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
RepoIdToHandleMap publication_id_to_handle_map_
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind)
unique_ptr< Monitor > monitor_
Monitor object for this entity.
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
const StatusKind SUBSCRIPTION_MATCHED_STATUS
ACE_Recursive_Thread_Mutex publication_handle_lock_
DDS::SubscriptionMatchedStatus subscription_match_status_
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
WeakRcHandle< DomainParticipantImpl > participant_servant_
TimeDuration liveliness_lease_duration_
Definition: WriterInfo.h:52
#define ACE_WRITE_GUARD(MUTEX, OBJ, LOCK)
Boolean is_nil(T x)

◆ transport_discovery_change()

void OpenDDS::DCPS::DataReaderImpl::transport_discovery_change ( )
virtual

Reimplemented from OpenDDS::DCPS::TransportReceiveListener.

Definition at line 3483 of file DataReaderImpl.cpp.

References TheServiceParticipant.

3484 {
3486  const TransportLocatorSeq& trans_conf_info = connection_info();
3487  const GUID_t dp_id_copy = dp_id_;
3488  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
3489  disco->update_subscription_locators(domain_id_,
3490  dp_id_copy,
3491  get_guid(),
3492  trans_conf_info);
3493 }
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
sequence< TransportLocator > TransportLocatorSeq
const TransportLocatorSeq & connection_info() const
#define TheServiceParticipant

◆ unregister_for_writer()

void OpenDDS::DCPS::DataReaderImpl::unregister_for_writer ( const GUID_t participant,
const GUID_t readerid,
const GUID_t writerid 
)
virtual

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 3264 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::TransportClient::unregister_for_writer().

3267 {
3268  TransportClient::unregister_for_writer(participant, readerid, writerid);
3269 }
void unregister_for_writer(const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)

◆ update_incompatible_qos()

void OpenDDS::DCPS::DataReaderImpl::update_incompatible_qos ( const IncompatibleQosStatus status)
virtual

Implements OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 679 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, CORBA::is_nil(), OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::RequestedIncompatibleQosStatus::last_policy_id, listener_for(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::RequestedIncompatibleQosStatus::policies, DDS::REQUESTED_INCOMPATIBLE_QOS_STATUS, requested_incompatible_qos_status_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::RequestedIncompatibleQosStatus::total_count, and DDS::RequestedIncompatibleQosStatus::total_count_change.

680 {
681  DDS::DataReaderListener_var listener =
683 
684  if (this->requested_incompatible_qos_status_.total_count == status.total_count) {
685  // This test should make the method idempotent.
686  return;
687  }
688 
690  true);
691 
692  // copy status and increment change
693  requested_incompatible_qos_status_.total_count = status.total_count;
695  status.count_since_last_send;
697  status.last_policy_id;
699 
700  if (!CORBA::is_nil(listener.in())) {
701  listener->on_requested_incompatible_qos(this, requested_incompatible_qos_status_);
702 
703  // TBD - why does the spec say to change total_count_change but not
704  // change the ChangeFlagStatus after a listener call?
705 
706  // client just looked at it so next time it looks the
707  // change should be 0
709  }
710 
712 }
DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind)
const StatusKind REQUESTED_INCOMPATIBLE_QOS_STATUS
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
DDS::RequestedIncompatibleQosStatus requested_incompatible_qos_status_
Boolean is_nil(T x)

◆ update_locators()

void OpenDDS::DCPS::DataReaderImpl::update_locators ( const GUID_t remote,
const TransportLocatorSeq locators 
)
virtual

Reimplemented from OpenDDS::DCPS::DataReaderCallbacks.

Definition at line 3272 of file DataReaderImpl.cpp.

References ACE_READ_GUARD, and OpenDDS::DCPS::TransportClient::update_locators().

3274 {
3275  {
3277  WriterMapType::const_iterator iter = writers_.find(writerId);
3278  if (iter == writers_.end()) {
3279  return;
3280  }
3281  }
3282  TransportClient::update_locators(writerId, locators);
3283 }
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
void update_locators(const GUID_t &remote, const TransportLocatorSeq &locators)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.

◆ update_lookup_maps()

void OpenDDS::DCPS::DataReaderImpl::update_lookup_maps ( const SubscriptionInstanceMapType::iterator &  input)
protected

Definition at line 3547 of file DataReaderImpl.cpp.

References instance_states, sample_states, and view_states.

3548 {
3549  for (LookupMap::iterator it = combined_state_lookup_.begin(); it != combined_state_lookup_.end(); ++it) {
3550  if (it->first == 0) continue;
3552  split_combined_states(it->first, sample_states, view_states, instance_states);
3553  if (input->second->matches(sample_states, view_states, instance_states)) {
3554  it->second.insert(input->first);
3555  } else {
3556  it->second.erase(input->first);
3557  }
3558  }
3559 }
static void split_combined_states(CORBA::ULong combined, CORBA::ULong &sample_states, CORBA::ULong &view_states, CORBA::ULong &instance_states)
ACE_CDR::ULong ULong
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66

◆ update_ownership_strength()

void OpenDDS::DCPS::DataReaderImpl::update_ownership_strength ( const GUID_t pub_id,
const CORBA::Long ownership_strength 
)

Definition at line 2849 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_READ_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, and LM_DEBUG.

2851 {
2853  read_guard,
2854  this->writers_lock_);
2855  for (WriterMapType::iterator iter = writers_.begin();
2856  iter != writers_.end();
2857  ++iter) {
2858  if (iter->second->writer_id() == pub_id) {
2859  if (ownership_strength != iter->second->writer_qos_ownership_strength()) {
2860  if (DCPS_debug_level >= 1) {
2861  ACE_DEBUG((LM_DEBUG,
2862  ACE_TEXT("(%P|%t) DataReaderImpl::update_ownership_strength - ")
2863  ACE_TEXT("local %C update remote %C strength from %d to %d\n"),
2864  LogGuid(get_guid()).c_str(),
2865  LogGuid(pub_id).c_str(),
2866  iter->second->writer_qos_ownership_strength(), ownership_strength));
2867  }
2868  iter->second->writer_qos_ownership_strength(ownership_strength);
2869  iter->second->clear_owner_evaluated();
2870  }
2871  break;
2872  }
2873  }
2874 }
#define ACE_DEBUG(X)
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")

◆ update_subscription_params()

void OpenDDS::DCPS::DataReaderImpl::update_subscription_params ( const DDS::StringSeq params) const

Definition at line 3152 of file DataReaderImpl.cpp.

References TheServiceParticipant.

3153 {
3154  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
3155  disco->update_subscription_params(domain_id_,
3156  dp_id_,
3158  params);
3159 }
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
#define TheServiceParticipant

◆ verify_coherent_changes_completion()

bool OpenDDS::DCPS::DataReaderImpl::verify_coherent_changes_completion ( WriterInfo writer)
private

Definition at line 2878 of file DataReaderImpl.cpp.

References OpenDDS::DCPS::WriterInfo::coherent_change_received(), OpenDDS::DCPS::COMPLETED, OpenDDS::DCPS::WriterInfo::group_coherent(), DDS::INSTANCE_PRESENTATION_QOS, OpenDDS::DCPS::NOT_COMPLETED_YET, OpenDDS::DCPS::WriterInfo::publisher_id(), OpenDDS::DCPS::REJECTED, OpenDDS::DCPS::WriterInfo::reset_coherent_info(), state, and OpenDDS::DCPS::WriterInfo::writer_id().

Referenced by data_received().

2879 {
2881  bool accept_here = true;
2882 
2883  const GUID_t writer_id = writer->writer_id();
2884  const GUID_t publisher_id = writer->publisher_id();
2885 
2888  // verify current coherent changes from single writer
2889  state = writer->coherent_change_received();
2890  if (writer->group_coherent()) { // GROUP coherent any state
2891  RcHandle<SubscriberImpl> subscriber = get_subscriber_servant();
2892  if (subscriber && state != NOT_COMPLETED_YET) {
2893  // verify if all readers received complete coherent changes in a group.
2894  subscriber->coherent_change_received(publisher_id, this, state);
2895  accept_here = false; // coherent_change_received does that itself
2896  }
2897  } else if (state != NOT_COMPLETED_YET) { // TOPIC coherent with final state
2898  if (state == REJECTED) {
2899  reject_coherent(writer_id, publisher_id);
2900  }
2901  writer->reset_coherent_info();
2902  }
2903  }
2904 
2905  if (state == COMPLETED && accept_here) {
2906  accept_coherent(writer_id, publisher_id);
2908  }
2909 
2910  return state == COMPLETED;
2911 }
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
Definition: COPYING.txt:14
void accept_coherent(const GUID_t &writer_id, const GUID_t &publisher_id)
void coherent_changes_completed(DataReaderImpl *reader)
void reject_coherent(const GUID_t &writer_id, const GUID_t &publisher_id)
PresentationQosPolicy presentation
RcHandle< SubscriberImpl > get_subscriber_servant()
PresentationQosPolicyAccessScopeKind access_scope

◆ wait_for_historical_data()

DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl::wait_for_historical_data ( const DDS::Duration_t max_wait)
virtual

Definition at line 1043 of file DataReaderImpl.cpp.

References DDS::RETCODE_OK.

1045 {
1046  // Add your implementation here
1047  return DDS::RETCODE_OK;
1048 }
const ReturnCode_t RETCODE_OK

◆ writer_activity()

void OpenDDS::DCPS::DataReaderImpl::writer_activity ( const DataSampleHeader header)

update liveliness info for this writer.

Definition at line 1323 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_READ_GUARD, ACE_TEXT(), OpenDDS::DCPS::WriterInfo::add_coherent_samples(), OpenDDS::DCPS::LogGuid::c_str(), OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, get_guid(), OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::RcHandle< T >::is_nil(), LM_DEBUG, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), OpenDDS::DCPS::DataSampleHeader::publication_id_, OpenDDS::DCPS::WriterInfo::received_activity(), OpenDDS::DCPS::SAMPLE_DATA, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::UNREGISTER_INSTANCE, writers_, and writers_lock_.

Referenced by data_received().

1324 {
1325  // caller should have the sample_lock_ !!!
1326 
1327  WriterInfo_rch writer;
1328 
1329  // The received_activity() has to be called outside the writers_lock_
1330  // because it probably acquire writers_lock_ read lock recursively
1331  // (in handle_timeout). This could cause deadlock when there are writers
1332  // waiting.
1333  {
1334  ACE_READ_GUARD(ACE_RW_Thread_Mutex, read_guard, this->writers_lock_);
1335  WriterMapType::iterator iter = writers_.find(header.publication_id_);
1336 
1337  if (iter != writers_.end()) {
1338  writer = iter->second;
1339 
1340  } else if (DCPS_debug_level > 4) {
1341  // This may not be an error since it could happen that the sample
1342  // is delivered to the datareader after the write is dis-associated
1343  // with this datareader.
1344  ACE_DEBUG((LM_DEBUG,
1345  ACE_TEXT("(%P|%t) DataReaderImpl::writer_activity: ")
1346  ACE_TEXT("reader %C is not associated with writer %C.\n"),
1347  LogGuid(get_guid()).c_str(),
1348  LogGuid(header.publication_id_).c_str()));
1349  }
1350  }
1351 
1352  if (!writer.is_nil()) {
1353  writer->received_activity(MonotonicTimePoint::now());
1354 
1355  if ((header.message_id_ == SAMPLE_DATA) ||
1356  (header.message_id_ == INSTANCE_REGISTRATION) ||
1357  (header.message_id_ == UNREGISTER_INSTANCE) ||
1358  (header.message_id_ == DISPOSE_INSTANCE) ||
1359  (header.message_id_ == DISPOSE_UNREGISTER_INSTANCE)) {
1360 
1361 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1362  if (header.coherent_change_) {
1363  writer->add_coherent_samples(header.sequence_);
1364  }
1365 #endif
1366  }
1367  }
1368 }
#define ACE_DEBUG(X)
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
#define ACE_READ_GUARD(MUTEX, OBJ, LOCK)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
ACE_RW_Thread_Mutex writers_lock_
RW lock for reading/writing publications.
RcHandle< WriterInfo > WriterInfo_rch
Definition: WriterInfo.h:275
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")

◆ writer_became_alive()

void OpenDDS::DCPS::DataReaderImpl::writer_became_alive ( WriterInfo info,
const MonotonicTimePoint when 
)
virtual

tell instances when a DataWriter transitions to being alive The writer state is inout parameter, it has to be set ALIVE before handle_timeout is called since some subroutine use the state.

Reimplemented from OpenDDS::DCPS::WriterInfoListener.

Definition at line 2083 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::WriterInfo::ALIVE, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::get_state_str(), OpenDDS::DCPS::WriterInfo::handle(), DDS::LIVELINESS_CHANGED_STATUS, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::WriterInfo::state(), and OpenDDS::DCPS::WriterInfo::writer_id().

2084 {
2085  const GUID_t info_writer_id = info.writer_id();
2086 
2087  if (DCPS_debug_level >= 5) {
2088  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_alive: ")
2089  ACE_TEXT("reader %C from writer %C previous state %C.\n"),
2090  LogGuid(get_guid()).c_str(),
2091  LogGuid(info_writer_id).c_str(),
2092  info.get_state_str()));
2093  }
2094 
2095  // NOTE: each instance will change to ALIVE_STATE when they receive a sample
2096 
2097  const WriterInfo::WriterState info_state = info.state();
2098 
2099  {
2100  bool liveliness_changed = false;
2101 
2103 
2104  if (info_state != WriterInfo::ALIVE) {
2107  liveliness_changed = true;
2108  }
2109 
2110  if (info_state == WriterInfo::DEAD) {
2113  }
2114 
2116  ACE_ERROR((LM_ERROR,
2117  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
2118  ACE_TEXT("invalid liveliness_changed_status alive count - %d.\n"),
2120  return;
2121  }
2122 
2124  ACE_ERROR((LM_ERROR,
2125  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_alive: ")
2126  ACE_TEXT("invalid liveliness_changed_status not alive count - %d.\n"),
2128  return;
2129  }
2130 
2132 
2133  // Change the state to ALIVE since handle_timeout may call writer_became_dead
2134  // which need the current state info.
2135  info.state(WriterInfo::ALIVE);
2136 
2137  if (this->monitor_) {
2138  this->monitor_->report();
2139  }
2140 
2141  // Call listener only when there are liveliness status changes.
2142  if (liveliness_changed) {
2144  this->notify_liveliness_change();
2145  }
2146  }
2147 
2148  // this call will start the liveliness timer if it is not already set
2149  liveliness_timer_->check_liveliness();
2150 }
#define ACE_DEBUG(X)
RcHandle< LivelinessTimer > liveliness_timer_
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
DDS::LivelinessChangedStatus liveliness_changed_status_
unique_ptr< Monitor > monitor_
Monitor object for this entity.
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
const StatusKind LIVELINESS_CHANGED_STATUS

◆ writer_became_dead()

void OpenDDS::DCPS::DataReaderImpl::writer_became_dead ( WriterInfo info)
virtual

tell instances when a DataWriter transitions to DEAD The writer state is inout parameter, the state is set to DEAD when it returns.

Reimplemented from OpenDDS::DCPS::WriterInfoListener.

Definition at line 2153 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::WriterInfo::ALIVE, OpenDDS::DCPS::WriterInfo::clear_owner_evaluated(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::get_state_str(), OpenDDS::DCPS::WriterInfo::handle(), DDS::HANDLE_NIL, DDS::LIVELINESS_CHANGED_STATUS, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::OwnershipManager::remove_writer(), OpenDDS::DCPS::WriterInfo::state(), and OpenDDS::DCPS::WriterInfo::writer_id().

2154 {
2155  const GUID_t info_writer_id = info.writer_id();
2156 
2157  if (DCPS_debug_level >= 5) {
2158  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl::writer_became_dead: ")
2159  ACE_TEXT("reader %C from writer %C previous state %C.\n"),
2160  LogGuid(get_guid()).c_str(),
2161  LogGuid(info_writer_id).c_str(),
2162  info.get_state_str()));
2163  }
2164 
2165 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2166  OwnershipManagerPtr owner_manager = this->ownership_manager();
2167  if (owner_manager) {
2168  owner_manager->remove_writer(info_writer_id);
2169  info.clear_owner_evaluated();
2170  }
2171 #endif
2172 
2173  bool liveliness_changed = false;
2174 
2175  const WriterInfo::WriterState info_state = info.state();
2176 
2177  DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
2178  {
2180  RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(info_writer_id);
2181  if (pos != publication_id_to_handle_map_.end()) {
2182  publication_handle = pos->second;
2183  }
2184  }
2185 
2186  {
2188 
2189  if (info_state != WriterInfo::DEAD) {
2192  liveliness_changed = true;
2193  }
2194 
2195  if (info_state == WriterInfo::ALIVE) {
2198  }
2199 
2201  ACE_ERROR((LM_ERROR,
2202  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
2203  ACE_TEXT("invalid liveliness_changed_status alive count - %d.\n"),
2205  return;
2206  }
2207 
2209  ACE_ERROR((LM_ERROR,
2210  ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::writer_became_dead: ")
2211  ACE_TEXT("invalid liveliness_changed_status not alive count - %d.\n"),
2213  return;
2214  }
2215 
2217 
2218  info.state(WriterInfo::DEAD);
2219 
2220  if (this->monitor_) {
2221  this->monitor_->report();
2222  }
2223 
2224  instances_liveliness_update(info_writer_id, publication_handle);
2225 
2226  // Call listener only when there are liveliness status changes.
2227  if (liveliness_changed) {
2229  this->notify_liveliness_change();
2230  }
2231  }
2232 }
OwnershipManagerPtr ownership_manager()
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void instances_liveliness_update(const GUID_t &writer, DDS::InstanceHandle_t publication_handle)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
RepoIdToHandleMap publication_id_to_handle_map_
const InstanceHandle_t HANDLE_NIL
DDS::LivelinessChangedStatus liveliness_changed_status_
unique_ptr< Monitor > monitor_
Monitor object for this entity.
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
ACE_Recursive_Thread_Mutex publication_handle_lock_
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
const StatusKind LIVELINESS_CHANGED_STATUS

◆ writer_removed()

void OpenDDS::DCPS::DataReaderImpl::writer_removed ( WriterInfo info)
virtual

tell instance when a DataWriter is removed. The liveliness status need update.

Reimplemented from OpenDDS::DCPS::WriterInfoListener.

Definition at line 2024 of file DataReaderImpl.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::WriterInfo::ALIVE, OpenDDS::DCPS::WriterInfo::clear_owner_evaluated(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::WriterInfo::DEAD, OpenDDS::DCPS::WriterInfo::handle(), DDS::HANDLE_NIL, DDS::LIVELINESS_CHANGED_STATUS, LM_DEBUG, OpenDDS::DCPS::OwnershipManager::remove_writer(), OpenDDS::DCPS::WriterInfo::state(), and OpenDDS::DCPS::WriterInfo::writer_id().

2025 {
2026  const GUID_t info_writer_id = info.writer_id();
2027 
2028  if (DCPS_debug_level >= 5) {
2029  ACE_DEBUG((LM_DEBUG,
2030  ACE_TEXT("(%P|%t) DataReaderImpl::writer_removed: ")
2031  ACE_TEXT("reader %C from writer %C.\n"),
2032  LogGuid(get_guid()).c_str(),
2033  LogGuid(info_writer_id).c_str()));
2034  }
2035 
2036 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
2037  OwnershipManagerPtr owner_manager = this->ownership_manager();
2038  if (owner_manager) {
2039  owner_manager->remove_writer(info_writer_id);
2040  info.clear_owner_evaluated();
2041  }
2042 #endif
2043 
2044  {
2045  DDS::InstanceHandle_t publication_handle = DDS::HANDLE_NIL;
2046  {
2048  RepoIdToHandleMap::const_iterator pos = publication_id_to_handle_map_.find(info_writer_id);
2049  if (pos != publication_id_to_handle_map_.end()) {
2050  publication_handle = pos->second;
2051  }
2052  }
2053 
2054  bool liveliness_changed = false;
2055 
2057 
2058  const WriterInfo::WriterState info_state = info.state();
2059 
2060  if (info_state == WriterInfo::ALIVE) {
2063  liveliness_changed = true;
2064  }
2065 
2066  if (info_state == WriterInfo::DEAD) {
2069  liveliness_changed = true;
2070  }
2071 
2073  instances_liveliness_update(info_writer_id, publication_handle);
2074 
2075  if (liveliness_changed) {
2077  this->notify_liveliness_change();
2078  }
2079  }
2080 }
OwnershipManagerPtr ownership_manager()
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void instances_liveliness_update(const GUID_t &writer, DDS::InstanceHandle_t publication_handle)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
RepoIdToHandleMap publication_id_to_handle_map_
const InstanceHandle_t HANDLE_NIL
DDS::LivelinessChangedStatus liveliness_changed_status_
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
ACE_Recursive_Thread_Mutex publication_handle_lock_
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
const StatusKind LIVELINESS_CHANGED_STATUS

Friends And Related Function Documentation

◆ ::DDS_TEST

friend class ::DDS_TEST
friend

Definition at line 870 of file DataReaderImpl.h.

◆ EndHistoricSamplesMissedSweeper

friend class EndHistoricSamplesMissedSweeper
friend

Definition at line 868 of file DataReaderImpl.h.

◆ InstanceState

friend class InstanceState
friend

Definition at line 867 of file DataReaderImpl.h.

◆ OwnershipManagerPtr

friend class OwnershipManagerPtr
friend

Definition at line 481 of file DataReaderImpl.h.

◆ QueryConditionImpl

friend class QueryConditionImpl
friend

Definition at line 216 of file DataReaderImpl.h.

Referenced by create_querycondition().

◆ RequestedDeadlineWatchdog

friend class RequestedDeadlineWatchdog
friend

Definition at line 215 of file DataReaderImpl.h.

◆ SubscriberImpl

friend class SubscriberImpl
friend

Definition at line 217 of file DataReaderImpl.h.

Member Data Documentation

◆ always_get_history_

bool OpenDDS::DCPS::DataReaderImpl::always_get_history_
private

Definition at line 1013 of file DataReaderImpl.h.

◆ budget_exceeded_status_

BudgetExceededStatus OpenDDS::DCPS::DataReaderImpl::budget_exceeded_status_
private

Definition at line 900 of file DataReaderImpl.h.

Referenced by DataReaderImpl().

◆ coherent_

bool OpenDDS::DCPS::DataReaderImpl::coherent_
protected

Is accessing to Group coherent changes ?

Definition at line 805 of file DataReaderImpl.h.

◆ COMBINED_SAMPLE_STATE_SHIFT

const CORBA::ULong OpenDDS::DCPS::DataReaderImpl::COMBINED_SAMPLE_STATE_SHIFT = COMBINED_VIEW_STATE_SHIFT + MAX_VIEW_STATE_BITS
staticprotected

Definition at line 665 of file DataReaderImpl.h.

◆ combined_state_lookup_

LookupMap OpenDDS::DCPS::DataReaderImpl::combined_state_lookup_
protected

Definition at line 694 of file DataReaderImpl.h.

◆ COMBINED_VIEW_STATE_SHIFT

const CORBA::ULong OpenDDS::DCPS::DataReaderImpl::COMBINED_VIEW_STATE_SHIFT = MAX_INSTANCE_STATE_BITS
staticprotected

Definition at line 664 of file DataReaderImpl.h.

◆ content_filtered_topic_

TopicDescriptionPtr<ContentFilteredTopicImpl> OpenDDS::DCPS::DataReaderImpl::content_filtered_topic_
protected

Definition at line 797 of file DataReaderImpl.h.

Referenced by cleanup(), enable(), and get_topicdescription().

◆ content_filtered_topic_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::content_filtered_topic_mutex_
mutableprotected

Definition at line 796 of file DataReaderImpl.h.

Referenced by cleanup(), enable(), and get_topicdescription().

◆ deadline_period_

TimeDuration OpenDDS::DCPS::DataReaderImpl::deadline_period_
private

Watchdog responsible for reporting missed offered deadlines.

Definition at line 990 of file DataReaderImpl.h.

Referenced by enable(), and qos_change().

◆ deadline_queue_

DeadlineQueue OpenDDS::DCPS::DataReaderImpl::deadline_queue_
private

Definition at line 992 of file DataReaderImpl.h.

◆ deadline_queue_enabled_

bool OpenDDS::DCPS::DataReaderImpl::deadline_queue_enabled_
private

Definition at line 993 of file DataReaderImpl.h.

Referenced by data_received(), enable(), and qos_change().

◆ deadline_task_

RcHandle<DRISporadicTask> OpenDDS::DCPS::DataReaderImpl::deadline_task_
private

Definition at line 995 of file DataReaderImpl.h.

Referenced by ~DataReaderImpl().

◆ decoding_modes_

EncodingKinds OpenDDS::DCPS::DataReaderImpl::decoding_modes_
protected

Definition at line 1051 of file DataReaderImpl.h.

◆ depth_

CORBA::Long OpenDDS::DCPS::DataReaderImpl::depth_
private

Definition at line 884 of file DataReaderImpl.h.

Referenced by enable().

◆ domain_id_

DDS::DomainId_t OpenDDS::DCPS::DataReaderImpl::domain_id_
private

Definition at line 876 of file DataReaderImpl.h.

Referenced by enable(), init(), and set_qos().

◆ dp_id_

GUID_t OpenDDS::DCPS::DataReaderImpl::dp_id_
private

Definition at line 877 of file DataReaderImpl.h.

Referenced by enable(), and set_qos().

◆ dynamic_type_

DDS::DynamicType_var OpenDDS::DCPS::DataReaderImpl::dynamic_type_
protected

Definition at line 1105 of file DataReaderImpl.h.

Referenced by enable().

◆ end_historic_sweeper_

RcHandle<EndHistoricSamplesMissedSweeper> OpenDDS::DCPS::DataReaderImpl::end_historic_sweeper_
private

Definition at line 882 of file DataReaderImpl.h.

Referenced by remove_associations_i().

◆ group_coherent_ordered_data_

GroupRakeData OpenDDS::DCPS::DataReaderImpl::group_coherent_ordered_data_
protected

Ordered group samples.

Definition at line 808 of file DataReaderImpl.h.

◆ has_subscription_id_

bool OpenDDS::DCPS::DataReaderImpl::has_subscription_id_
protected

Definition at line 767 of file DataReaderImpl.h.

Referenced by add_association(), and enable().

◆ instances_

SubscriptionInstanceMapType OpenDDS::DCPS::DataReaderImpl::instances_
mutableprotected

: document why the instances_ container is mutable.

Definition at line 738 of file DataReaderImpl.h.

Referenced by data_received(), release_instance(), signal_liveliness(), and total_samples().

◆ instances_lock_

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::instances_lock_
mutableprotected

Assume since the container is mutable(?!!?) it may need to use the lock while const. : remove the recursive nature of the instances_lock if not needed.

Definition at line 743 of file DataReaderImpl.h.

Referenced by contains_sample(), data_received(), have_instance_states(), have_sample_states(), have_view_states(), release_instance(), signal_liveliness(), and total_samples().

◆ is_bit_

bool OpenDDS::DCPS::DataReaderImpl::is_bit_
private

Flag indicates that this datareader is a builtin topic datareader.

Definition at line 1011 of file DataReaderImpl.h.

Referenced by add_association(), init(), remove_associations(), remove_associations_i(), and transport_assoc_done().

◆ is_exclusive_ownership_

bool OpenDDS::DCPS::DataReaderImpl::is_exclusive_ownership_
protected

Definition at line 791 of file DataReaderImpl.h.

Referenced by data_received(), and init().

◆ last_deadline_missed_total_count_

CORBA::Long OpenDDS::DCPS::DataReaderImpl::last_deadline_missed_total_count_
private

Definition at line 987 of file DataReaderImpl.h.

Referenced by get_requested_deadline_missed_status().

◆ listener_

DDS::DataReaderListener_var OpenDDS::DCPS::DataReaderImpl::listener_
private

Definition at line 875 of file DataReaderImpl.h.

Referenced by get_ext_listener(), get_listener(), listener_for(), and set_listener().

◆ listener_mask_

DDS::StatusMask OpenDDS::DCPS::DataReaderImpl::listener_mask_
private

Definition at line 874 of file DataReaderImpl.h.

Referenced by listener_for(), and set_listener().

◆ listener_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::listener_mutex_
private

Definition at line 873 of file DataReaderImpl.h.

Referenced by get_ext_listener(), get_listener(), listener_for(), and set_listener().

◆ liveliness_changed_status_

DDS::LivelinessChangedStatus OpenDDS::DCPS::DataReaderImpl::liveliness_changed_status_
private

Definition at line 894 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), and get_liveliness_changed_status().

◆ liveliness_timer_

RcHandle<LivelinessTimer> OpenDDS::DCPS::DataReaderImpl::liveliness_timer_
private

Definition at line 985 of file DataReaderImpl.h.

Referenced by transport_assoc_done().

◆ MAX_INSTANCE_STATE_BITS

const CORBA::ULong OpenDDS::DCPS::DataReaderImpl::MAX_INSTANCE_STATE_BITS = 3u
staticprotected

Definition at line 661 of file DataReaderImpl.h.

◆ MAX_INSTANCE_STATE_FLAG

const CORBA::ULong OpenDDS::DCPS::DataReaderImpl::MAX_INSTANCE_STATE_FLAG = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
staticprotected

Definition at line 659 of file DataReaderImpl.h.

◆ MAX_INSTANCE_STATE_MASK

const CORBA::ULong OpenDDS::DCPS::DataReaderImpl::MAX_INSTANCE_STATE_MASK = (MAX_INSTANCE_STATE_FLAG << 1) - 1
staticprotected

Definition at line 660 of file DataReaderImpl.h.

◆ MAX_SAMPLE_STATE_BITS

const CORBA::ULong OpenDDS::DCPS::DataReaderImpl::MAX_SAMPLE_STATE_BITS = 2u
staticprotected

Definition at line 651 of file DataReaderImpl.h.

◆ MAX_SAMPLE_STATE_FLAG

const CORBA::ULong OpenDDS::DCPS::DataReaderImpl::MAX_SAMPLE_STATE_FLAG = DDS::NOT_READ_SAMPLE_STATE
staticprotected

Definition at line 649 of file DataReaderImpl.h.

◆ MAX_SAMPLE_STATE_MASK

const CORBA::ULong OpenDDS::DCPS::DataReaderImpl::MAX_SAMPLE_STATE_MASK = (MAX_SAMPLE_STATE_FLAG << 1) - 1
staticprotected

Definition at line 650 of file DataReaderImpl.h.

◆ MAX_VIEW_STATE_BITS

const CORBA::ULong OpenDDS::DCPS::DataReaderImpl::MAX_VIEW_STATE_BITS = 2u
staticprotected

Definition at line 656 of file DataReaderImpl.h.

◆ MAX_VIEW_STATE_FLAG

const CORBA::ULong OpenDDS::DCPS::DataReaderImpl::MAX_VIEW_STATE_FLAG = DDS::NOT_NEW_VIEW_STATE
staticprotected

Definition at line 654 of file DataReaderImpl.h.

◆ MAX_VIEW_STATE_MASK

const CORBA::ULong OpenDDS::DCPS::DataReaderImpl::MAX_VIEW_STATE_MASK = (MAX_VIEW_STATE_FLAG << 1) - 1
staticprotected

Definition at line 655 of file DataReaderImpl.h.

◆ mb_alloc_

TransportMessageBlockAllocator OpenDDS::DCPS::DataReaderImpl::mb_alloc_
protected

Definition at line 1108 of file DataReaderImpl.h.

Referenced by data_received().

◆ monitor_

unique_ptr<Monitor> OpenDDS::DCPS::DataReaderImpl::monitor_
private

Monitor object for this entity.

Definition at line 1042 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), enable(), release_instance(), remove_associations_i(), and transport_assoc_done().

◆ multi_topic_

TopicDescriptionPtr<MultiTopicImpl> OpenDDS::DCPS::DataReaderImpl::multi_topic_
protected

Definition at line 801 of file DataReaderImpl.h.

Referenced by cleanup().

◆ n_chunks_

size_t OpenDDS::DCPS::DataReaderImpl::n_chunks_
private

Definition at line 885 of file DataReaderImpl.h.

Referenced by enable().

◆ participant_servant_

WeakRcHandle<DomainParticipantImpl> OpenDDS::DCPS::DataReaderImpl::participant_servant_
protected

◆ passed_qos_

DDS::DataReaderQos OpenDDS::DCPS::DataReaderImpl::passed_qos_
protected

Definition at line 773 of file DataReaderImpl.h.

Referenced by get_qos(), init(), and set_qos().

◆ periodic_monitor_

unique_ptr<Monitor> OpenDDS::DCPS::DataReaderImpl::periodic_monitor_
private

Periodic Monitor object for this entity.

Definition at line 1045 of file DataReaderImpl.h.

Referenced by DataReaderImpl().

◆ publication_handle_lock_

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::publication_handle_lock_
private

◆ publication_id_to_handle_map_

RepoIdToHandleMap OpenDDS::DCPS::DataReaderImpl::publication_id_to_handle_map_
private

◆ qos_

DDS::DataReaderQos OpenDDS::DCPS::DataReaderImpl::qos_
protected

◆ raw_latency_buffer_size_

unsigned int OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_size_
private

Bound (or initial reservation) of raw latency buffer.

Definition at line 1032 of file DataReaderImpl.h.

Referenced by add_association(), and raw_latency_buffer_size().

◆ raw_latency_buffer_type_

DataCollector<double>::OnFull OpenDDS::DCPS::DataReaderImpl::raw_latency_buffer_type_
private

Type of raw latency data buffer.

Definition at line 1035 of file DataReaderImpl.h.

Referenced by add_association(), and raw_latency_buffer_type().

◆ rd_allocator_

unique_ptr<ReceivedDataAllocator> OpenDDS::DCPS::DataReaderImpl::rd_allocator_
protected

Definition at line 771 of file DataReaderImpl.h.

Referenced by enable().

◆ reactor_

ACE_Reactor_Timer_Interface* OpenDDS::DCPS::DataReaderImpl::reactor_
private

The orb's reactor to be used to register the liveliness timer.

Definition at line 916 of file DataReaderImpl.h.

Referenced by DataReaderImpl().

◆ read_conditions_

ReadConditionSet OpenDDS::DCPS::DataReaderImpl::read_conditions_
private

◆ requested_deadline_missed_status_

DDS::RequestedDeadlineMissedStatus OpenDDS::DCPS::DataReaderImpl::requested_deadline_missed_status_
private

Definition at line 895 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), and get_requested_deadline_missed_status().

◆ requested_incompatible_qos_status_

DDS::RequestedIncompatibleQosStatus OpenDDS::DCPS::DataReaderImpl::requested_incompatible_qos_status_
private

◆ reverse_sample_lock_

Reverse_Lock_t OpenDDS::DCPS::DataReaderImpl::reverse_sample_lock_
protected

Definition at line 783 of file DataReaderImpl.h.

Referenced by notify_read_conditions().

◆ sample_lock_

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::sample_lock_
protected

◆ sample_lost_status_

DDS::SampleLostStatus OpenDDS::DCPS::DataReaderImpl::sample_lost_status_
protected

Definition at line 777 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), and get_sample_lost_status().

◆ sample_rejected_status_

DDS::SampleRejectedStatus OpenDDS::DCPS::DataReaderImpl::sample_rejected_status_
protected

Definition at line 776 of file DataReaderImpl.h.

Referenced by DataReaderImpl(), and get_sample_rejected_status().

◆ security_config_

Security::SecurityConfig_rch OpenDDS::DCPS::DataReaderImpl::security_config_
protected

Definition at line 1104 of file DataReaderImpl.h.

Referenced by enable().

◆ statistics_

StatsMapType OpenDDS::DCPS::DataReaderImpl::statistics_
private

Statistics for this reader, collected for each writer.

Definition at line 1028 of file DataReaderImpl.h.

Referenced by add_association(), raw_latency_statistics(), and remove_associations().

◆ statistics_enabled_

AtomicBool OpenDDS::DCPS::DataReaderImpl::statistics_enabled_
private

Flag indicating status of statistics gathering.

Definition at line 1016 of file DataReaderImpl.h.

◆ statistics_lock_

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::statistics_lock_
private

Definition at line 1029 of file DataReaderImpl.h.

Referenced by add_association(), and remove_associations().

◆ subqos_

DDS::SubscriberQos OpenDDS::DCPS::DataReaderImpl::subqos_
protected

Definition at line 810 of file DataReaderImpl.h.

◆ subscriber_servant_

WeakRcHandle<SubscriberImpl> OpenDDS::DCPS::DataReaderImpl::subscriber_servant_
private

Definition at line 881 of file DataReaderImpl.h.

Referenced by get_subscriber_servant(), init(), and parent().

◆ subscription_id_condition_

ConditionVariable<ACE_Thread_Mutex> OpenDDS::DCPS::DataReaderImpl::subscription_id_condition_
mutableprotected

Definition at line 769 of file DataReaderImpl.h.

Referenced by add_association(), and enable().

◆ subscription_id_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::subscription_id_mutex_
mutableprotected

Definition at line 768 of file DataReaderImpl.h.

Referenced by add_association(), and enable().

◆ subscription_lost_status_

SubscriptionLostStatus OpenDDS::DCPS::DataReaderImpl::subscription_lost_status_
private
Todo:
The subscription_lost_status_ and subscription_reconnecting_status_ are left here for future use when we add get_subscription_lost_status() and get_subscription_reconnecting_status() methods.

Definition at line 909 of file DataReaderImpl.h.

◆ subscription_match_status_

DDS::SubscriptionMatchedStatus OpenDDS::DCPS::DataReaderImpl::subscription_match_status_
private

◆ topic_desc_

DDS::TopicDescription_var OpenDDS::DCPS::DataReaderImpl::topic_desc_
private

Definition at line 872 of file DataReaderImpl.h.

Referenced by get_topicdescription(), and init().

◆ topic_id_

GUID_t OpenDDS::DCPS::DataReaderImpl::topic_id_
protected

Definition at line 788 of file DataReaderImpl.h.

Referenced by init().

◆ topic_servant_

TopicDescriptionPtr<TopicImpl> OpenDDS::DCPS::DataReaderImpl::topic_servant_
protected

Definition at line 786 of file DataReaderImpl.h.

Referenced by cleanup(), enable(), and init().

◆ transport_disabled_

bool OpenDDS::DCPS::DataReaderImpl::transport_disabled_
private

Definition at line 1047 of file DataReaderImpl.h.

Referenced by disable_transport(), and enable().

◆ type_support_

TypeSupportImpl* OpenDDS::DCPS::DataReaderImpl::type_support_
protected

Definition at line 787 of file DataReaderImpl.h.

Referenced by init().

◆ writers_

WriterMapType OpenDDS::DCPS::DataReaderImpl::writers_
private

◆ writers_lock_

ACE_RW_Thread_Mutex OpenDDS::DCPS::DataReaderImpl::writers_lock_
private

The documentation for this class was generated from the following files: