OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Types | Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::DataReaderImpl_T< MessageType > Class Template Reference

#include <DataReaderImpl_T.h>

Inheritance diagram for OpenDDS::DCPS::DataReaderImpl_T< MessageType >:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataReaderImpl_T< MessageType >:
Collaboration graph
[legend]

Classes

struct  FilterDelayedSample
 
struct  MessageTypeMemoryBlock
 
class  MessageTypeWithAllocator
 
class  SharedInstanceMap
 

Public Types

typedef DDSTraits< MessageType > TraitsType
 
typedef MarshalTraits< MessageType > MarshalTraitsType
 
typedef TraitsType::MessageSequenceType MessageSequenceType
 
typedef RcHandle< SharedInstanceMapSharedInstanceMap_rch
 
typedef TraitsType::DataReaderType Interface
 
typedef OpenDDS::DCPS::Cached_Allocator_With_Overflow< MessageTypeMemoryBlock, ACE_Thread_MutexDataAllocator
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DDSTraits< MessageType >::DataReaderType >
typedef DDSTraits< MessageType >::DataReaderType ::_ptr_type _ptr_type
 
typedef DDSTraits< MessageType >::DataReaderType ::_var_type _var_type
 
- Public Types inherited from CORBA::LocalObject
typedef LocalObject_ptr _ptr_type
 
typedef LocalObject_var _var_type
 
typedef LocalObject_out _out_type
 
- Public Types inherited from CORBA::Object
typedef Object_ptr _ptr_type
 
typedef Object_var _var_type
 
typedef Object_out _out_type
 
- Public Types inherited from OpenDDS::DCPS::DataReaderImpl
typedef std::pair< GUID_t, WriterInfo::WriterStateWriterStatePair
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DataReaderEx >
typedef DataReaderEx ::_ptr_type _ptr_type
 
typedef DataReaderEx ::_var_type _var_type
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
typedef DDS::Entity ::_ptr_type _ptr_type
 
typedef DDS::Entity ::_var_type _var_type
 
- Public Types inherited from OpenDDS::DCPS::TransportClient
enum  { ASSOC_OK = 1, ASSOC_ACTIVE = 2 }
 

Public Member Functions

typedef OPENDDS_MAP_CMP_T (MessageType, DDS::InstanceHandle_t, typename TraitsType::LessThanType) InstanceMap
 
typedef OPENDDS_MAP (DDS::InstanceHandle_t, typename InstanceMap::iterator) ReverseInstanceMap
 
CORBA::Boolean _is_a (const char *type_id)
 
const char * _interface_repository_id () const
 
CORBA::Boolean marshal (TAO_OutputCDR &)
 
 DataReaderImpl_T ()
 
virtual ~DataReaderImpl_T ()
 
virtual DDS::ReturnCode_t enable_specific ()
 
virtual DDS::ReturnCode_t read (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t take (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t read_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &sample_info, ::CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t take_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &sample_info, ::CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t read_next_sample (MessageType &received_data, DDS::SampleInfo &sample_info_ref)
 
virtual DDS::ReturnCode_t take_next_sample (MessageType &received_data, DDS::SampleInfo &sample_info_ref)
 
virtual DDS::ReturnCode_t read_instance (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t take_instance (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t read_instance_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t take_instance_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t read_next_instance (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t take_next_instance (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t read_next_instance_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t take_next_instance_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t return_loan (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq)
 
virtual DDS::ReturnCode_t get_key_value (MessageType &key_holder, DDS::InstanceHandle_t handle)
 
virtual DDS::InstanceHandle_t lookup_instance (const MessageType &instance_data)
 
virtual DDS::ReturnCode_t auto_return_loan (void *seq)
 
void release_loan (MessageSequenceType &received_data)
 
bool contains_sample_filtered (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const FilterEvaluator &evaluator, const DDS::StringSeq &params)
 
DDS::ReturnCode_t read_generic (GenericBundle &gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count=false)
 
DDS::InstanceHandle_t lookup_instance_generic (const void *data)
 
virtual DDS::ReturnCode_t take (AbstractSamples &samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
DDS::ReturnCode_t read_instance_generic (void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
DDS::ReturnCode_t read_next_instance_generic (void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t previous_instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
DDS::InstanceHandle_t store_synthetic_data (const MessageType &sample, DDS::ViewStateKind view, const SystemTimePoint &timestamp=SystemTimePoint::now())
 
void set_instance_state_i (DDS::InstanceHandle_t instance, DDS::InstanceHandle_t publication_handle, DDS::InstanceStateKind state, const SystemTimePoint &timestamp, const GUID_t &publication_id)
 
virtual void lookup_instance (const OpenDDS::DCPS::ReceivedDataSample &sample, OpenDDS::DCPS::SubscriptionInstance_rch &instance)
 
virtual void qos_change (const DDS::DataReaderQos &qos)
 
void set_marshal_skip_serialize (bool value)
 
bool get_marshal_skip_serialize () const
 
void release_all_instances ()
 Release all instances held by the reader. More...
 
template<>
DDS::ReturnCode_t read_generic (GenericBundle &, DDS::SampleStateMask, DDS::ViewStateMask, DDS::InstanceStateMask, bool)
 
template<>
DDS::ReturnCode_t take (AbstractSamples &, DDS::SampleStateMask, DDS::ViewStateMask, DDS::InstanceStateMask)
 
template<>
OpenDDS_Dcps_Export DDS::ReturnCode_t read_generic (GenericBundle &gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count)
 
template<>
OpenDDS_Dcps_Export DDS::ReturnCode_t take (AbstractSamples &samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
- Public Member Functions inherited from OpenDDS::DCPS::LocalObjectBase
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
virtual CORBA::ULong _refcount_value () const
 
- Public Member Functions inherited from CORBA::LocalObject
virtual ~LocalObject (void)
 
virtual CORBA::Boolean _non_existent (void)
 
virtual char * _repository_id (void)
 
virtual CORBA::InterfaceDef_ptr _get_interface (void)
 
virtual CORBA::Object_ptr _get_component (void)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::ExceptionList_ptr exclist, CORBA::ContextList_ptr ctxtlist, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual CORBA::Request_ptr _request (const char *operation)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList_get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
virtual CORBA::ULong _hash (CORBA::ULong maximum)
 
virtual CORBA::Boolean _is_equivalent (CORBA::Object_ptr other_obj)
 
virtual CORBA::ORB_ptr _get_orb (void)
 
virtual TAO::ObjectKey_key (void)
 
- Public Member Functions inherited from CORBA::Object
virtual ~Object (void)
 
virtual TAO_Abstract_ServantBase_servant (void) const
 
virtual CORBA::Boolean _is_collocated (void) const
 
virtual CORBA::Boolean _is_local (void) const
 
 Object (TAO_Stub *p, CORBA::Boolean collocated=false, TAO_Abstract_ServantBase *servant=0, TAO_ORB_Core *orb_core=0)
 
 Object (IOP::IOR *ior, TAO_ORB_Core *orb_core)
 
virtual TAO_Stub_stubobj (void) const
 
virtual TAO_Stub_stubobj (void)
 
virtual void _proxy_broker (TAO::Object_Proxy_Broker *proxy_broker)
 
CORBA::Boolean is_evaluated (void) const
 
TAO_ORB_Coreorb_core (void) const
 
IOP::IORsteal_ior (void)
 
const IOP::IORior (void) const
 
virtual bool can_convert_to_ior (void) const
 
virtual char * convert_to_ior (bool use_omg_ior_format, const char *ior_prefix) const
 
void _decr_refcount (void)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList_get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::DataReaderImpl
typedef OPENDDS_MAP (DDS::InstanceHandle_t, SubscriptionInstance_rch) SubscriptionInstanceMapType
 
typedef OPENDDS_SET (DDS::InstanceHandle_t) InstanceSet
 
typedef OPENDDS_SET (SubscriptionInstance_rch) SubscriptionInstanceSet
 
typedef OPENDDS_MAP_CMP (GUID_t, WriterStats, GUID_tKeyLessThan) StatsMapType
 Type of collection of statistics for writers to this reader. More...
 
 DataReaderImpl ()
 
virtual ~DataReaderImpl ()
 
virtual DDS::InstanceHandle_t get_instance_handle ()
 
virtual void add_association (const GUID_t &yourId, const WriterAssociation &writer, bool active)
 
virtual void transport_assoc_done (int flags, const GUID_t &remote_id)
 
virtual void remove_associations (const WriterIdSeq &writers, bool callback)
 
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
 
virtual void signal_liveliness (const GUID_t &remote_participant)
 
DDS::DataReaderListener_ptr listener_for (DDS::StatusKind kind)
 
void writer_became_alive (WriterInfo &info, const MonotonicTimePoint &when)
 
void writer_became_dead (WriterInfo &info)
 
void writer_removed (WriterInfo &info)
 
virtual void cleanup ()
 
void init (TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber)
 
virtual DDS::ReadCondition_ptr create_readcondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::QueryCondition_ptr create_querycondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const char *query_expression, const DDS::StringSeq &query_parameters)
 
virtual DDS::ReturnCode_t delete_readcondition (DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t delete_contained_entities ()
 
virtual DDS::ReturnCode_t set_qos (const DDS::DataReaderQos &qos)
 
virtual DDS::ReturnCode_t get_qos (DDS::DataReaderQos &qos)
 
virtual DDS::ReturnCode_t set_listener (DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
 
virtual DDS::DataReaderListener_ptr get_listener ()
 
virtual DDS::TopicDescription_ptr get_topicdescription ()
 
virtual DDS::Subscriber_ptr get_subscriber ()
 
virtual DDS::ReturnCode_t get_sample_rejected_status (DDS::SampleRejectedStatus &status)
 
virtual DDS::ReturnCode_t get_liveliness_changed_status (DDS::LivelinessChangedStatus &status)
 
virtual DDS::ReturnCode_t get_requested_deadline_missed_status (DDS::RequestedDeadlineMissedStatus &status)
 
virtual DDS::ReturnCode_t get_requested_incompatible_qos_status (DDS::RequestedIncompatibleQosStatus &status)
 
virtual DDS::ReturnCode_t get_subscription_matched_status (DDS::SubscriptionMatchedStatus &status)
 
virtual DDS::ReturnCode_t get_sample_lost_status (DDS::SampleLostStatus &status)
 
virtual DDS::ReturnCode_t wait_for_historical_data (const DDS::Duration_t &max_wait)
 
virtual DDS::ReturnCode_t get_matched_publications (DDS::InstanceHandleSeq &publication_handles)
 
virtual DDS::ReturnCode_t get_matched_publication_data (DDS::PublicationBuiltinTopicData &publication_data, DDS::InstanceHandle_t publication_handle)
 
virtual DDS::ReturnCode_t enable ()
 
virtual void get_latency_stats (LatencyStatisticsSeq &stats)
 
virtual void reset_latency_stats ()
 Clear any intermediate statistical values. More...
 
virtual CORBA::Boolean statistics_enabled ()
 
virtual void statistics_enabled (CORBA::Boolean statistics_enabled)
 
void writer_activity (const DataSampleHeader &header)
 update liveliness info for this writer. More...
 
virtual void data_received (const ReceivedDataSample &sample)
 process a message that has been received - could be control or a data sample. More...
 
void transport_discovery_change ()
 
virtual bool check_transport_qos (const TransportInst &inst)
 
bool have_sample_states (DDS::SampleStateMask sample_states) const
 
bool have_view_states (DDS::ViewStateMask view_states) const
 
bool have_instance_states (DDS::InstanceStateMask instance_states) const
 
bool contains_sample (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
void process_latency (const ReceivedDataSample &sample)
 
void notify_latency (GUID_t writer)
 
size_t get_depth () const
 
size_t get_n_chunks () const
 
void liveliness_lost ()
 
void remove_all_associations ()
 
void notify_subscription_disconnected (const WriterIdSeq &pubids)
 
void notify_subscription_reconnected (const WriterIdSeq &pubids)
 
void notify_subscription_lost (const WriterIdSeq &pubids)
 
void notify_liveliness_change ()
 
bool is_bit () const
 
bool has_zero_copies ()
 
void release_instance (DDS::InstanceHandle_t handle)
 Release the instance with the handle. More...
 
void state_updated (DDS::InstanceHandle_t handle)
 
ACE_Reactor_Timer_Interfaceget_reactor ()
 
GUID_t get_topic_id ()
 
GUID_t get_dp_id ()
 
typedef OPENDDS_VECTOR (DDS::InstanceHandle_t) InstanceHandleVec
 
void get_instance_handles (InstanceHandleVec &instance_handles)
 
typedef OPENDDS_VECTOR (WriterStatePair) WriterStatePairVec
 
void get_writer_states (WriterStatePairVec &writer_states)
 
void update_ownership_strength (const GUID_t &pub_id, const CORBA::Long &ownership_strength)
 
OwnershipManagerPtr ownership_manager ()
 
void enable_filtering (ContentFilteredTopicImpl *cft)
 
DDS::ContentFilteredTopic_ptr get_cf_topic () const
 
void enable_multi_topic (MultiTopicImpl *mt)
 
void update_subscription_params (const DDS::StringSeq &params) const
 
typedef OPENDDS_VECTOR (void *) GenericSeq
 
void set_instance_state (DDS::InstanceHandle_t instance, DDS::InstanceStateKind state, const SystemTimePoint &timestamp=SystemTimePoint::now(), const GUID_t &guid=GUID_UNKNOWN)
 
void begin_access ()
 
void end_access ()
 
void get_ordered_data (GroupRakeData &data, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
void accept_coherent (const GUID_t &writer_id, const GUID_t &publisher_id)
 
void reject_coherent (const GUID_t &writer_id, const GUID_t &publisher_id)
 
void coherent_change_received (const GUID_t &publisher_id, Coherent_State &result)
 
void coherent_changes_completed (DataReaderImpl *reader)
 
void reset_coherent_info (const GUID_t &writer_id, const GUID_t &publisher_id)
 
void set_subscriber_qos (const DDS::SubscriberQos &qos)
 
void reset_ownership (DDS::InstanceHandle_t instance)
 
virtual RcHandle< EntityImplparent () const
 
void disable_transport ()
 
virtual void register_for_writer (const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
 
virtual void unregister_for_writer (const GUID_t &, const GUID_t &, const GUID_t &)
 
virtual void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
virtual DCPS::WeakRcHandle< ICE::Endpointget_ice_endpoint ()
 
GUID_t get_guid () const
 
void return_handle (DDS::InstanceHandle_t handle)
 
const ValueDispatcherget_value_dispatcher () const
 
const StatsMapType & raw_latency_statistics () const
 Expose the statistics container. More...
 
unsigned int & raw_latency_buffer_size ()
 Configure the size of the raw data collection buffer. More...
 
DataCollector< double >::OnFull & raw_latency_buffer_type ()
 Configure the type of the raw data collection buffer. More...
 
- Public Member Functions inherited from OpenDDS::DCPS::DataReaderEx
void get_latency_stats (inout LatencyStatisticsSeq stats)
 Obtain a sequence of statistics summaries. More...
 
- Public Member Functions inherited from DDS::DataReader
ReadCondition create_readcondition (in SampleStateMask sample_states, in ViewStateMask view_states, in InstanceStateMask instance_states)
 
QueryCondition create_querycondition (in SampleStateMask sample_states, in ViewStateMask view_states, in InstanceStateMask instance_states, in string query_expression, in StringSeq query_parameters)
 
ReturnCode_t delete_readcondition (in ReadCondition a_condition)
 
ReturnCode_t set_qos (in DataReaderQos qos)
 
ReturnCode_t get_qos (inout DataReaderQos qos)
 
ReturnCode_t set_listener (in DataReaderListener a_listener, in StatusMask mask)
 
ReturnCode_t get_sample_rejected_status (inout SampleRejectedStatus status)
 
ReturnCode_t get_liveliness_changed_status (inout LivelinessChangedStatus status)
 
ReturnCode_t get_requested_deadline_missed_status (inout RequestedDeadlineMissedStatus status)
 
ReturnCode_t get_requested_incompatible_qos_status (inout RequestedIncompatibleQosStatus status)
 
ReturnCode_t get_subscription_matched_status (inout SubscriptionMatchedStatus status)
 
ReturnCode_t get_sample_lost_status (inout SampleLostStatus status)
 
ReturnCode_t wait_for_historical_data (in Duration_t max_wait)
 
ReturnCode_t get_matched_publications (inout InstanceHandleSeq publication_handles)
 
ReturnCode_t get_matched_publication_data (inout PublicationBuiltinTopicData publication_data, in InstanceHandle_t publication_handle)
 
- Public Member Functions inherited from OpenDDS::DCPS::DataReaderCallbacks
 DataReaderCallbacks ()
 
virtual ~DataReaderCallbacks ()
 
- Public Member Functions inherited from OpenDDS::DCPS::EntityImpl
 EntityImpl ()
 
virtual ~EntityImpl ()
 
bool is_enabled () const
 
virtual DDS::StatusCondition_ptr get_statuscondition ()
 
virtual DDS::StatusMask get_status_changes ()
 
virtual DDS::DomainId_t get_domain_id ()
 
virtual GUID_t get_id () const
 
void set_status_changed_flag (DDS::StatusKind status, bool status_changed_flag)
 
void notify_status_condition ()
 
virtual void transport_config (const TransportConfig_rch &cfg)
 
TransportConfig_rch transport_config () const
 
void set_observer (Observer_rch observer, Observer::Event e)
 
Observer_rch get_observer (Observer::Event e)
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportClient
void use_datalink (const GUID_t &remote_id, const DataLink_rch &link)
 
 TransportClient ()
 
virtual ~TransportClient ()
 
void enable_transport (bool reliable, bool durable)
 
void enable_transport_using_config (bool reliable, bool durable, const TransportConfig_rch &tc)
 
bool swap_bytes () const
 
bool cdr_encapsulation () const
 
const TransportLocatorSeqconnection_info () const
 
void populate_connection_info ()
 
bool is_reliable () const
 
bool associate (const AssociationData &peer, bool active)
 
void disassociate (const GUID_t &peerId)
 
void stop_associating ()
 
void stop_associating (const GUID_t *repos, CORBA::ULong length)
 
void send_final_acks ()
 
void transport_stop ()
 
void register_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
 
void unregister_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
 
void register_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
 
void unregister_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
 
void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
WeakRcHandle< ICE::Endpointget_ice_endpoint ()
 
bool send_response (const GUID_t &peer, const DataSampleHeader &header, Message_Block_Ptr payload)
 
void send (SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
 
SendControlStatus send_w_control (SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr msg)
 
SendControlStatus send_control_to (const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
bool remove_sample (const DataSampleElement *sample)
 
bool remove_all_msgs ()
 
void terminate_send_if_suspended ()
 
bool associated_with (const GUID_t &remote) const
 
bool pending_association_with (const GUID_t &remote) const
 
GUID_t repo_id () const
 
void data_acked (const GUID_t &remote)
 
bool is_leading (const GUID_t &reader_id) const
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportReceiveListener
virtual ~TransportReceiveListener ()
 

Protected Member Functions

virtual RcHandle< MessageHolderdds_demarshal (const OpenDDS::DCPS::ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, OpenDDS::DCPS::SubscriptionInstance_rch &instance, bool &just_registered, bool &filtered, OpenDDS::DCPS::MarshalingType marshaling_type, bool full_copy)
 
virtual void dispose_unregister (const OpenDDS::DCPS::ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, OpenDDS::DCPS::SubscriptionInstance_rch &instance)
 
virtual void purge_data (OpenDDS::DCPS::SubscriptionInstance_rch instance)
 
virtual void release_instance_i (DDS::InstanceHandle_t handle)
 
virtual void state_updated_i (DDS::InstanceHandle_t handle)
 
- Protected Member Functions inherited from CORBA::LocalObject
 LocalObject (void)
 
- Protected Member Functions inherited from CORBA::Object
 Object (int dummy=0)
 
TAO::Object_Proxy_Brokerproxy_broker () const
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::DataReaderImpl
typedef OPENDDS_SET (DDS::InstanceHandle_t) HandleSet
 
typedef OPENDDS_MAP (CORBA::ULong, HandleSet) LookupMap
 
void initialize_lookup_maps ()
 
void update_lookup_maps (const SubscriptionInstanceMapType::iterator &input)
 
void remove_from_lookup_maps (DDS::InstanceHandle_t handle)
 
const HandleSet & lookup_matching_instances (CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
 
DataReaderListener_ptr get_ext_listener ()
 
virtual void remove_associations_i (const WriterIdSeq &writers, bool callback)
 
void prepare_to_delete ()
 
DDS::ReturnCode_t setup_deserialization ()
 Setup deserialization options. More...
 
RcHandle< SubscriberImplget_subscriber_servant ()
 
void post_read_or_take ()
 
void sample_info (DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
 
CORBA::Long total_samples () const
 
void set_sample_lost_status (const DDS::SampleLostStatus &status)
 
void set_sample_rejected_status (const DDS::SampleRejectedStatus &status)
 
SubscriptionInstance_rch get_handle_instance (DDS::InstanceHandle_t handle)
 
DDS::InstanceHandle_t get_next_handle (const DDS::BuiltinTopicKey_t &key)
 
bool has_readcondition (DDS::ReadCondition_ptr a_condition)
 
bool filter_sample (const DataSampleHeader &header)
 
bool ownership_filter_instance (const SubscriptionInstance_rch &instance, const GUID_t &pubid)
 
bool time_based_filter_instance (const SubscriptionInstance_rch &instance, MonotonicTimePoint &now, MonotonicTimePoint &deadline)
 
void accept_sample_processing (const SubscriptionInstance_rch &instance, const DataSampleHeader &header, bool is_new_instance)
 
void notify_read_conditions ()
 Data has arrived into the cache, unblock waiting ReadConditions. More...
 
virtual void add_link (const DataLink_rch &link, const GUID_t &peer)
 
typedef OPENDDS_SET (Encoding::Kind) EncodingKinds
 
- Protected Member Functions inherited from OpenDDS::DCPS::EntityImpl
DDS::ReturnCode_t set_enabled ()
 
void set_deleted (bool state)
 
bool get_deleted () const
 
DDS::InstanceHandle_t get_entity_instance_handle (const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportClient
void cdr_encapsulation (bool encap)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportReceiveListener
 TransportReceiveListener ()
 

Private Types

typedef DCPS::PmfSporadicTask< DataReaderImpl_TDRISporadicTask
 
typedef ACE_Strong_Bound_Ptr< const OpenDDS::DCPS::DataSampleHeader, ACE_Null_MutexDataSampleHeader_ptr
 

Private Member Functions

void dynamic_hook (MessageType &)
 
bool store_instance_data_check (unique_ptr< MessageTypeWithAllocator > &instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr)
 
DDS::ReturnCode_t read_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
 
DDS::ReturnCode_t take_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
 
DDS::ReturnCode_t read_instance_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
 
DDS::ReturnCode_t take_instance_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
 
DDS::ReturnCode_t read_next_instance_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
 
DDS::ReturnCode_t take_next_instance_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
 
void store_instance_data (unique_ptr< MessageTypeWithAllocator > instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr, bool &just_registered, bool &filtered)
 
void finish_store_instance_data (unique_ptr< MessageTypeWithAllocator > instance_data, const DataSampleHeader &header, SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg)
 
void notify_status_condition_no_sample_lock ()
 
DDS::ReturnCode_t check_inputs (const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
 Common input read* & take* input processing and precondition checks. More...
 
void delay_sample (DDS::InstanceHandle_t handle, unique_ptr< MessageTypeWithAllocator > data, const OpenDDS::DCPS::DataSampleHeader &header, const bool just_registered, const MonotonicTimePoint &now, const MonotonicTimePoint &deadline)
 
void clear_sample (DDS::InstanceHandle_t handle)
 
void drop_sample (DDS::InstanceHandle_t handle)
 
void filter_delayed (const MonotonicTimePoint &now)
 
unique_ptr< DataAllocator > & data_allocator ()
 
typedef OPENDDS_MAP (DDS::InstanceHandle_t, FilterDelayedSample) FilterDelayedSampleMap
 
typedef OPENDDS_MULTIMAP (MonotonicTimePoint, DDS::InstanceHandle_t) FilterDelayedSampleQueue
 
template<>
void dynamic_hook (XTypes::DynamicSample &sample)
 
template<>
void dynamic_hook (XTypes::DynamicSample &sample)
 

Private Attributes

unique_ptr< DataAllocatordata_allocator_
 
InstanceMap instance_map_
 
ReverseInstanceMap reverse_instance_map_
 
RcHandle< DRISporadicTaskfilter_delayed_sample_task_
 
FilterDelayedSampleMap filter_delayed_sample_map_
 
FilterDelayedSampleQueue filter_delayed_sample_queue_
 
bool marshal_skip_serialize_
 

Additional Inherited Members

- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDSTraits< MessageType >::DataReaderType >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::LocalObject
static LocalObject_ptr _duplicate (LocalObject_ptr obj)
 
static LocalObject_ptr _nil (void)
 
static LocalObject_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::Object
static CORBA::Boolean marshal (const Object_ptr x, TAO_OutputCDR &cdr)
 
static void _tao_any_destructor (void *)
 
static CORBA::Boolean is_nil_i (CORBA::Object_ptr obj)
 
static void tao_object_initialize (Object *)
 
static CORBA::Object_ptr _duplicate (CORBA::Object_ptr obj)
 
static CORBA::Object_ptr _nil (void)
 
static CORBA::Object_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DataReaderEx >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Public Attributes inherited from OpenDDS::DCPS::DataReaderEx
attribute boolean statistics_enabled
 Statistics gathering enable state. More...
 
- Protected Types inherited from OpenDDS::DCPS::DataReaderImpl
typedef ACE_Reverse_Lock< ACE_Recursive_Thread_MutexReverse_Lock_t
 
- Static Protected Member Functions inherited from OpenDDS::DCPS::DataReaderImpl
static CORBA::ULong to_combined_states (CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states)
 
static void split_combined_states (CORBA::ULong combined, CORBA::ULong &sample_states, CORBA::ULong &view_states, CORBA::ULong &instance_states)
 
- Protected Attributes inherited from CORBA::Object
ACE_Atomic_Op< TAO_SYNCH_MUTEX, unsigned long > refcount_
 
- Protected Attributes inherited from OpenDDS::DCPS::DataReaderImpl
LookupMap combined_state_lookup_
 
SubscriptionInstanceMapType instances_
 : document why the instances_ container is mutable. More...
 
ACE_Recursive_Thread_Mutex instances_lock_
 
bool has_subscription_id_
 
ACE_Thread_Mutex subscription_id_mutex_
 
ConditionVariable< ACE_Thread_Mutexsubscription_id_condition_
 
unique_ptr< ReceivedDataAllocatorrd_allocator_
 
DDS::DataReaderQos qos_
 
DDS::DataReaderQos passed_qos_
 
DDS::SampleRejectedStatus sample_rejected_status_
 
DDS::SampleLostStatus sample_lost_status_
 
ACE_Recursive_Thread_Mutex sample_lock_
 lock protecting sample container as well as statuses. More...
 
Reverse_Lock_t reverse_sample_lock_
 
WeakRcHandle< DomainParticipantImplparticipant_servant_
 
TopicDescriptionPtr< TopicImpltopic_servant_
 
TypeSupportImpltype_support_
 
GUID_t topic_id_
 
bool is_exclusive_ownership_
 
ACE_Thread_Mutex content_filtered_topic_mutex_
 
TopicDescriptionPtr< ContentFilteredTopicImplcontent_filtered_topic_
 
TopicDescriptionPtr< MultiTopicImplmulti_topic_
 
bool coherent_
 Is accessing to Group coherent changes ? More...
 
GroupRakeData group_coherent_ordered_data_
 Ordered group samples. More...
 
DDS::SubscriberQos subqos_
 
EncodingKinds decoding_modes_
 
Security::SecurityConfig_rch security_config_
 
DDS::DynamicType_var dynamic_type_
 
TransportMessageBlockAllocator mb_alloc_
 
- Protected Attributes inherited from OpenDDS::DCPS::EntityImpl
AtomicBool enabled_
 The flag indicates the entity is enabled. More...
 
AtomicBool entity_deleted_
 The flag indicates the entity is being deleted. More...
 
- Static Protected Attributes inherited from OpenDDS::DCPS::DataReaderImpl
static const CORBA::ULong MAX_SAMPLE_STATE_FLAG = DDS::NOT_READ_SAMPLE_STATE
 
static const CORBA::ULong MAX_SAMPLE_STATE_MASK = (MAX_SAMPLE_STATE_FLAG << 1) - 1
 
static const CORBA::ULong MAX_SAMPLE_STATE_BITS = 2u
 
static const CORBA::ULong MAX_VIEW_STATE_FLAG = DDS::NOT_NEW_VIEW_STATE
 
static const CORBA::ULong MAX_VIEW_STATE_MASK = (MAX_VIEW_STATE_FLAG << 1) - 1
 
static const CORBA::ULong MAX_VIEW_STATE_BITS = 2u
 
static const CORBA::ULong MAX_INSTANCE_STATE_FLAG = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
 
static const CORBA::ULong MAX_INSTANCE_STATE_MASK = (MAX_INSTANCE_STATE_FLAG << 1) - 1
 
static const CORBA::ULong MAX_INSTANCE_STATE_BITS = 3u
 
static const CORBA::ULong COMBINED_VIEW_STATE_SHIFT = MAX_INSTANCE_STATE_BITS
 
static const CORBA::ULong COMBINED_SAMPLE_STATE_SHIFT = COMBINED_VIEW_STATE_SHIFT + MAX_VIEW_STATE_BITS
 

Detailed Description

template<typename MessageType>
class OpenDDS::DCPS::DataReaderImpl_T< MessageType >

Servant for DataReader interface of Traits::MessageType data type.

See the DDS specification, OMG formal/2015-04-10, for a description of this interface.

Definition at line 41 of file DataReaderImpl_T.h.

Member Typedef Documentation

◆ DataAllocator

Definition at line 118 of file DataReaderImpl_T.h.

◆ DataSampleHeader_ptr

Definition at line 2424 of file DataReaderImpl_T.h.

◆ DRISporadicTask

template<typename MessageType>
typedef DCPS::PmfSporadicTask<DataReaderImpl_T> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::DRISporadicTask
private

Definition at line 2418 of file DataReaderImpl_T.h.

◆ Interface

template<typename MessageType>
typedef TraitsType::DataReaderType OpenDDS::DCPS::DataReaderImpl_T< MessageType >::Interface

Definition at line 66 of file DataReaderImpl_T.h.

◆ MarshalTraitsType

template<typename MessageType>
typedef MarshalTraits<MessageType> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::MarshalTraitsType

Definition at line 51 of file DataReaderImpl_T.h.

◆ MessageSequenceType

template<typename MessageType>
typedef TraitsType::MessageSequenceType OpenDDS::DCPS::DataReaderImpl_T< MessageType >::MessageSequenceType

Definition at line 52 of file DataReaderImpl_T.h.

◆ SharedInstanceMap_rch

template<typename MessageType>
typedef RcHandle<SharedInstanceMap> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::SharedInstanceMap_rch

Definition at line 64 of file DataReaderImpl_T.h.

◆ TraitsType

template<typename MessageType>
typedef DDSTraits<MessageType> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::TraitsType

Definition at line 50 of file DataReaderImpl_T.h.

Constructor & Destructor Documentation

◆ DataReaderImpl_T()

template<typename MessageType>
OpenDDS::DCPS::DataReaderImpl_T< MessageType >::DataReaderImpl_T ( )
inline

Definition at line 120 of file DataReaderImpl_T.h.

121  : filter_delayed_sample_task_(make_rch<DRISporadicTask>(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &DataReaderImpl_T::filter_delayed))
122  , marshal_skip_serialize_(false)
123  {
125  }
RcHandle< DRISporadicTask > filter_delayed_sample_task_
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
#define TheServiceParticipant
void filter_delayed(const MonotonicTimePoint &now)

◆ ~DataReaderImpl_T()

template<typename MessageType>
virtual OpenDDS::DCPS::DataReaderImpl_T< MessageType >::~DataReaderImpl_T ( )
inlinevirtual

Definition at line 127 of file DataReaderImpl_T.h.

128  {
129  filter_delayed_sample_task_->cancel();
130 
131  for (typename InstanceMap::iterator it = instance_map_.begin();
132  it != instance_map_.end(); ++it)
133  {
135  if (!ptr) continue;
136  purge_data(ptr);
137  }
138  //X SHH release the data samples in the instance_map_.
139  }
virtual void purge_data(OpenDDS::DCPS::SubscriptionInstance_rch instance)
RcHandle< DRISporadicTask > filter_delayed_sample_task_
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)

Member Function Documentation

◆ _interface_repository_id()

template<typename MessageType>
const char* OpenDDS::DCPS::DataReaderImpl_T< MessageType >::_interface_repository_id ( void  ) const
inlinevirtual

Reimplemented from CORBA::Object.

Definition at line 73 of file DataReaderImpl_T.h.

74  {
75  return Interface::_interface_repository_id();
76  }

◆ _is_a()

template<typename MessageType>
CORBA::Boolean OpenDDS::DCPS::DataReaderImpl_T< MessageType >::_is_a ( const char *  type_id)
inlinevirtual

Reimplemented from CORBA::Object.

Definition at line 68 of file DataReaderImpl_T.h.

69  {
70  return Interface::_is_a(type_id);
71  }

◆ auto_return_loan()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::auto_return_loan ( void *  seq)
inlinevirtual

Definition at line 682 of file DataReaderImpl_T.h.

683  {
684  MessageSequenceType& received_data =
685  *static_cast< MessageSequenceType*> (seq);
686 
687  if (!received_data.release())
688  {
689  // release_loan(received_data);
690  received_data.length(0);
691  }
692  return DDS::RETCODE_OK;
693  }
const ReturnCode_t RETCODE_OK
TraitsType::MessageSequenceType MessageSequenceType

◆ check_inputs()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::check_inputs ( const char *  method_name,
MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples 
)
inlineprivate

Common input read* & take* input processing and precondition checks.

Definition at line 2178 of file DataReaderImpl_T.h.

2182 {
2183  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
2184 
2185  // ---- start of preconditions common to read and take -----
2186  // SPEC ref v1.2 7.1.2.5.3.8 #1
2187  // NOTE: We can't check maximum() or release() here since those are
2188  // implementation details of the sequences. In general, the
2189  // info_seq will have release() == true and maximum() == 0.
2190  // If we're in zero-copy mode, the received_data will have
2191  // release() == false and maximum() == 0. If it's not
2192  // zero-copy then received_data will have release == true()
2193  // and maximum() == anything.
2194  if (received_data.length() != info_seq.length())
2195  {
2196  ACE_DEBUG((LM_DEBUG,
2197  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
2198  ACE_TEXT("PRECONDITION_NOT_MET sample and info input ")
2199  ACE_TEXT("sequences do not match.\n"),
2200  TraitsType::type_name(),
2201  method_name ));
2203  }
2204 
2205  //SPEC ref v1.2 7.1.2.5.3.8 #4
2206  if ((received_data.maximum() > 0) && (received_data.release() == false))
2207  {
2208  ACE_DEBUG((LM_DEBUG,
2209  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
2210  ACE_TEXT("PRECONDITION_NOT_MET mismatch of ")
2211  ACE_TEXT("maximum %d and owns %d\n"),
2212  TraitsType::type_name(),
2213  method_name,
2214  received_data.maximum(),
2215  received_data.release() ));
2216 
2218  }
2219 
2220  if (received_data.maximum() == 0)
2221  {
2222  // not in SPEC but needed.
2224  {
2225  max_samples =
2226  static_cast< ::CORBA::Long> (received_data_p.max_slots());
2227  }
2228  }
2229  else
2230  {
2232  {
2233  //SPEC ref v1.2 7.1.2.5.3.8 #5a
2234  max_samples = received_data.maximum();
2235  }
2236  else if (
2237  max_samples > static_cast< ::CORBA::Long> (received_data.maximum()))
2238  {
2239  //SPEC ref v1.2 7.1.2.5.3.8 #5c
2240  ACE_DEBUG((LM_DEBUG,
2241  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
2242  ACE_TEXT("PRECONDITION_NOT_MET max_samples %d > maximum %d\n"),
2243  TraitsType::type_name(),
2244  method_name,
2245  max_samples,
2246  received_data.maximum()));
2248  }
2249  //else
2250  //SPEC ref v1.2 7.1.2.5.3.8 #5b - is true by impl below.
2251  }
2252 
2253  // The spec does not say what to do in this case but it appears to be a good thing.
2254  // Note: max_slots is the greater of the sequence's maximum and init_size.
2255  if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples)
2256  {
2257  max_samples = static_cast< ::CORBA::Long> (received_data_p.max_slots());
2258  }
2259  //---- end of preconditions common to read and take -----
2260 
2261  return DDS::RETCODE_OK;
2262 }
#define ACE_DEBUG(X)
ACE_CDR::Long Long
const ReturnCode_t RETCODE_OK
const long LENGTH_UNLIMITED
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
ACE_TEXT("TCP_Factory")
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ clear_sample()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::clear_sample ( DDS::InstanceHandle_t  handle)
inlineprivate

Definition at line 2308 of file DataReaderImpl_T.h.

2309 {
2310  // sample_lock_ should already be held
2311 
2312  typename FilterDelayedSampleMap::iterator sample = filter_delayed_sample_map_.find(handle);
2313  if (sample != filter_delayed_sample_map_.end()) {
2314  // leave the entry in the container, so that the key remains valid if the reactor is waiting on this lock while this is occurring
2315  sample->second.message.reset();
2316  }
2317 }
FilterDelayedSampleMap filter_delayed_sample_map_

◆ contains_sample_filtered()

template<typename MessageType>
bool OpenDDS::DCPS::DataReaderImpl_T< MessageType >::contains_sample_filtered ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
const FilterEvaluator evaluator,
const DDS::StringSeq params 
)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 701 of file DataReaderImpl_T.h.

706  {
709 
711  TypeSupport* const ts = topic->get_type_support();
712  TypeSupportImpl* const type_support = dynamic_cast<TypeSupportImpl*>(ts);
713  const bool filter_has_non_key_fields = type_support ? evaluator.has_non_key_fields(*type_support) : true;
714 
716  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
717  ++next; // pre-increment iterator, in case updates cause changes to match set
718  const DDS::InstanceHandle_t handle = *it;
719  const SubscriptionInstance_rch inst = get_handle_instance(handle);
720  if (!inst) continue;
721 
722  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
723  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
724  if (!item->registered_data_ || (!item->valid_data_ && filter_has_non_key_fields)) {
725  continue;
726  }
727  if (evaluator.eval(*static_cast<MessageType*>(item->registered_data_), params)) {
728  return true;
729  }
730  }
731  }
732 
733  return false;
734  }
bool has_non_key_fields(const TypeSupportImpl &ts) const
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
ACE_Recursive_Thread_Mutex instances_lock_
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
TopicDescriptionPtr< TopicImpl > topic_servant_
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
bool eval(const T &sample, const DDS::StringSeq &params) const

◆ data_allocator()

template<typename MessageType>
unique_ptr<DataAllocator>& OpenDDS::DCPS::DataReaderImpl_T< MessageType >::data_allocator ( )
inlineprivate

Definition at line 2411 of file DataReaderImpl_T.h.

2411 { return data_allocator_; }
unique_ptr< DataAllocator > data_allocator_

◆ dds_demarshal()

template<typename MessageType>
virtual RcHandle<MessageHolder> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal ( const OpenDDS::DCPS::ReceivedDataSample sample,
DDS::InstanceHandle_t  publication_handle,
OpenDDS::DCPS::SubscriptionInstance_rch instance,
bool &  just_registered,
bool &  filtered,
OpenDDS::DCPS::MarshalingType  marshaling_type,
bool  full_copy 
)
inlineprotectedvirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1067 of file DataReaderImpl_T.h.

1074  {
1075  unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
1076  dynamic_hook(*data);
1077  RcHandle<MessageHolder> message_holder;
1078 
1079  Message_Block_Ptr payload(sample.data(&mb_alloc_));
1081  if (!MarshalTraitsType::from_message_block(*data, *payload)) {
1082  if (DCPS_debug_level > 0) {
1083  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::dds_demarshal: ")
1084  ACE_TEXT("attempting to skip serialize but bad from_message_block. Returning from demarshal.\n")));
1085  }
1086  return message_holder;
1087  }
1088  store_instance_data(move(data), publication_handle, sample.header_, instance, just_registered, filtered);
1089  return message_holder;
1090  }
1091  const bool encapsulated = sample.header_.cdr_encapsulation_;
1092 
1094  payload.get(),
1096  static_cast<Endianness>(sample.header_.byte_order_));
1097 
1098  if (encapsulated) {
1099  EncapsulationHeader encap;
1100  if (!(ser >> encap)) {
1101  if (DCPS_debug_level > 0) {
1102  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
1103  ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1104  ACE_TEXT("deserialization of encapsulation header failed.\n"),
1105  TraitsType::type_name()));
1106  }
1107  return message_holder;
1108  }
1110  if (!encap.to_encoding(encoding, type_support_->base_extensibility())) {
1111  return message_holder;
1112  }
1113 
1114  if (decoding_modes_.find(encoding.kind()) == decoding_modes_.end()) {
1115  if (DCPS_debug_level >= 1) {
1116  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING ")
1117  ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1118  ACE_TEXT("Encoding kind %C of the received sample does not ")
1119  ACE_TEXT("match the ones specified by DataReader.\n"),
1120  TraitsType::type_name(),
1121  Encoding::kind_to_string(encoding.kind()).c_str()));
1122  }
1123  return message_holder;
1124  }
1125  if (DCPS_debug_level >= 8) {
1126  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ")
1127  ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1128  ACE_TEXT("Deserializing with encoding kind %C.\n"),
1129  TraitsType::type_name(),
1130  Encoding::kind_to_string(encoding.kind()).c_str()));
1131  }
1132 
1133  ser.encoding(encoding);
1134  }
1135 
1136  const bool key_only_marshaling =
1137  marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING;
1138 
1139  bool ser_ret = true;
1140  if (key_only_marshaling) {
1141  ser_ret = ser >> OpenDDS::DCPS::KeyOnly<MessageType>(*data);
1142  } else {
1143  ser_ret = ser >> *data;
1144  if (full_copy) {
1145  message_holder = make_rch<MessageHolder_T<MessageType> >(*data);
1146  }
1147  }
1148  if (!ser_ret) {
1149  if (ser.get_construction_status() != Serializer::ConstructionSuccessful) {
1150  if (DCPS_debug_level > 1) {
1151  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ")
1152  ACE_TEXT("object construction failure, dropping sample.\n"),
1153  TraitsType::type_name()));
1154  }
1155  } else {
1156  if (DCPS_debug_level > 0) {
1157  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR %CDataReaderImpl::dds_demarshal ")
1158  ACE_TEXT("deserialization failed, dropping sample.\n"),
1159  TraitsType::type_name()));
1160  }
1161  }
1162  return message_holder;
1163  }
1164 
1165 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1166  /*
1167  * If sample.header_.content_filter_ is true, the writer has already
1168  * filtered.
1169  */
1170  if (!sample.header_.content_filter_) {
1173  const bool sample_only_has_key_fields = !sample.header_.valid_data();
1174  if (key_only_marshaling != sample_only_has_key_fields) {
1175  if (DCPS_debug_level > 0) {
1176  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
1177  ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1178  ACE_TEXT("Mismatch between the key only and valid data properties ")
1179  ACE_TEXT("of a %C message of a content filtered topic!\n"),
1180  TraitsType::type_name(),
1181  to_string(static_cast<MessageId>(sample.header_.message_id_))));
1182  }
1183  filtered = true;
1184  message_holder.reset();
1185  return message_holder;
1186  }
1187  const MessageType& type = static_cast<MessageType&>(*data);
1188  if (!content_filtered_topic_->filter(type, sample_only_has_key_fields)) {
1189  filtered = true;
1190  message_holder.reset();
1191  return message_holder;
1192  }
1193  }
1194  }
1195 #endif
1196 
1197  store_instance_data(move(data), publication_handle, sample.header_, instance, just_registered, filtered);
1198  return message_holder;
1199  }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
char message_id_
The enum MessageId.
static String kind_to_string(Kind value)
Definition: Serializer.cpp:326
TopicDescriptionPtr< ContentFilteredTopicImpl > content_filtered_topic_
virtual Extensibility base_extensibility() const =0
Returns the extensibility of just the topic type.
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
unique_ptr< DataAllocator > & data_allocator()
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
bool valid_data() const
Returns true if the sample has a complete serialized payload.
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
bool to_encoding(Encoding &encoding, Extensibility expected_extensibility)
Definition: Serializer.cpp:153
const char * to_string(MessageId value)
TypeSupportImpl * type_support_
ACE_Thread_Mutex content_filtered_topic_mutex_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DataSampleHeader header_
The demarshalled sample header.
ACE_TEXT("TCP_Factory")
void store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr, bool &just_registered, bool &filtered)
TransportMessageBlockAllocator mb_alloc_

◆ delay_sample()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::delay_sample ( DDS::InstanceHandle_t  handle,
unique_ptr< MessageTypeWithAllocator data,
const OpenDDS::DCPS::DataSampleHeader header,
const bool  just_registered,
const MonotonicTimePoint now,
const MonotonicTimePoint deadline 
)
inlineprivate

Definition at line 2264 of file DataReaderImpl_T.h.

2270 {
2271  // sample_lock_ should already be held
2273 
2274  typename FilterDelayedSampleMap::iterator i = filter_delayed_sample_map_.find(handle);
2275  if (i == filter_delayed_sample_map_.end()) {
2276 
2277  // emplace()/insert() only if the sample is going to be
2278  // new (otherwise we call move(data) twice).
2279  std::pair<typename FilterDelayedSampleMap::iterator, bool> result =
2280 #ifdef ACE_HAS_CPP11
2281  filter_delayed_sample_map_.emplace(std::piecewise_construct,
2282  std::forward_as_tuple(handle),
2283  std::forward_as_tuple(move(data), hdr, just_registered));
2284 #else
2285  filter_delayed_sample_map_.insert(std::make_pair(handle, FilterDelayedSample(move(data), hdr, just_registered)));
2286 #endif
2287  FilterDelayedSample& sample = result.first->second;
2288  sample.expiration_time = deadline;
2289  const bool schedule = filter_delayed_sample_queue_.empty();
2290  filter_delayed_sample_queue_.insert(std::make_pair(deadline, handle));
2291  if (schedule) {
2292  filter_delayed_sample_task_->schedule(now - deadline);
2293  } else if (filter_delayed_sample_queue_.begin()->second == handle) {
2294  filter_delayed_sample_task_->cancel();
2295  filter_delayed_sample_task_->schedule(now - deadline);
2296  }
2297  } else {
2298  FilterDelayedSample& sample = i->second;
2299  // we only care about the most recently filtered sample, so clean up the last one
2300 
2301  sample.message = move(data);
2302  sample.header = hdr;
2303  sample.new_instance = just_registered;
2304  // already scheduled for timeout at the desired time
2305  }
2306 }
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
RcHandle< DRISporadicTask > filter_delayed_sample_task_
FilterDelayedSampleMap filter_delayed_sample_map_
ACE_Strong_Bound_Ptr< const OpenDDS::DCPS::DataSampleHeader, ACE_Null_Mutex > DataSampleHeader_ptr
FilterDelayedSampleQueue filter_delayed_sample_queue_

◆ dispose_unregister()

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dispose_unregister ( const OpenDDS::DCPS::ReceivedDataSample sample,
DDS::InstanceHandle_t  publication_handle,
OpenDDS::DCPS::SubscriptionInstance_rch instance 
)
inlineprotectedvirtual

!! caller should already have the sample_lock_

Reimplemented from OpenDDS::DCPS::DataReaderImpl.

Definition at line 1201 of file DataReaderImpl_T.h.

1204  {
1205  //!!! caller should already have the sample_lock_
1206 
1207  // The data sample in this dispose message does not contain any valid data.
1208  // What it needs here is the key value to identify the instance to dispose.
1209  // The demarshal push this "sample" to received sample list so the user
1210  // can be notified the dispose event.
1211  bool just_registered = false;
1212  bool filtered = false;
1214  if (sample.header_.key_fields_only_) {
1216  }
1217  dds_demarshal(sample, publication_handle, instance, just_registered, filtered, marshaling, false);
1218  }
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
DataSampleHeader header_
The demarshalled sample header.
virtual RcHandle< MessageHolder > dds_demarshal(const OpenDDS::DCPS::ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, OpenDDS::DCPS::SubscriptionInstance_rch &instance, bool &just_registered, bool &filtered, OpenDDS::DCPS::MarshalingType marshaling_type, bool full_copy)

◆ drop_sample()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::drop_sample ( DDS::InstanceHandle_t  handle)
inlineprivate

Definition at line 2319 of file DataReaderImpl_T.h.

2320 {
2321  // sample_lock_ should already be held
2322 
2323  typename FilterDelayedSampleMap::iterator sample = filter_delayed_sample_map_.find(handle);
2324  if (sample != filter_delayed_sample_map_.end()) {
2325  for (FilterDelayedSampleQueue::iterator pos = filter_delayed_sample_queue_.lower_bound(sample->second.expiration_time), limit = filter_delayed_sample_queue_.upper_bound(sample->second.expiration_time); pos != limit; ++pos) {
2326  if (pos->second == handle) {
2327  filter_delayed_sample_queue_.erase(pos);
2328  break;
2329  }
2330  }
2331 
2332  // use the handle to erase, since the sample lock was released
2333  filter_delayed_sample_map_.erase(handle);
2334  }
2335 }
FilterDelayedSampleMap filter_delayed_sample_map_
FilterDelayedSampleQueue filter_delayed_sample_queue_

◆ dynamic_hook() [1/3]

template<>
void OpenDDS::DCPS::DataReaderImpl_T< XTypes::DynamicSample >::dynamic_hook ( XTypes::DynamicSample sample)
private

◆ dynamic_hook() [2/3]

template<>
void OpenDDS::DCPS::DataReaderImpl_T< XTypes::DynamicSample >::dynamic_hook ( XTypes::DynamicSample sample)
inlineprivate

Definition at line 114 of file DynamicDataReaderImpl.h.

115  {
116  XTypes::DynamicDataReaderImpl* const self = dynamic_cast<XTypes::DynamicDataReaderImpl*>(this);
117  if (self) {
118  self->imbue_type(sample);
119  }
120  }

◆ dynamic_hook() [3/3]

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dynamic_hook ( MessageType &  )
inlineprivate

Available for specialization so that some types of MessageType can observe and change the sample before dds_demarshal deserializes into it

Definition at line 1257 of file DataReaderImpl_T.h.

1257 {}

◆ enable_specific()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::enable_specific ( )
inlinevirtual

Do parts of enable specific to the datatype. Called by DataReaderImpl::enable().

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 145 of file DataReaderImpl_T.h.

146  {
147  data_allocator().reset(new DataAllocator(get_n_chunks ()));
149  ACE_DEBUG((LM_DEBUG,
150  ACE_TEXT("(%P|%t) %CDataReaderImpl::")
151  ACE_TEXT("enable_specific-data")
152  ACE_TEXT(" Cached_Allocator_With_Overflow ")
153  ACE_TEXT("%x with %d chunks\n"),
154  TraitsType::type_name(),
155  data_allocator().get(),
156  get_n_chunks ()));
157 
158  return DDS::RETCODE_OK;
159  }
#define ACE_DEBUG(X)
const ReturnCode_t RETCODE_OK
unique_ptr< DataAllocator > & data_allocator()
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS::DCPS::Cached_Allocator_With_Overflow< MessageTypeMemoryBlock, ACE_Thread_Mutex > DataAllocator
ACE_TEXT("TCP_Factory")

◆ filter_delayed()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::filter_delayed ( const MonotonicTimePoint now)
inlineprivate

Definition at line 2337 of file DataReaderImpl_T.h.

2338 {
2339  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2340 
2341  // Make a copy because finish_store_instance_data will release the sample lock.
2342  typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) Handles;
2343  Handles handles;
2344 
2346 
2347  for (FilterDelayedSampleQueue::iterator pos = filter_delayed_sample_queue_.begin(), limit = filter_delayed_sample_queue_.end(); pos != limit && pos->first <= now;) {
2348  handles.push_back(pos->second);
2349  filter_delayed_sample_queue_.erase(pos++);
2350  }
2351 
2353 
2354  for (Handles::const_iterator pos = handles.begin(), limit = handles.end(); pos != limit; ++pos) {
2355  const DDS::InstanceHandle_t handle = *pos;
2356 
2357  SubscriptionInstance_rch instance = get_handle_instance(handle);
2358  if (!instance) {
2359  continue;
2360  }
2361 
2362  typename FilterDelayedSampleMap::iterator data = filter_delayed_sample_map_.find(handle);
2363  if (data == filter_delayed_sample_map_.end()) {
2364  continue;
2365  }
2366 
2367  if (data->second.message) {
2368  const bool NOT_DISPOSE_MSG = false;
2369  const bool NOT_UNREGISTER_MSG = false;
2370  // clear the message, since ownership is being transferred to finish_store_instance_data.
2371 
2372  instance->last_accepted_.set_to_now();
2373  const DataSampleHeader_ptr header = data->second.header;
2374  const bool new_instance = data->second.new_instance;
2375 
2376  // should not use data iterator anymore, since finish_store_instance_data releases sample_lock_
2377  finish_store_instance_data(move(data->second.message),
2378  *header,
2379  instance,
2380  NOT_DISPOSE_MSG,
2381  NOT_UNREGISTER_MSG);
2382 
2383  accept_sample_processing(instance, *header, new_instance);
2384 
2385  // Refresh the iterator.
2386  data = filter_delayed_sample_map_.find(handle);
2387  if (data == filter_delayed_sample_map_.end()) {
2388  continue;
2389  }
2390 
2391  // Reschedule.
2392  data->second.expiration_time = now + interval;
2393  filter_delayed_sample_queue_.insert(std::make_pair(data->second.expiration_time, handle));
2394 
2395  } else {
2396  // this check is performed to handle the corner case where
2397  // store_instance_data received and delivered a sample, while this
2398  // method was waiting for the lock
2399  if (MonotonicTimePoint::now() - instance->last_sample_tv_ >= interval) {
2400  // no new data to process, so remove from container
2401  filter_delayed_sample_map_.erase(data);
2402  }
2403  }
2404  }
2405 
2406  if (!filter_delayed_sample_queue_.empty()) {
2407  filter_delayed_sample_task_->schedule(filter_delayed_sample_queue_.begin()->first - now);
2408  }
2409 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
TimeBasedFilterQosPolicy time_based_filter
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
RcHandle< DRISporadicTask > filter_delayed_sample_task_
void finish_store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, const DataSampleHeader &header, SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg)
FilterDelayedSampleMap filter_delayed_sample_map_
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_Strong_Bound_Ptr< const OpenDDS::DCPS::DataSampleHeader, ACE_Null_Mutex > DataSampleHeader_ptr
void accept_sample_processing(const SubscriptionInstance_rch &instance, const DataSampleHeader &header, bool is_new_instance)
#define TheServiceParticipant
FilterDelayedSampleQueue filter_delayed_sample_queue_

◆ finish_store_instance_data()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::finish_store_instance_data ( unique_ptr< MessageTypeWithAllocator instance_data,
const DataSampleHeader header,
SubscriptionInstance_rch  instance_ptr,
bool  is_dispose_msg,
bool  is_unregister_msg 
)
inlineprivate

Definition at line 1943 of file DataReaderImpl_T.h.

1945 {
1948  (instance_ptr->rcvd_samples_.size() >=
1949  static_cast<size_t>(qos_.resource_limits.max_samples_per_instance))) {
1950 
1951  // According to spec 1.2, Samples that contain no data do not
1952  // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
1953  // so do not remove the oldest sample when unregister/dispose
1954  // message arrives.
1955 
1956  if (!is_dispose_msg && !is_unregister_msg
1957  && !instance_ptr->rcvd_samples_.matches(DDS::READ_SAMPLE_STATE))
1958  {
1959  DDS::DataReaderListener_var listener
1961 
1963 
1969 
1970  if (!CORBA::is_nil(listener.in()))
1971  {
1973 
1974  listener->on_sample_rejected(this, sample_rejected_status_);
1976  } // do we want to do something if listener is nil???
1978  return;
1979  }
1980  else if (!is_dispose_msg && !is_unregister_msg)
1981  {
1982  // Discard the oldest previously-read sample
1984  instance_ptr->rcvd_samples_.remove_head();
1985  item->dec_ref();
1986  }
1987  }
1989  {
1991  {
1993  for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin();
1994  iter != instances_.end();
1995  ++iter) {
1996  OpenDDS::DCPS::SubscriptionInstance_rch ptr = iter->second;
1997 
1998  total_samples += (CORBA::Long) ptr->rcvd_samples_.size();
1999  }
2000  }
2001 
2002  if (total_samples >= qos_.resource_limits.max_samples)
2003  {
2004  // According to spec 1.2, Samples that contain no data do not
2005  // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
2006  // so do not remove the oldest sample when unregister/dispose
2007  // message arrives.
2008 
2009  if (!is_dispose_msg && !is_unregister_msg
2010  && !instance_ptr->rcvd_samples_.matches(DDS::READ_SAMPLE_STATE))
2011  {
2012  DDS::DataReaderListener_var listener
2014 
2016 
2022  if (!CORBA::is_nil(listener.in()))
2023  {
2025 
2026  listener->on_sample_rejected(this, sample_rejected_status_);
2028  } // do we want to do something if listener is nil???
2030 
2031  return;
2032  }
2033  else if (!is_dispose_msg && !is_unregister_msg)
2034  {
2035  // Discard the oldest previously-read sample
2037  instance_ptr->rcvd_samples_.remove_head();
2038  item->dec_ref();
2039  }
2040  }
2041  }
2042 
2043  bool event_notify = false;
2044 
2045  if (is_dispose_msg) {
2046  event_notify = instance_ptr->instance_state_->dispose_was_received(header.publication_id_);
2047  }
2048 
2049  if (is_unregister_msg) {
2050  if (instance_ptr->instance_state_->unregister_was_received(header.publication_id_)) {
2051  event_notify = true;
2052  }
2053  }
2054 
2055  if (!is_dispose_msg && !is_unregister_msg) {
2056  event_notify = true;
2057  instance_ptr->instance_state_->data_was_received(header.publication_id_);
2058  }
2059 
2060  if (!event_notify) {
2061  return;
2062  }
2063 
2064  ReceivedDataElement* const ptr =
2066  header, instance_data.release(), &sample_lock_);
2067 
2069  instance_ptr->instance_state_->disposed_generation_count();
2072 
2073  instance_ptr->last_sequence_ = header.sequence_;
2074 
2075  instance_ptr->rcvd_strategy_->add(ptr);
2076 
2077  if (! is_dispose_msg && ! is_unregister_msg
2078  && instance_ptr->rcvd_samples_.size() > get_depth())
2079  {
2081  instance_ptr->rcvd_samples_.remove_head();
2082 
2083  if (head_ptr->sample_state_ == DDS::NOT_READ_SAMPLE_STATE)
2084  {
2085  DDS::DataReaderListener_var listener
2087 
2090 
2092 
2093  if (!CORBA::is_nil(listener.in()))
2094  {
2096 
2097  listener->on_sample_lost(this, sample_lost_status_);
2098 
2100  }
2101 
2103  }
2104 
2105  head_ptr->dec_ref();
2106  }
2107 
2108 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2109  if (! ptr->coherent_change_) {
2110 #endif
2112  if (!sub || get_deleted())
2113  return;
2114 
2116 
2118 
2119  DDS::SubscriberListener_var sub_listener =
2121  if (!CORBA::is_nil(sub_listener.in()) && !coherent_) {
2122  if (!is_bit()) {
2125  sub_listener->on_data_on_readers(sub.in());
2126  } else {
2127  TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataOnReaders>(sub, sub_listener, rchandle_from(static_cast<DataReaderImpl*>(this)), true, false));
2128  }
2129  } else {
2130  sub->notify_status_condition();
2131 
2132  DDS::DataReaderListener_var listener =
2134 
2135  if (!CORBA::is_nil(listener.in())) {
2136  if (!is_bit()) {
2139  sub.reset();
2141  listener->on_data_available(this);
2142  } else {
2143  TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataAvailable>(listener, rchandle_from(static_cast<DataReaderImpl*>(this)), true, true, true));
2144  }
2145  } else {
2147  }
2148  }
2149 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2150  }
2151 #endif
2152 }
ACE_CDR::Long Long
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
bool coherent_change_
Sample belongs to an active coherent change set.
ResourceLimitsQosPolicy resource_limits
ACE_Recursive_Thread_Mutex instances_lock_
const StatusKind DATA_ON_READERS_STATUS
void data_was_received(const GUID_t &writer_id)
Data sample received for this instance.
unique_ptr< ReceivedDataAllocator > rd_allocator_
size_t disposed_generation_count() const
Access disposed generation count.
const long LENGTH_UNLIMITED
const StatusKind DATA_AVAILABLE_STATUS
const DDS::InstanceHandle_t instance_handle_
The instance handle for the registered object.
DDS::SubscriberListener_ptr listener_for(DDS::StatusKind kind)
size_t no_writers_generation_count() const
Access no writers generation count.
DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind)
bool dispose_was_received(const GUID_t &writer_id)
const InstanceState_rch instance_state_
Instance state for this instance.
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
bool matches(CORBA::ULong sample_states) const
DDS::SampleRejectedStatus sample_rejected_status_
unique_ptr< ReceivedDataStrategy > rcvd_strategy_
ReceivedDataElementList strategy.
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DDS::SampleLostStatus sample_lost_status_
const SampleStateKind NOT_READ_SAMPLE_STATE
bool unregister_was_received(const GUID_t &writer_id)
const StatusKind SAMPLE_LOST_STATUS
bool coherent_
Is accessing to Group coherent changes ?
RcHandle< SubscriberImpl > get_subscriber_servant()
const StatusKind SAMPLE_REJECTED_STATUS
#define TheServiceParticipant
SequenceNumber last_sequence_
Sequence number of the move recent data sample received.
SampleRejectedStatusKind last_reason
Boolean is_nil(T x)
CORBA::Long total_samples() const
const SampleStateKind READ_SAMPLE_STATE

◆ get_key_value()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::get_key_value ( MessageType &  key_holder,
DDS::InstanceHandle_t  handle 
)
inlinevirtual

Definition at line 657 of file DataReaderImpl_T.h.

659  {
661 
662  const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(handle);
663  if (pos != reverse_instance_map_.end()) {
664  key_holder = pos->second->first;
665  return DDS::RETCODE_OK;
666  }
667 
669  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_BAD_PARAMETER
ReverseInstanceMap reverse_instance_map_

◆ get_marshal_skip_serialize()

template<typename MessageType>
bool OpenDDS::DCPS::DataReaderImpl_T< MessageType >::get_marshal_skip_serialize ( ) const
inline

Definition at line 1047 of file DataReaderImpl_T.h.

1048  {
1049  return marshal_skip_serialize_;
1050  }

◆ lookup_instance() [1/2]

template<typename MessageType>
virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance ( const MessageType &  instance_data)
inlinevirtual

Definition at line 671 of file DataReaderImpl_T.h.

Referenced by OpenDDS::DCPS::BitSubscriber::remove_connection_record(), and OpenDDS::DCPS::BitSubscriber::remove_thread_status().

672  {
674 
675  const typename InstanceMap::const_iterator it = instance_map_.find(instance_data);
676  if (it != instance_map_.end()) {
677  return it->second;
678  }
679  return DDS::HANDLE_NIL;
680  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const InstanceHandle_t HANDLE_NIL

◆ lookup_instance() [2/2]

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance ( const OpenDDS::DCPS::ReceivedDataSample sample,
OpenDDS::DCPS::SubscriptionInstance_rch instance 
)
inlinevirtual

!! caller should already have the sample_lock_

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 916 of file DataReaderImpl_T.h.

918  {
919  //!!! caller should already have the sample_lock_
920  const bool encapsulated = sample.header_.cdr_encapsulation_;
921  Message_Block_Ptr payload(sample.data(&mb_alloc_));
923  payload.get(),
925  static_cast<Endianness>(sample.header_.byte_order_));
926 
927  if (encapsulated) {
928  EncapsulationHeader encap;
929  if (!(ser >> encap)) {
930  if (DCPS_debug_level > 0) {
931  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
932  ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
933  ACE_TEXT("deserialization of encapsulation header failed.\n"),
934  TraitsType::type_name()));
935  }
936  return;
937  }
939  if (!encap.to_encoding(encoding, type_support_->base_extensibility())) {
940  return;
941  }
942 
943  if (decoding_modes_.find(encoding.kind()) == decoding_modes_.end()) {
944  if (DCPS_debug_level >= 1) {
945  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING ")
946  ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
947  ACE_TEXT("Encoding kind of the received sample (%C) does not ")
948  ACE_TEXT("match the ones specified by DataReader.\n"),
949  TraitsType::type_name(),
950  Encoding::kind_to_string(encoding.kind()).c_str()));
951  }
952  return;
953  }
954  if (DCPS_debug_level >= 8) {
955  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ")
956  ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
957  ACE_TEXT("Deserializing with encoding kind %C.\n"),
958  TraitsType::type_name(),
959  Encoding::kind_to_string(encoding.kind()).c_str()));
960  }
961 
962  ser.encoding(encoding);
963  }
964 
965  bool ser_ret = true;
966  MessageType data;
967  if (sample.header_.key_fields_only_) {
968  ser_ret = ser >> OpenDDS::DCPS::KeyOnly<MessageType>(data);
969  } else {
970  ser_ret = ser >> data;
971  }
972  if (!ser_ret) {
973  if (ser.get_construction_status() != Serializer::ConstructionSuccessful) {
974  if (DCPS_debug_level > 1) {
975  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
976  ACE_TEXT("object construction failure, dropping sample.\n"),
977  TraitsType::type_name()));
978  }
979  } else {
980  if (DCPS_debug_level > 0) {
981  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
982  ACE_TEXT("deserialization failed.\n"),
983  TraitsType::type_name()));
984  }
985  }
986  return;
987  }
988 
990  typename InstanceMap::const_iterator const it = instance_map_.find(data);
991  if (it != instance_map_.end()) {
992  handle = it->second;
993  }
994 
995  if (handle == DDS::HANDLE_NIL) {
996  instance.reset();
997  } else {
998  instance = get_handle_instance(handle);
999  }
1000  }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
static String kind_to_string(Kind value)
Definition: Serializer.cpp:326
virtual Extensibility base_extensibility() const =0
Returns the extensibility of just the topic type.
const InstanceHandle_t HANDLE_NIL
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
bool to_encoding(Encoding &encoding, Extensibility expected_extensibility)
Definition: Serializer.cpp:153
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
TypeSupportImpl * type_support_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DataSampleHeader header_
The demarshalled sample header.
ACE_TEXT("TCP_Factory")
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
TransportMessageBlockAllocator mb_alloc_

◆ lookup_instance_generic()

template<typename MessageType>
DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance_generic ( const void *  data)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 760 of file DataReaderImpl_T.h.

761  {
762  return lookup_instance(*static_cast<const MessageType*>(data));
763  }
virtual DDS::InstanceHandle_t lookup_instance(const MessageType &instance_data)

◆ marshal()

template<typename MessageType>
CORBA::Boolean OpenDDS::DCPS::DataReaderImpl_T< MessageType >::marshal ( TAO_OutputCDR )
inlinevirtual

Reimplemented from CORBA::Object.

Definition at line 78 of file DataReaderImpl_T.h.

79  {
80  return false;
81  }

◆ notify_status_condition_no_sample_lock()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::notify_status_condition_no_sample_lock ( )
inlineprivate

Release sample_lock_ during status notifications in store_instance_data() as the lock is not needed and could cause deadlock condition. See comments in member function implementation for details.

Definition at line 2157 of file DataReaderImpl_T.h.

2158 {
2159  // This member function avoids a deadlock condition which otherwise
2160  // could occur as follows:
2161  // Thread 1: Call to WaitSet::wait() causes WaitSet::lock_ to lock and
2162  // eventually DataReaderImpl::sample_lock_ to lock in call to
2163  // DataReaderImpl::contains_samples().
2164  // Thread2: Call to DataReaderImpl::data_received()
2165  // causes DataReaderImpl::sample_lock_ to lock and eventually
2166  // during notify of status condition a call to WaitSet::signal()
2167  // causes WaitSet::lock_ to lock.
2168  // Because the DataReaderImpl::sample_lock_ is not needed during
2169  // status notification this member function is used in
2170  // store_instance_data() to release sample_lock_ before making
2171  // the notification.
2174 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)

◆ OPENDDS_MAP() [1/2]

template<typename MessageType>
typedef OpenDDS::DCPS::DataReaderImpl_T< MessageType >::OPENDDS_MAP ( DDS::InstanceHandle_t  ,
typename InstanceMap::iterator   
)

◆ OPENDDS_MAP() [2/2]

template<typename MessageType>
typedef OpenDDS::DCPS::DataReaderImpl_T< MessageType >::OPENDDS_MAP ( DDS::InstanceHandle_t  ,
FilterDelayedSample   
)
private

◆ OPENDDS_MAP_CMP_T()

template<typename MessageType>
typedef OpenDDS::DCPS::DataReaderImpl_T< MessageType >::OPENDDS_MAP_CMP_T ( MessageType  ,
DDS::InstanceHandle_t  ,
typename TraitsType::LessThanType   
)

◆ OPENDDS_MULTIMAP()

template<typename MessageType>
typedef OpenDDS::DCPS::DataReaderImpl_T< MessageType >::OPENDDS_MULTIMAP ( MonotonicTimePoint  ,
DDS::InstanceHandle_t   
)
private

◆ purge_data()

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::purge_data ( OpenDDS::DCPS::SubscriptionInstance_rch  instance)
inlineprotectedvirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1220 of file DataReaderImpl_T.h.

1221  {
1222  drop_sample(instance->instance_handle_);
1223 
1224 
1225  instance->instance_state_->cancel_release();
1226 
1227  while (instance->rcvd_samples_.size() > 0)
1228  {
1230  instance->rcvd_samples_.remove_head();
1231  head->dec_ref();
1232  }
1233  }
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const DDS::InstanceHandle_t instance_handle_
The instance handle for the registered object.
const InstanceState_rch instance_state_
Instance state for this instance.
void cancel_release()
Cancel a scheduled or pending release of resources.
void drop_sample(DDS::InstanceHandle_t handle)

◆ qos_change()

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::qos_change ( const DDS::DataReaderQos qos)
inlinevirtual

Reimplemented from OpenDDS::DCPS::DataReaderImpl.

Definition at line 1002 of file DataReaderImpl_T.h.

1003  {
1004  // reliability is not changeable, just time_based_filter
1009  if (qos.time_based_filter.minimum_separation != zero) {
1012  FilterDelayedSampleQueue queue;
1013 
1015  for (typename FilterDelayedSampleMap::iterator pos = filter_delayed_sample_map_.begin(), limit = filter_delayed_sample_map_.end(); pos != limit; ++pos) {
1016  FilterDelayedSample& sample = pos->second;
1017  sample.expiration_time = now + (interval - (sample.expiration_time - now));
1018  queue.insert(std::make_pair(sample.expiration_time, pos->first));
1019  }
1021 
1022  if (!filter_delayed_sample_queue_.empty()) {
1023  filter_delayed_sample_task_->cancel();
1024  filter_delayed_sample_task_->schedule(interval);
1025  }
1026 
1027  } else {
1028  filter_delayed_sample_task_->cancel();
1032  }
1033  }
1034  // else no existing timers to change/cancel
1035  }
1036  // else no qos change so nothing to change
1037  }
1038 
1040  }
virtual void qos_change(const DDS::DataReaderQos &qos)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
ReliabilityQosPolicyKind kind
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
TimeBasedFilterQosPolicy time_based_filter
RcHandle< DRISporadicTask > filter_delayed_sample_task_
ReliabilityQosPolicy reliability
const long DURATION_ZERO_SEC
Definition: DdsDcpsCore.idl:75
const unsigned long DURATION_ZERO_NSEC
Definition: DdsDcpsCore.idl:76
FilterDelayedSampleMap filter_delayed_sample_map_
FilterDelayedSampleQueue filter_delayed_sample_queue_
void swap(MessageBlock &lhs, MessageBlock &rhs)

◆ read()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Definition at line 161 of file DataReaderImpl_T.h.

168  {
169  DDS::ReturnCode_t const precond =
170  check_inputs("read", received_data, info_seq, max_samples);
171  if (DDS::RETCODE_OK != precond)
172  {
173  return precond;
174  }
175 
177  guard,
178  sample_lock_,
180 
181  return read_i(received_data, info_seq, max_samples, sample_states,
183  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t read_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ read_generic() [1/3]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 21 of file DynamicDataReaderImpl.cpp.

26  {
28  }
const ReturnCode_t RETCODE_UNSUPPORTED

◆ read_generic() [2/3]

template<>
OpenDDS_Dcps_Export DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< XTypes::DynamicSample >::read_generic ( GenericBundle gen,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
bool  adjust_ref_count 
)
virtual

◆ read_generic() [3/3]

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_generic ( GenericBundle gen,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
bool  adjust_ref_count = false 
)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 736 of file DataReaderImpl_T.h.

741  {
742  MessageSequenceType data;
744  {
746  rc = read_i(data, gen.info_, DDS::LENGTH_UNLIMITED,
748  if (adjust_ref_count) {
749  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(data);
750  received_data_p.increment_references();
751  }
752  }
753  gen.samples_.reserve(data.length());
754  for (CORBA::ULong i = 0; i < data.length(); ++i) {
755  gen.samples_.push_back(&data[i]);
756  }
757  return rc;
758  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
DDS::ReturnCode_t read_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const long LENGTH_UNLIMITED
TraitsType::MessageSequenceType MessageSequenceType
ACE_CDR::ULong ULong
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66

◆ read_i()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
)
inlineprivate

Definition at line 1326 of file DataReaderImpl_T.h.

1337 {
1338 
1339  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1340 
1341 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1344  }
1345 
1346  const bool group_coherent_ordered =
1350 
1351  if (group_coherent_ordered && coherent_) {
1352  max_samples = 1;
1353  }
1354 #endif
1355 
1356  RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1357 #ifndef OPENDDS_NO_QUERY_CONDITION
1358  a_condition,
1359 #endif
1361 
1363 
1364 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1365  if (!group_coherent_ordered) {
1366 #endif
1368  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
1369  ++next; // pre-increment iterator, in case updates cause changes to match set
1370  const DDS::InstanceHandle_t handle = *it;
1371  const SubscriptionInstance_rch inst = get_handle_instance(handle);
1372  if (!inst) continue;
1373 
1374  size_t i(0);
1375  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1376  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1377  results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1378 
1379  const ValueDispatcher* vd = get_value_dispatcher();
1380  if (observer && item->registered_data_ && vd) {
1381  Observer::Sample s(handle, inst->instance_state_->instance_state(), *item, *vd);
1382  observer->on_sample_read(this, s);
1383  }
1384  }
1385  }
1386 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1387  } else {
1389  results.insert_sample(item.rde_, item.rdel_, item.si_, item.index_in_instance_);
1390  const ValueDispatcher* vd = get_value_dispatcher();
1391  if (observer && item.rde_->registered_data_ && vd) {
1392  typename InstanceMap::iterator i = instance_map_.begin();
1393  const DDS::InstanceHandle_t handle = (i != instance_map_.end()) ? i->second : DDS::HANDLE_NIL;
1394  Observer::Sample s(handle, item.si_->instance_state_->instance_state(), *item.rde_, *vd);
1395  observer->on_sample_read(this, s);
1396  }
1397  }
1398 #endif
1399 
1400  results.copy_to_user();
1401 
1403  if (received_data.length()) {
1404  ret = DDS::RETCODE_OK;
1405  if (received_data.maximum() == 0) { // using ZeroCopy
1406  received_data_p.set_loaner(this);
1407  }
1408  }
1409 
1411  return ret;
1412 }
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
virtual void on_sample_read(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:93
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
const ReturnCode_t RETCODE_OK
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
DDS::InstanceStateKind instance_state() const
Access instance state.
const ReturnCode_t RETCODE_NO_DATA
const InstanceHandle_t HANDLE_NIL
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
SubscriptionInstance_rch si_
Definition: RakeData.h:30
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
ReceivedDataElementList * rdel_
Definition: RakeData.h:29
const InstanceState_rch instance_state_
Instance state for this instance.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
GroupRakeData group_coherent_ordered_data_
Ordered group samples.
PresentationQosPolicy presentation
const ValueDispatcher * get_value_dispatcher() const
ReceivedDataElement * rde_
Definition: RakeData.h:28
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
bool coherent_
Is accessing to Group coherent changes ?
PresentationQosPolicyAccessScopeKind access_scope
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ read_instance()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Definition at line 385 of file DataReaderImpl_T.h.

393  {
394  DDS::ReturnCode_t const precond =
395  check_inputs("read_instance", received_data, info_seq, max_samples);
396  if (DDS::RETCODE_OK != precond)
397  {
398  return precond;
399  }
400 
402  guard,
403  sample_lock_,
405  return read_instance_i(received_data, info_seq, max_samples, a_handle,
407  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
DDS::ReturnCode_t read_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ read_instance_generic()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_generic ( void *&  data,
DDS::SampleInfo info,
DDS::InstanceHandle_t  instance,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 788 of file DataReaderImpl_T.h.

792  {
793  MessageSequenceType dataseq;
794  DDS::SampleInfoSeq infoseq;
795  const DDS::ReturnCode_t rc = read_instance_i(dataseq, infoseq,
797  instance_states, 0);
798  if (rc != DDS::RETCODE_NO_DATA)
799  {
800  const CORBA::ULong last = dataseq.length() - 1;
801  data = new MessageType(dataseq[last]);
802  info = infoseq[last];
803  }
804  return rc;
805  }
const ReturnCode_t RETCODE_NO_DATA
const long LENGTH_UNLIMITED
TraitsType::MessageSequenceType MessageSequenceType
ACE_CDR::ULong ULong
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t read_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
sequence< SampleInfo > SampleInfoSeq

◆ read_instance_i()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
)
inlineprivate

Definition at line 1494 of file DataReaderImpl_T.h.

1506 {
1507  const SubscriptionInstance_rch inst = get_handle_instance(a_handle);
1508  if (!inst) return DDS::RETCODE_BAD_PARAMETER;
1509 
1510  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1511 
1512  RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1513 #ifndef OPENDDS_NO_QUERY_CONDITION
1514  a_condition,
1515 #endif
1517 
1518  const InstanceState_rch state_obj = inst->instance_state_;
1519  if (state_obj->match(view_states, instance_states)) {
1521  size_t i(0);
1522  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1523  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1524  results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1525  const ValueDispatcher* vd = get_value_dispatcher();
1526  if (observer && item->registered_data_ && vd) {
1527  Observer::Sample s(a_handle, inst->instance_state_->instance_state(), *item, *vd);
1528  observer->on_sample_read(this, s);
1529  }
1530  }
1531  } else if (DCPS_debug_level >= 8) {
1532  OPENDDS_STRING msg;
1533  if ((state_obj->view_state() & view_states) == 0) {
1534  msg = "view state is not valid";
1535  }
1536  if ((state_obj->instance_state() & instance_states) == 0) {
1537  if (!msg.empty()) msg += " and ";
1538  msg += "instance state is ";
1539  msg += state_obj->instance_state_string();
1540  msg += " while the validity mask is " + InstanceState::instance_state_mask_string(instance_states);
1541  }
1542  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl_T::read_instance_i: ")
1543  ACE_TEXT("will return no data reading sub %C because:\n %C\n"),
1544  LogGuid(get_guid()).c_str(), msg.c_str()));
1545  }
1546 
1547  results.copy_to_user();
1548 
1550  if (received_data.length()) {
1551  ret = DDS::RETCODE_OK;
1552  if (received_data.maximum() == 0) { // using ZeroCopy
1553  received_data_p.set_loaner(this);
1554  }
1555  }
1556 
1558  return ret;
1559 }
#define ACE_DEBUG(X)
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
virtual void on_sample_read(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:93
const ReturnCode_t RETCODE_OK
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
const ReturnCode_t RETCODE_BAD_PARAMETER
DDS::InstanceStateKind instance_state() const
Access instance state.
DDS::ViewStateKind view_state() const
Access view state.
const ReturnCode_t RETCODE_NO_DATA
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
#define OPENDDS_STRING
bool match(DDS::ViewStateMask view, DDS::InstanceStateMask inst) const
const InstanceState_rch instance_state_
Instance state for this instance.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
const char * c_str() const
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
PresentationQosPolicy presentation
const ValueDispatcher * get_value_dispatcher() const
ACE_TEXT("TCP_Factory")
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
static OPENDDS_STRING instance_state_mask_string(DDS::InstanceStateMask mask)
Return string representation of the instance state mask passed.
const char * instance_state_string() const
Return string of the name of the current instance state.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ read_instance_w_condition()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::ReadCondition_ptr  a_condition 
)
inlinevirtual

Definition at line 433 of file DataReaderImpl_T.h.

439  {
440  DDS::ReturnCode_t const precond =
441  check_inputs("read_instance_w_condition", received_data, info_seq,
442  max_samples);
443  if (DDS::RETCODE_OK != precond)
444  {
445  return precond;
446  }
447 
450 
451  if (!has_readcondition(a_condition))
452  {
454  }
455 
456 #ifndef OPENDDS_NO_QUERY_CONDITION
457  DDS::QueryCondition_ptr query_condition =
458  dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
459 #endif
460 
461  return read_instance_i(received_data, info_seq, max_samples, a_handle,
462  a_condition->get_sample_state_mask(),
463  a_condition->get_view_state_mask(),
464  a_condition->get_instance_state_mask(),
465 #ifndef OPENDDS_NO_QUERY_CONDITION
466  query_condition
467 #else
468  0
469 #endif
470  );
471  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
DDS::ReturnCode_t read_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66
bool has_readcondition(DDS::ReadCondition_ptr a_condition)

◆ read_next_instance()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Definition at line 513 of file DataReaderImpl_T.h.

521  {
522  DDS::ReturnCode_t const precond =
523  check_inputs("read_next_instance", received_data, info_seq, max_samples);
524  if (DDS::RETCODE_OK != precond)
525  {
526  return precond;
527  }
528 
529  return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
531  }
const ReturnCode_t RETCODE_OK
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
DDS::ReturnCode_t read_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ read_next_instance_generic()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_generic ( void *&  data,
DDS::SampleInfo info,
DDS::InstanceHandle_t  previous_instance,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 807 of file DataReaderImpl_T.h.

811  {
812  MessageSequenceType dataseq;
813  DDS::SampleInfoSeq infoseq;
814  const DDS::ReturnCode_t rc = read_next_instance_i(dataseq, infoseq,
815  DDS::LENGTH_UNLIMITED, previous_instance, sample_states, view_states,
816  instance_states, 0);
817  if (rc != DDS::RETCODE_NO_DATA)
818  {
819  const CORBA::ULong last = dataseq.length() - 1;
820  data = new MessageType(dataseq[last]);
821  info = infoseq[last];
822  }
823  return rc;
824  }
const ReturnCode_t RETCODE_NO_DATA
const long LENGTH_UNLIMITED
TraitsType::MessageSequenceType MessageSequenceType
ACE_CDR::ULong ULong
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t read_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
sequence< SampleInfo > SampleInfoSeq

◆ read_next_instance_i()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
)
inlineprivate

Definition at line 1614 of file DataReaderImpl_T.h.

1626 {
1628 
1629  typename InstanceMap::iterator it = instance_map_.begin();
1630  const typename InstanceMap::iterator the_end = instance_map_.end();
1631  if (a_handle != DDS::HANDLE_NIL) {
1632  const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(a_handle);
1633  if (pos != reverse_instance_map_.end()) {
1634  it = pos->second;
1635  ++it;
1636  } else {
1637  it = the_end;
1638  }
1639  }
1640 
1642  for (; it != the_end; ++it) {
1643  handle = it->second;
1644  const DDS::ReturnCode_t status =
1645  read_instance_i(received_data, info_seq, max_samples, handle,
1647 #ifndef OPENDDS_NO_QUERY_CONDITION
1648  a_condition);
1649 #else
1650  0);
1651 #endif
1652  if (status != DDS::RETCODE_NO_DATA) {
1654  return status;
1655  }
1656  }
1657 
1659  return DDS::RETCODE_NO_DATA;
1660 }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_NO_DATA
const InstanceHandle_t HANDLE_NIL
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
ReverseInstanceMap reverse_instance_map_
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t read_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ read_next_instance_w_condition()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::ReadCondition_ptr  a_condition 
)
inlinevirtual

Definition at line 553 of file DataReaderImpl_T.h.

559  {
560  DDS::ReturnCode_t const precond =
561  check_inputs("read_next_instance_w_condition", received_data, info_seq,
562  max_samples);
563  if (DDS::RETCODE_OK != precond)
564  {
565  return precond;
566  }
567 
570 
571  if (!has_readcondition(a_condition))
572  {
574  }
575 
576 #ifndef OPENDDS_NO_QUERY_CONDITION
577  DDS::QueryCondition_ptr query_condition =
578  dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
579 #endif
580 
581  return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
582  a_condition->get_sample_state_mask(),
583  a_condition->get_view_state_mask(),
584  a_condition->get_instance_state_mask(),
585 #ifndef OPENDDS_NO_QUERY_CONDITION
586  query_condition
587 #else
588  0
589 #endif
590  );
591  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
DDS::ReturnCode_t read_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66
bool has_readcondition(DDS::ReadCondition_ptr a_condition)

◆ read_next_sample()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_sample ( MessageType &  received_data,
DDS::SampleInfo sample_info_ref 
)
inlinevirtual

Definition at line 274 of file DataReaderImpl_T.h.

276  {
277  bool found_data = false;
279 
281 
283  const HandleSet& matches = lookup_matching_instances(sample_states, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
284  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
285  ++next; // pre-increment iterator, in case updates cause changes to match set
286  const DDS::InstanceHandle_t handle = *it;
287  const SubscriptionInstance_rch inst = get_handle_instance(handle);
288  if (!inst) continue;
289 
290  bool most_recent_generation = false;
291  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
292  !found_data && item; item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
293  if (item->registered_data_) {
294  received_data = *static_cast<MessageType*>(item->registered_data_);
295  }
296  inst->instance_state_->sample_info(sample_info_ref, item);
297  inst->rcvd_samples_.mark_read(item);
298 
300  if (observer && item->registered_data_ && vd) {
301  Observer::Sample s(sample_info_ref.instance_handle, sample_info_ref.instance_state, *item, *vd);
302  observer->on_sample_read(this, s);
303  }
304 
305  if (!most_recent_generation) {
306  most_recent_generation = inst->instance_state_->most_recent_generation(item);
307  }
308 
309  found_data = true;
310  }
311 
312  if (found_data) {
313  if (most_recent_generation) {
314  inst->instance_state_->accessed();
315  }
316  // Get the sample_ranks, generation_ranks, and
317  // absolute_generation_ranks for this info_seq
318  sample_info(sample_info_ref, inst->rcvd_samples_.peek_tail());
319 
320  break;
321  }
322  }
323 
325  return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
326  }
void sample_info(DDS::SampleInfo &si, const ReceivedDataElement *de)
Populate the SampleInfo structure.
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
InstanceHandle_t instance_handle
virtual void on_sample_read(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:93
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
const ReturnCode_t RETCODE_OK
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
const ReturnCode_t RETCODE_NO_DATA
bool most_recent_generation(ReceivedDataElement *item) const
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
const InstanceStateMask ANY_INSTANCE_STATE
ACE_CDR::ULong ULong
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
const InstanceState_rch instance_state_
Instance state for this instance.
InstanceStateKind instance_state
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
void accessed()
A read or take operation has been performed on this instance.
const ValueDispatcher * get_value_dispatcher() const
const ViewStateMask ANY_VIEW_STATE
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
const SampleStateKind NOT_READ_SAMPLE_STATE
void sample_info(DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)

◆ read_w_condition()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq sample_info,
::CORBA::Long  max_samples,
DDS::ReadCondition_ptr  a_condition 
)
inlinevirtual

Definition at line 209 of file DataReaderImpl_T.h.

214  {
215  DDS::ReturnCode_t const precond =
216  check_inputs("read_w_condition", received_data, sample_info, max_samples);
217  if (DDS::RETCODE_OK != precond)
218  {
219  return precond;
220  }
221 
224 
225  if (!has_readcondition(a_condition))
226  {
228  }
229 
230  return read_i(received_data, sample_info, max_samples,
231  a_condition->get_sample_state_mask(),
232  a_condition->get_view_state_mask(),
233  a_condition->get_instance_state_mask(),
234 #ifndef OPENDDS_NO_QUERY_CONDITION
235  dynamic_cast< DDS::QueryCondition_ptr >(a_condition));
236 #else
237  0);
238 #endif
239  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t read_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
void sample_info(DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
bool has_readcondition(DDS::ReadCondition_ptr a_condition)

◆ release_all_instances()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_all_instances ( )
inlinevirtual

Release all instances held by the reader.

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1052 of file DataReaderImpl_T.h.

1053  {
1055 
1056  const typename InstanceMap::iterator end = instance_map_.end();
1057  typename InstanceMap::iterator it = instance_map_.begin();
1058  while (it != end) {
1059  const DDS::InstanceHandle_t handle = it->second;
1060  ++it; // it will be invalid, so iterate now.
1061  release_instance(handle);
1062  }
1063  }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
void release_instance(DDS::InstanceHandle_t handle)
Release the instance with the handle.
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51

◆ release_instance_i()

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_instance_i ( DDS::InstanceHandle_t  handle)
inlineprotectedvirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1235 of file DataReaderImpl_T.h.

1236  {
1237  const typename ReverseInstanceMap::iterator pos = reverse_instance_map_.find(handle);
1238  if (pos != reverse_instance_map_.end()) {
1239  remove_from_lookup_maps(handle);
1240  instance_map_.erase(pos->second);
1241  reverse_instance_map_.erase(pos);
1242  }
1243  }
void remove_from_lookup_maps(DDS::InstanceHandle_t handle)
ReverseInstanceMap reverse_instance_map_

◆ release_loan()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_loan ( MessageSequenceType received_data)
inline

Definition at line 695 of file DataReaderImpl_T.h.

696  {
697  received_data.length(0);
698  }

◆ return_loan()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::return_loan ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq 
)
inlinevirtual

Definition at line 633 of file DataReaderImpl_T.h.

636  {
637  // Some incomplete tests to see that the data and info are from the
638  // same read.
639  if (received_data.length() != info_seq.length())
640  {
642  }
643 
644  if (received_data.release())
645  {
646  // nothing to do because this is not zero-copy data
647  return DDS::RETCODE_OK;
648  }
649  else
650  {
651  info_seq.length(0);
652  received_data.length(0);
653  }
654  return DDS::RETCODE_OK;
655  }
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ set_instance_state_i()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::set_instance_state_i ( DDS::InstanceHandle_t  instance,
DDS::InstanceHandle_t  publication_handle,
DDS::InstanceStateKind  state,
const SystemTimePoint timestamp,
const GUID_t publication_id 
)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 887 of file DataReaderImpl_T.h.

892  {
893  // sample_lock_ must be held.
894  using namespace OpenDDS::DCPS;
895 
897  if (si && state != DDS::ALIVE_INSTANCE_STATE) {
898  const DDS::Time_t now = timestamp.to_dds_time();
900  header.publication_id_ = publication_id;
901  header.source_timestamp_sec_ = now.sec;
902  header.source_timestamp_nanosec_ = now.nanosec;
903  const int msg = (state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE)
905  header.message_id_ = static_cast<char>(msg);
906  bool just_registered, filtered;
907  unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
908  get_key_value(*data, instance);
909  store_instance_data(move(data), publication_handle, header, si, just_registered, filtered);
910  if (!filtered) {
912  }
913  }
914  }
unsigned long nanosec
void notify_read_conditions()
Data has arrived into the cache, unblock waiting ReadConditions.
char message_id_
The enum MessageId.
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
const InstanceStateKind ALIVE_INSTANCE_STATE
unique_ptr< DataAllocator > & data_allocator()
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
virtual DDS::ReturnCode_t get_key_value(MessageType &key_holder, DDS::InstanceHandle_t handle)
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
Definition: COPYING.txt:14
void store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr, bool &just_registered, bool &filtered)
DDS::Time_t to_dds_time() const
Definition: TimePoint_T.inl:89
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE

◆ set_marshal_skip_serialize()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::set_marshal_skip_serialize ( bool  value)
inline

Definition at line 1042 of file DataReaderImpl_T.h.

1043  {
1045  }
const LogLevel::Value value
Definition: debug.cpp:61

◆ state_updated_i()

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::state_updated_i ( DDS::InstanceHandle_t  handle)
inlineprotectedvirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1245 of file DataReaderImpl_T.h.

1246  {
1247  const typename SubscriptionInstanceMapType::iterator pos = instances_.find(handle);
1248  if (pos != instances_.end()) {
1249  update_lookup_maps(pos);
1250  }
1251  }
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
void update_lookup_maps(const SubscriptionInstanceMapType::iterator &input)

◆ store_instance_data()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_instance_data ( unique_ptr< MessageTypeWithAllocator instance_data,
DDS::InstanceHandle_t  publication_handle,
const OpenDDS::DCPS::DataSampleHeader header,
OpenDDS::DCPS::SubscriptionInstance_rch instance_ptr,
bool &  just_registered,
bool &  filtered 
)
inlineprivate

!! caller should already have the sample_lock_

Definition at line 1711 of file DataReaderImpl_T.h.

1717 {
1718  ACE_UNUSED_ARG(publication_handle);
1719 
1720  const bool is_dispose_msg =
1723  const bool is_unregister_msg =
1726 
1727  if (!store_instance_data_check(instance_data, publication_handle, header, instance_ptr)) {
1728  return;
1729  }
1730 
1731  // not filtering any data, except what is specifically identified as filtered below
1732  filtered = false;
1733 
1735 
1736  //!!! caller should already have the sample_lock_
1737  //We will unlock it before calling into listeners
1738 
1739  typename InstanceMap::const_iterator const it = instance_map_.find(*instance_data);
1740 
1741  if (it == instance_map_.end()) {
1742  if (is_dispose_msg || is_unregister_msg) {
1743  return;
1744  }
1745 
1746  std::size_t instances_size = 0;
1747  {
1749  instances_size = instances_.size();
1750  }
1752  ((::CORBA::Long) instances_size >= qos_.resource_limits.max_instances))
1753  {
1754  DDS::DataReaderListener_var listener
1756 
1758 
1763 
1764  if (!CORBA::is_nil(listener.in()))
1765  {
1767 
1768  listener->on_sample_rejected(this, sample_rejected_status_);
1770  } // do we want to do something if listener is nil???
1772 
1773  return;
1774  }
1775 
1776  {
1778 
1779 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1780  SharedInstanceMap_rch inst;
1781  OwnershipManagerScopedAccess ownership_scoped_access;
1782  OwnershipManagerPtr owner_manager = ownership_manager();
1783 
1784  bool new_handle = true;
1786  OwnershipManagerScopedAccess temp(owner_manager);
1787  temp.swap(ownership_scoped_access);
1788  if (!owner_manager || ownership_scoped_access.lock_result_ != 0) {
1789  if (DCPS_debug_level > 0) {
1790  ACE_ERROR ((LM_ERROR,
1791  ACE_TEXT("(%P|%t) ")
1792  ACE_TEXT("%CDataReaderImpl::")
1793  ACE_TEXT("store_instance_data, ")
1794  ACE_TEXT("acquire instance_lock failed.\n"), TraitsType::type_name()));
1795  }
1796  return;
1797  }
1798 
1799  inst = dynamic_rchandle_cast<SharedInstanceMap>(
1800  owner_manager->get_instance_map(topic_servant_->type_name(), this));
1801  if (inst != 0) {
1802  typename InstanceMap::const_iterator const iter = inst->find(*instance_data);
1803  if (iter != inst->end ()) {
1804  handle = iter->second;
1805  new_handle = false;
1806  }
1807  }
1808  }
1809 #endif
1810 
1811  just_registered = true;
1812  DDS::BuiltinTopicKey_t key = OpenDDS::DCPS::keyFromSample(static_cast<MessageType*>(instance_data.get()));
1813  bool owns_handle = false;
1814  if (handle == DDS::HANDLE_NIL) {
1815  handle = get_next_handle(key);
1816  owns_handle = true;
1817  }
1819  OpenDDS::DCPS::make_rch<OpenDDS::DCPS::SubscriptionInstance>(
1820  rchandle_from(this),
1821  qos_,
1823  handle, owns_handle);
1824 
1825  const std::pair<typename SubscriptionInstanceMapType::iterator, bool> bpair =
1826  instances_.insert(typename SubscriptionInstanceMapType::value_type(handle, instance));
1827 
1828  if (bpair.second == false) {
1829  if (DCPS_debug_level > 0) {
1830  ACE_ERROR((LM_ERROR,
1831  ACE_TEXT("(%P|%t) ")
1832  ACE_TEXT("%CDataReaderImpl::")
1833  ACE_TEXT("store_instance_data, ")
1834  ACE_TEXT("insert handle failed.\n"), TraitsType::type_name()));
1835  }
1836  return;
1837  }
1838  update_lookup_maps(bpair.first);
1839 
1840 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1841  if (owner_manager) {
1842  if (!inst) {
1843  inst = make_rch<SharedInstanceMap>();
1844  owner_manager->set_instance_map(
1845  topic_servant_->type_name(),
1846  inst,
1847  this);
1848  }
1849 
1850  if (new_handle) {
1851  const std::pair<typename InstanceMap::iterator, bool> bpair =
1852  inst->insert(typename InstanceMap::value_type(*instance_data, handle));
1853  if (!bpair.second) {
1854  if (DCPS_debug_level > 0) {
1855  ACE_ERROR ((LM_ERROR,
1856  ACE_TEXT("(%P|%t) ")
1857  ACE_TEXT("%CDataReaderImpl::")
1858  ACE_TEXT("store_instance_data, ")
1859  ACE_TEXT("insert to participant scope %C failed.\n"), TraitsType::type_name(), TraitsType::type_name()));
1860  }
1861  return;
1862  }
1863  }
1864 
1865  OwnershipManagerScopedAccess temp;
1866  temp.swap(ownership_scoped_access);
1867  if (temp.release() != 0) {
1868  if (DCPS_debug_level > 0) {
1869  ACE_ERROR ((LM_ERROR,
1870  ACE_TEXT("(%P|%t) ")
1871  ACE_TEXT("%CDataReaderImpl::")
1872  ACE_TEXT("store_instance_data, ")
1873  ACE_TEXT("release instance_lock failed.\n"), TraitsType::type_name()));
1874  }
1875  return;
1876  }
1877  }
1878 #endif
1879  } // scope for instances_lock_
1880 
1881  std::pair<typename InstanceMap::iterator, bool> bpair =
1882  instance_map_.insert(typename InstanceMap::value_type(*instance_data,
1883  handle));
1884  if (bpair.second == false)
1885  {
1886  if (DCPS_debug_level > 0) {
1887  ACE_ERROR ((LM_ERROR,
1888  ACE_TEXT("(%P|%t) ")
1889  ACE_TEXT("%CDataReaderImpl::")
1890  ACE_TEXT("store_instance_data, ")
1891  ACE_TEXT("insert %C failed.\n"), TraitsType::type_name(), TraitsType::type_name()));
1892  }
1893  return;
1894  }
1895  reverse_instance_map_[handle] = bpair.first;
1896  }
1897  else
1898  {
1899  just_registered = false;
1900  handle = it->second;
1901  }
1902 
1904  {
1905  instance_ptr = get_handle_instance(handle);
1906  OPENDDS_ASSERT(instance_ptr);
1907 
1909  {
1910  {
1912  filtered = ownership_filter_instance(instance_ptr, header.publication_id_);
1913  }
1914 
1915  MonotonicTimePoint now;
1916  MonotonicTimePoint deadline;
1917  if (!filtered && time_based_filter_instance(instance_ptr, now, deadline)) {
1918  filtered = true;
1919  if (qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
1920  delay_sample(handle, move(instance_data), header, just_registered, now, deadline);
1921  }
1922  } else {
1923  // nothing time based filtered now
1924  clear_sample(handle);
1925 
1926  }
1927 
1928  if (filtered) {
1929  return;
1930  }
1931  }
1932 
1933  finish_store_instance_data(move(instance_data), header, instance_ptr, is_dispose_msg, is_unregister_msg);
1934  }
1935  else
1936  {
1937  instance_ptr = get_handle_instance(handle);
1938  OPENDDS_ASSERT(instance_ptr);
1939  instance_ptr->instance_state_->lively(header.publication_id_);
1940  }
1941 }
OwnershipManagerPtr ownership_manager()
ACE_CDR::Long Long
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
char message_id_
The enum MessageId.
bool ownership_filter_instance(const SubscriptionInstance_rch &instance, const GUID_t &pubid)
sequence< octet > key
DDS::InstanceHandle_t get_next_handle(const DDS::BuiltinTopicKey_t &key)
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
const InstanceHandle_t HANDLE_NIL
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:66
ResourceLimitsQosPolicy resource_limits
ACE_Recursive_Thread_Mutex instances_lock_
bool time_based_filter_instance(const SubscriptionInstance_rch &instance, MonotonicTimePoint &now, MonotonicTimePoint &deadline)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
const long LENGTH_UNLIMITED
DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind)
const InstanceState_rch instance_state_
Instance state for this instance.
void finish_store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, const DataSampleHeader &header, SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg)
TopicDescriptionPtr< TopicImpl > topic_servant_
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
void update_lookup_maps(const SubscriptionInstanceMapType::iterator &input)
void lively(const GUID_t &writer_id)
LIVELINESS message received for this DataWriter.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DDS::SampleRejectedStatus sample_rejected_status_
ACE_TEXT("TCP_Factory")
ReverseInstanceMap reverse_instance_map_
void clear_sample(DDS::InstanceHandle_t handle)
void delay_sample(DDS::InstanceHandle_t handle, unique_ptr< MessageTypeWithAllocator > data, const OpenDDS::DCPS::DataSampleHeader &header, const bool just_registered, const MonotonicTimePoint &now, const MonotonicTimePoint &deadline)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DDS::BuiltinTopicKey_t keyFromSample(TopicType *sample)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
const StatusKind SAMPLE_REJECTED_STATUS
bool store_instance_data_check(unique_ptr< MessageTypeWithAllocator > &instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr)
SampleRejectedStatusKind last_reason
RcHandle< SharedInstanceMap > SharedInstanceMap_rch
Boolean is_nil(T x)

◆ store_instance_data_check()

template<typename MessageType>
bool OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_instance_data_check ( unique_ptr< MessageTypeWithAllocator > &  instance_data,
DDS::InstanceHandle_t  publication_handle,
const OpenDDS::DCPS::DataSampleHeader header,
OpenDDS::DCPS::SubscriptionInstance_rch instance_ptr 
)
inlineprivate

Definition at line 1259 of file DataReaderImpl_T.h.

1263  {
1264 #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
1265  const bool is_dispose_msg =
1268 
1269  if (!is_bit() && security_config_) {
1270  if (header.message_id_ == SAMPLE_DATA ||
1271  header.message_id_ == INSTANCE_REGISTRATION) {
1272 
1273  // Pubulisher has already gone through the check.
1274  if (instance_ptr &&
1275  instance_ptr->instance_state_ &&
1276  instance_ptr->instance_state_->writes_instance(header.publication_id_)) {
1277  return true;
1278  }
1279 
1281  const GUID_t local_participant = make_id(get_guid(), ENTITYID_PARTICIPANT);
1282  const GUID_t remote_participant = make_id(header.publication_id_, ENTITYID_PARTICIPANT);
1283  const DDS::Security::ParticipantCryptoHandle remote_participant_permissions_handle = security_config_->get_handle_registry(local_participant)->get_remote_participant_permissions_handle(remote_participant);
1284  // Construct a DynamicData around the deserialized sample.
1285  XTypes::DynamicDataAdapter<MessageType> dda(dynamic_type_, getMetaStruct<MessageType>(), *instance_data);
1286  // The remote participant might not be using security.
1287  if (remote_participant_permissions_handle != DDS::HANDLE_NIL &&
1288  !security_config_->get_access_control()->check_remote_datawriter_register_instance(remote_participant_permissions_handle, this, publication_handle, &dda, ex)) {
1289  if (log_level >= LogLevel::Warning) {
1290  ACE_ERROR((LM_WARNING,
1291  "(%P|%t) WARNING: DataReaderImpl_T::store_instance_data_check: unable to register instance SecurityException[%d.%d]: %C\n",
1292  ex.code, ex.minor_code, ex.message.in()));
1293  }
1294  return false;
1295  }
1296  } else if (is_dispose_msg) {
1297 
1299  const GUID_t local_participant = make_id(get_guid(), ENTITYID_PARTICIPANT);
1300  const GUID_t remote_participant = make_id(header.publication_id_, ENTITYID_PARTICIPANT);
1301  const DDS::Security::ParticipantCryptoHandle remote_participant_permissions_handle = security_config_->get_handle_registry(local_participant)->get_remote_participant_permissions_handle(remote_participant);
1302  // Construct a DynamicData around the deserialized sample.
1303  XTypes::DynamicDataAdapter<MessageType> dda(dynamic_type_, getMetaStruct<MessageType>(), *instance_data);
1304  // The remote participant might not be using security.
1305  if (remote_participant_permissions_handle != DDS::HANDLE_NIL &&
1306  !security_config_->get_access_control()->check_remote_datawriter_dispose_instance(remote_participant_permissions_handle, this, publication_handle, &dda, ex)) {
1307  if (log_level >= LogLevel::Warning) {
1308  ACE_ERROR((LM_WARNING,
1309  "(%P|%t) WARNING: DataReaderImpl_T::store_instance_data_check: unable to dispose instance SecurityException[%d.%d]: %C\n",
1310  ex.code, ex.minor_code, ex.message.in()));
1311  }
1312  return false;
1313  }
1314  }
1315  }
1316 #else
1317  ACE_UNUSED_ARG(instance_data);
1318  ACE_UNUSED_ARG(publication_handle);
1319  ACE_UNUSED_ARG(header);
1320  ACE_UNUSED_ARG(instance_ptr);
1321 #endif
1322 
1323  return true;
1324  }
OpenDDS_Dcps_Export LogLevel log_level
#define ACE_ERROR(X)
char message_id_
The enum MessageId.
Security::SecurityConfig_rch security_config_
bool writes_instance(const GUID_t &writer_id) const
Returns true if the writer is a writer of this instance.
const InstanceHandle_t HANDLE_NIL
const EntityId_t ENTITYID_PARTICIPANT
Definition: GuidUtils.h:37
const InstanceState_rch instance_state_
Instance state for this instance.
DDS::DynamicType_var dynamic_type_
GUID_t make_id(const GuidPrefix_t &prefix, const EntityId_t &entity)
Definition: GuidUtils.h:200

◆ store_synthetic_data()

template<typename MessageType>
DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data ( const MessageType &  sample,
DDS::ViewStateKind  view,
const SystemTimePoint timestamp = SystemTimePoint::now() 
)
inline

Definition at line 828 of file DataReaderImpl_T.h.

Referenced by OpenDDS::DCPS::BitSubscriber::add_thread_status(), and OpenDDS::DCPS::StaticEndpointManager::init_bit().

831  {
832  using namespace OpenDDS::DCPS;
835 #ifndef OPENDDS_NO_MULTI_TOPIC
836  DDS::TopicDescription_var descr = get_topicdescription();
837  if (MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) {
838  if (!mt->filter(sample)) {
839  return DDS::HANDLE_NIL;
840  }
841  }
842 #endif
843 
844  get_subscriber_servant()->data_received(this);
845 
846  DDS::InstanceHandle_t inst = lookup_instance(sample);
847  bool filtered = false;
848  SubscriptionInstance_rch instance;
849 
850  const DDS::Time_t now = timestamp.to_dds_time();
852  header.source_timestamp_sec_ = now.sec;
853  header.source_timestamp_nanosec_ = now.nanosec;
854 
855  // Call store_instance_data() once or twice, depending on if we need to
856  // process the INSTANCE_REGISTRATION. In either case, store_instance_data()
857  // owns the memory for the sample and it must come from the correct allocator.
858  for (int i = 0; i < 2; ++i) {
859  if (i == 0 && inst != DDS::HANDLE_NIL) continue;
860 
861  const int msg = i ? SAMPLE_DATA : INSTANCE_REGISTRATION;
862  header.message_id_ = static_cast<char>(msg);
863 
864  bool just_registered;
865  unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator(sample));
866  store_instance_data(move(data), DDS::HANDLE_NIL, header, instance, just_registered, filtered);
867  if (instance) inst = instance->instance_handle_;
868  }
869 
870  if (!filtered) {
871  if (view == DDS::NOT_NEW_VIEW_STATE) {
872  if (instance) instance->instance_state_->accessed();
873  }
875  }
876 
879  if (observer && vd) {
880  Observer::Sample s(instance ? instance->instance_handle_ : DDS::HANDLE_NIL, header.instance_state(), now, header.sequence_, &sample, *vd);
881  observer->on_sample_received(this, s);
882  }
883 
884  return inst;
885  }
unsigned long nanosec
void notify_read_conditions()
Data has arrived into the cache, unblock waiting ReadConditions.
char message_id_
The enum MessageId.
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
const InstanceHandle_t HANDLE_NIL
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
unique_ptr< DataAllocator > & data_allocator()
virtual void on_sample_received(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:92
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
const DDS::InstanceHandle_t instance_handle_
The instance handle for the registered object.
const ViewStateKind NOT_NEW_VIEW_STATE
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const InstanceState_rch instance_state_
Instance state for this instance.
virtual DDS::TopicDescription_ptr get_topicdescription()
void accessed()
A read or take operation has been performed on this instance.
const ValueDispatcher * get_value_dispatcher() const
void store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr, bool &just_registered, bool &filtered)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
RcHandle< SubscriberImpl > get_subscriber_servant()
DDS::InstanceStateKind instance_state() const
virtual DDS::InstanceHandle_t lookup_instance(const MessageType &instance_data)
DDS::Time_t to_dds_time() const
Definition: TimePoint_T.inl:89

◆ take() [1/4]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 32 of file DynamicDataReaderImpl.cpp.

36  {
38  }
const ReturnCode_t RETCODE_UNSUPPORTED

◆ take() [2/4]

◆ take() [3/4]

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Definition at line 185 of file DataReaderImpl_T.h.

192  {
193  DDS::ReturnCode_t const precond =
194  check_inputs("take", received_data, info_seq, max_samples);
195  if (DDS::RETCODE_OK != precond)
196  {
197  return precond;
198  }
199 
201  guard,
202  sample_lock_,
204 
205  return take_i(received_data, info_seq, max_samples, sample_states,
207  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t take_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ take() [4/4]

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take ( AbstractSamples samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 765 of file DataReaderImpl_T.h.

768  {
770  guard,
771  sample_lock_,
773 
774  MessageSequenceType data;
775  DDS::SampleInfoSeq infos;
776  const DDS::ReturnCode_t rc = take_i(data, infos, DDS::LENGTH_UNLIMITED,
778 
779  samples.reserve(data.length());
780 
781  for (CORBA::ULong i = 0; i < data.length(); ++i) {
782  samples.push_back(infos[i], &data[i]);
783  }
784 
785  return rc;
786  }
virtual void reserve(CORBA::ULong size)=0
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
DDS::ReturnCode_t take_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const long LENGTH_UNLIMITED
TraitsType::MessageSequenceType MessageSequenceType
ACE_CDR::ULong ULong
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
virtual void push_back(const DDS::SampleInfo &info, const void *sample)=0
sequence< SampleInfo > SampleInfoSeq

◆ take_i()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
)
inlineprivate

Definition at line 1414 of file DataReaderImpl_T.h.

1425 {
1426  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1427 
1428 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1431  }
1432 
1433  const bool group_coherent_ordered =
1437 
1438  if (group_coherent_ordered && coherent_) {
1439  max_samples = 1;
1440  }
1441 #endif
1442 
1443  RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1444 #ifndef OPENDDS_NO_QUERY_CONDITION
1445  a_condition,
1446 #endif
1448 
1450 
1451 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1452  if (!group_coherent_ordered) {
1453 #endif
1455  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
1456  ++next; // pre-increment iterator, in case updates cause changes to match set
1457  const DDS::InstanceHandle_t handle = *it;
1458  const SubscriptionInstance_rch inst = get_handle_instance(handle);
1459  if (!inst) continue;
1460 
1461  size_t i(0);
1462  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1463  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1464  results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1465 
1466  const ValueDispatcher* vd = get_value_dispatcher();
1467  if (observer && item->registered_data_ && vd) {
1468  Observer::Sample s(handle, inst->instance_state_->instance_state(), *item, *vd);
1469  observer->on_sample_taken(this, s);
1470  }
1471  }
1472  }
1473 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1474  } else {
1476  results.insert_sample(item.rde_, item.rdel_, item.si_, item.index_in_instance_);
1477  }
1478 #endif
1479 
1480  results.copy_to_user();
1481 
1483  if (received_data.length()) {
1484  ret = DDS::RETCODE_OK;
1485  if (received_data.maximum() == 0) { // using ZeroCopy
1486  received_data_p.set_loaner(this);
1487  }
1488  }
1489 
1491  return ret;
1492 }
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
const ReturnCode_t RETCODE_OK
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
DDS::InstanceStateKind instance_state() const
Access instance state.
const ReturnCode_t RETCODE_NO_DATA
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
SubscriptionInstance_rch si_
Definition: RakeData.h:30
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
ReceivedDataElementList * rdel_
Definition: RakeData.h:29
const InstanceState_rch instance_state_
Instance state for this instance.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
GroupRakeData group_coherent_ordered_data_
Ordered group samples.
PresentationQosPolicy presentation
const ValueDispatcher * get_value_dispatcher() const
ReceivedDataElement * rde_
Definition: RakeData.h:28
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
bool coherent_
Is accessing to Group coherent changes ?
PresentationQosPolicyAccessScopeKind access_scope
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66
virtual void on_sample_taken(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:94

◆ take_instance()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Definition at line 409 of file DataReaderImpl_T.h.

417  {
418  DDS::ReturnCode_t const precond =
419  check_inputs("take_instance", received_data, info_seq, max_samples);
420  if (DDS::RETCODE_OK != precond)
421  {
422  return precond;
423  }
424 
426  guard,
427  sample_lock_,
429  return take_instance_i(received_data, info_seq, max_samples, a_handle,
431  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t take_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ take_instance_i()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
)
inlineprivate

Definition at line 1561 of file DataReaderImpl_T.h.

1573 {
1574  const SubscriptionInstance_rch inst = get_handle_instance(a_handle);
1575  if (!inst) return DDS::RETCODE_BAD_PARAMETER;
1576 
1577  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1578 
1579  RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1580 #ifndef OPENDDS_NO_QUERY_CONDITION
1581  a_condition,
1582 #endif
1584 
1585  const InstanceState_rch state_obj = inst->instance_state_;
1586  if (state_obj->match(view_states, instance_states)) {
1588  size_t i(0);
1589  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1590  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1591  results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1592  const ValueDispatcher* vd = get_value_dispatcher();
1593  if (observer && item->registered_data_ && vd) {
1594  Observer::Sample s(a_handle, inst->instance_state_->instance_state(), *item, *vd);
1595  observer->on_sample_taken(this, s);
1596  }
1597  }
1598  }
1599 
1600  results.copy_to_user();
1601 
1603  if (received_data.length()) {
1604  ret = DDS::RETCODE_OK;
1605  if (received_data.maximum() == 0) { // using ZeroCopy
1606  received_data_p.set_loaner(this);
1607  }
1608  }
1609 
1611  return ret;
1612 }
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const ReturnCode_t RETCODE_OK
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
const ReturnCode_t RETCODE_BAD_PARAMETER
DDS::InstanceStateKind instance_state() const
Access instance state.
const ReturnCode_t RETCODE_NO_DATA
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
bool match(DDS::ViewStateMask view, DDS::InstanceStateMask inst) const
const InstanceState_rch instance_state_
Instance state for this instance.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
PresentationQosPolicy presentation
const ValueDispatcher * get_value_dispatcher() const
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66
virtual void on_sample_taken(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:94

◆ take_instance_w_condition()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::ReadCondition_ptr  a_condition 
)
inlinevirtual

Definition at line 473 of file DataReaderImpl_T.h.

479  {
480  DDS::ReturnCode_t const precond =
481  check_inputs("take_instance_w_condition", received_data, info_seq,
482  max_samples);
483  if (DDS::RETCODE_OK != precond)
484  {
485  return precond;
486  }
487 
490 
491  if (!has_readcondition(a_condition))
492  {
494  }
495 
496 #ifndef OPENDDS_NO_QUERY_CONDITION
497  DDS::QueryCondition_ptr query_condition =
498  dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
499 #endif
500 
501  return take_instance_i(received_data, info_seq, max_samples, a_handle,
502  a_condition->get_sample_state_mask(),
503  a_condition->get_view_state_mask(),
504  a_condition->get_instance_state_mask(),
505 #ifndef OPENDDS_NO_QUERY_CONDITION
506  query_condition
507 #else
508  0
509 #endif
510  );
511  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t take_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66
bool has_readcondition(DDS::ReadCondition_ptr a_condition)

◆ take_next_instance()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Definition at line 533 of file DataReaderImpl_T.h.

541  {
542  DDS::ReturnCode_t const precond =
543  check_inputs("take_next_instance", received_data, info_seq, max_samples);
544  if (DDS::RETCODE_OK != precond)
545  {
546  return precond;
547  }
548 
549  return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
551  }
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t take_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ take_next_instance_i()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
)
inlineprivate

Definition at line 1662 of file DataReaderImpl_T.h.

1674 {
1676 
1677  typename InstanceMap::iterator it = instance_map_.begin();
1678  const typename InstanceMap::iterator the_end = instance_map_.end();
1679  if (a_handle != DDS::HANDLE_NIL) {
1680  const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(a_handle);
1681  if (pos != reverse_instance_map_.end()) {
1682  it = pos->second;
1683  ++it;
1684  } else {
1685  it = the_end;
1686  }
1687  }
1688 
1690  for (; it != the_end; ++it) {
1691  handle = it->second;
1692  const DDS::ReturnCode_t status =
1693  take_instance_i(received_data, info_seq, max_samples, handle,
1695 #ifndef OPENDDS_NO_QUERY_CONDITION
1696  a_condition);
1697 #else
1698  0);
1699 #endif
1700  if (status != DDS::RETCODE_NO_DATA) {
1701  total_samples(); // see if we are empty
1703  return status;
1704  }
1705  }
1706 
1708  return DDS::RETCODE_NO_DATA;
1709 }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
DDS::ReturnCode_t take_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const ReturnCode_t RETCODE_NO_DATA
const InstanceHandle_t HANDLE_NIL
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:66
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
ReverseInstanceMap reverse_instance_map_
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:66
CORBA::Long total_samples() const
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66

◆ take_next_instance_w_condition()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::ReadCondition_ptr  a_condition 
)
inlinevirtual

Definition at line 593 of file DataReaderImpl_T.h.

599  {
600  DDS::ReturnCode_t const precond =
601  check_inputs("take_next_instance_w_condition", received_data, info_seq,
602  max_samples);
603  if (DDS::RETCODE_OK != precond)
604  {
605  return precond;
606  }
607 
610 
611  if (!has_readcondition(a_condition))
612  {
614  }
615 
616 #ifndef OPENDDS_NO_QUERY_CONDITION
617  DDS::QueryCondition_ptr query_condition =
618  dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
619 #endif
620 
621  return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
622  a_condition->get_sample_state_mask(),
623  a_condition->get_view_state_mask(),
624  a_condition->get_instance_state_mask(),
625 #ifndef OPENDDS_NO_QUERY_CONDITION
626  query_condition
627 #else
628  0
629 #endif
630  );
631  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t take_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:66
bool has_readcondition(DDS::ReadCondition_ptr a_condition)

◆ take_next_sample()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_sample ( MessageType &  received_data,
DDS::SampleInfo sample_info_ref 
)
inlinevirtual

Definition at line 328 of file DataReaderImpl_T.h.

330  {
331  bool found_data = false;
333 
335 
337  const HandleSet& matches = lookup_matching_instances(sample_states, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
338  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
339  ++next; // pre-increment iterator, in case updates cause changes to match set
340  const DDS::InstanceHandle_t handle = *it;
341  const SubscriptionInstance_rch inst = get_handle_instance(handle);
342  if (!inst) continue;
343 
344  bool most_recent_generation = false;
345  ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
346  if (item) {
347  if (item->registered_data_) {
348  received_data = *static_cast<MessageType*>(item->registered_data_);
349  }
350  inst->instance_state_->sample_info(sample_info_ref, item);
351  inst->rcvd_samples_.mark_read(item);
352 
354  if (observer && item->registered_data_ && vd) {
355  Observer::Sample s(sample_info_ref.instance_handle, sample_info_ref.instance_state, *item, *vd);
356  observer->on_sample_taken(this, s);
357  }
358 
359  if (!most_recent_generation) {
360  most_recent_generation = inst->instance_state_->most_recent_generation(item);
361  }
362 
363  if (most_recent_generation) {
364  inst->instance_state_->accessed();
365  }
366 
367  // Get the sample_ranks, generation_ranks, and
368  // absolute_generation_ranks for this info_seq
369  sample_info(sample_info_ref, inst->rcvd_samples_.peek_tail());
370 
371  inst->rcvd_samples_.remove(item);
372  item->dec_ref();
373  item = 0;
374 
375  found_data = true;
376 
377  break;
378  }
379  }
380 
382  return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
383  }
void sample_info(DDS::SampleInfo &si, const ReceivedDataElement *de)
Populate the SampleInfo structure.
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
InstanceHandle_t instance_handle
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
const ReturnCode_t RETCODE_OK
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
const ReturnCode_t RETCODE_NO_DATA
bool most_recent_generation(ReceivedDataElement *item) const
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
const InstanceStateMask ANY_INSTANCE_STATE
ACE_CDR::ULong ULong
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
const InstanceState_rch instance_state_
Instance state for this instance.
InstanceStateKind instance_state
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:66
void accessed()
A read or take operation has been performed on this instance.
const ValueDispatcher * get_value_dispatcher() const
const ViewStateMask ANY_VIEW_STATE
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
const SampleStateKind NOT_READ_SAMPLE_STATE
bool remove(ReceivedDataElement *data_sample)
void sample_info(DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
virtual void on_sample_taken(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:94

◆ take_w_condition()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq sample_info,
::CORBA::Long  max_samples,
DDS::ReadCondition_ptr  a_condition 
)
inlinevirtual

Definition at line 241 of file DataReaderImpl_T.h.

246  {
247  DDS::ReturnCode_t const precond =
248  check_inputs("take_w_condition", received_data, sample_info, max_samples);
249  if (DDS::RETCODE_OK != precond)
250  {
251  return precond;
252  }
253 
256 
257  if (!has_readcondition(a_condition))
258  {
260  }
261 
262  return take_i(received_data, sample_info, max_samples,
263  a_condition->get_sample_state_mask(),
264  a_condition->get_view_state_mask(),
265  a_condition->get_instance_state_mask(),
266 #ifndef OPENDDS_NO_QUERY_CONDITION
267  dynamic_cast< DDS::QueryCondition_ptr >(a_condition)
268 #else
269  0
270 #endif
271  );
272  }
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t take_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:66
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
void sample_info(DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
bool has_readcondition(DDS::ReadCondition_ptr a_condition)

Member Data Documentation

◆ data_allocator_

template<typename MessageType>
unique_ptr<DataAllocator> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::data_allocator_
private

Definition at line 2413 of file DataReaderImpl_T.h.

◆ filter_delayed_sample_map_

template<typename MessageType>
FilterDelayedSampleMap OpenDDS::DCPS::DataReaderImpl_T< MessageType >::filter_delayed_sample_map_
private

Definition at line 2438 of file DataReaderImpl_T.h.

◆ filter_delayed_sample_queue_

template<typename MessageType>
FilterDelayedSampleQueue OpenDDS::DCPS::DataReaderImpl_T< MessageType >::filter_delayed_sample_queue_
private

Definition at line 2440 of file DataReaderImpl_T.h.

◆ filter_delayed_sample_task_

template<typename MessageType>
RcHandle<DRISporadicTask> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::filter_delayed_sample_task_
private

Definition at line 2420 of file DataReaderImpl_T.h.

◆ instance_map_

template<typename MessageType>
InstanceMap OpenDDS::DCPS::DataReaderImpl_T< MessageType >::instance_map_
private

Definition at line 2415 of file DataReaderImpl_T.h.

◆ marshal_skip_serialize_

template<typename MessageType>
bool OpenDDS::DCPS::DataReaderImpl_T< MessageType >::marshal_skip_serialize_
private

Definition at line 2442 of file DataReaderImpl_T.h.

◆ reverse_instance_map_

template<typename MessageType>
ReverseInstanceMap OpenDDS::DCPS::DataReaderImpl_T< MessageType >::reverse_instance_map_
private

Definition at line 2416 of file DataReaderImpl_T.h.


The documentation for this class was generated from the following file: