OpenDDS  Snapshot(2023/04/28-20:55)
Classes | Public Types | Public Member Functions | Protected Member Functions | Private Types | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::DataReaderImpl_T< MessageType > Class Template Reference

#include <DataReaderImpl_T.h>

Inheritance diagram for OpenDDS::DCPS::DataReaderImpl_T< MessageType >:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataReaderImpl_T< MessageType >:
Collaboration graph
[legend]

Classes

struct  FilterDelayedSample
 
struct  MessageTypeMemoryBlock
 
class  MessageTypeWithAllocator
 
class  SharedInstanceMap
 

Public Types

typedef DDSTraits< MessageType > TraitsType
 
typedef MarshalTraits< MessageType > MarshalTraitsType
 
typedef TraitsType::MessageSequenceType MessageSequenceType
 
typedef RcHandle< SharedInstanceMapSharedInstanceMap_rch
 
typedef TraitsType::DataReaderType Interface
 
typedef OpenDDS::DCPS::Cached_Allocator_With_Overflow< MessageTypeMemoryBlock, ACE_Thread_MutexDataAllocator
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DDSTraits< MessageType >::DataReaderType >
typedef DDSTraits< MessageType >::DataReaderType ::_ptr_type _ptr_type
 
typedef DDSTraits< MessageType >::DataReaderType ::_var_type _var_type
 
- Public Types inherited from CORBA::LocalObject
typedef LocalObject_ptr _ptr_type
 
typedef LocalObject_var _var_type
 
typedef LocalObject_out _out_type
 
- Public Types inherited from CORBA::Object
typedef Object_ptr _ptr_type
 
typedef Object_var _var_type
 
typedef Object_out _out_type
 
- Public Types inherited from OpenDDS::DCPS::DataReaderImpl
typedef std::pair< GUID_t, WriterInfo::WriterStateWriterStatePair
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DataReaderEx >
typedef DataReaderEx ::_ptr_type _ptr_type
 
typedef DataReaderEx ::_var_type _var_type
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
typedef DDS::Entity ::_ptr_type _ptr_type
 
typedef DDS::Entity ::_var_type _var_type
 
- Public Types inherited from OpenDDS::DCPS::TransportClient
enum  { ASSOC_OK = 1, ASSOC_ACTIVE = 2 }
 

Public Member Functions

typedef OPENDDS_MAP_CMP_T (MessageType, DDS::InstanceHandle_t, typename TraitsType::LessThanType) InstanceMap
 
typedef OPENDDS_MAP (DDS::InstanceHandle_t, typename InstanceMap::iterator) ReverseInstanceMap
 
CORBA::Boolean _is_a (const char *type_id)
 
const char * _interface_repository_id () const
 
CORBA::Boolean marshal (TAO_OutputCDR &)
 
 DataReaderImpl_T ()
 
virtual ~DataReaderImpl_T ()
 
virtual DDS::ReturnCode_t enable_specific ()
 
virtual DDS::ReturnCode_t read (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t take (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t read_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &sample_info, ::CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t take_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &sample_info, ::CORBA::Long max_samples, DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t read_next_sample (MessageType &received_data, DDS::SampleInfo &sample_info_ref)
 
virtual DDS::ReturnCode_t take_next_sample (MessageType &received_data, DDS::SampleInfo &sample_info_ref)
 
virtual DDS::ReturnCode_t read_instance (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t take_instance (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t read_instance_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t take_instance_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t read_next_instance (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t take_next_instance (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::ReturnCode_t read_next_instance_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t take_next_instance_w_condition (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t return_loan (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq)
 
virtual DDS::ReturnCode_t get_key_value (MessageType &key_holder, DDS::InstanceHandle_t handle)
 
virtual DDS::InstanceHandle_t lookup_instance (const MessageType &instance_data)
 
virtual DDS::ReturnCode_t auto_return_loan (void *seq)
 
void release_loan (MessageSequenceType &received_data)
 
bool contains_sample_filtered (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const FilterEvaluator &evaluator, const DDS::StringSeq &params)
 
DDS::ReturnCode_t read_generic (GenericBundle &gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count=false)
 
DDS::InstanceHandle_t lookup_instance_generic (const void *data)
 
virtual DDS::ReturnCode_t take (AbstractSamples &samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
DDS::ReturnCode_t read_instance_generic (void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
DDS::ReturnCode_t read_next_instance_generic (void *&data, DDS::SampleInfo &info, DDS::InstanceHandle_t previous_instance, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
DDS::InstanceHandle_t store_synthetic_data (const MessageType &sample, DDS::ViewStateKind view, const SystemTimePoint &timestamp=SystemTimePoint::now())
 
void set_instance_state_i (DDS::InstanceHandle_t instance, DDS::InstanceHandle_t publication_handle, DDS::InstanceStateKind state, const SystemTimePoint &timestamp, const GUID_t &publication_id)
 
virtual void lookup_instance (const OpenDDS::DCPS::ReceivedDataSample &sample, OpenDDS::DCPS::SubscriptionInstance_rch &instance)
 
virtual void qos_change (const DDS::DataReaderQos &qos)
 
void set_marshal_skip_serialize (bool value)
 
bool get_marshal_skip_serialize () const
 
void release_all_instances ()
 Release all instances held by the reader. More...
 
template<>
DDS::ReturnCode_t read_generic (GenericBundle &, DDS::SampleStateMask, DDS::ViewStateMask, DDS::InstanceStateMask, bool)
 
template<>
DDS::ReturnCode_t take (AbstractSamples &, DDS::SampleStateMask, DDS::ViewStateMask, DDS::InstanceStateMask)
 
template<>
OpenDDS_Dcps_Export DDS::ReturnCode_t read_generic (GenericBundle &gen, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, bool adjust_ref_count)
 
template<>
OpenDDS_Dcps_Export DDS::ReturnCode_t take (AbstractSamples &samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
- Public Member Functions inherited from OpenDDS::DCPS::LocalObjectBase
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
virtual CORBA::ULong _refcount_value () const
 
- Public Member Functions inherited from CORBA::LocalObject
virtual ~LocalObject (void)
 
virtual CORBA::Boolean _non_existent (void)
 
virtual char * _repository_id (void)
 
virtual CORBA::InterfaceDef_ptr _get_interface (void)
 
virtual CORBA::Object_ptr _get_component (void)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::ExceptionList_ptr exclist, CORBA::ContextList_ptr ctxtlist, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual CORBA::Request_ptr _request (const char *operation)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList_get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
virtual CORBA::ULong _hash (CORBA::ULong maximum)
 
virtual CORBA::Boolean _is_equivalent (CORBA::Object_ptr other_obj)
 
virtual CORBA::ORB_ptr _get_orb (void)
 
virtual TAO::ObjectKey_key (void)
 
- Public Member Functions inherited from CORBA::Object
virtual ~Object (void)
 
virtual TAO_Abstract_ServantBase_servant (void) const
 
virtual CORBA::Boolean _is_collocated (void) const
 
virtual CORBA::Boolean _is_local (void) const
 
 Object (TAO_Stub *p, CORBA::Boolean collocated=false, TAO_Abstract_ServantBase *servant=0, TAO_ORB_Core *orb_core=0)
 
 Object (IOP::IOR *ior, TAO_ORB_Core *orb_core)
 
virtual TAO_Stub_stubobj (void) const
 
virtual TAO_Stub_stubobj (void)
 
virtual void _proxy_broker (TAO::Object_Proxy_Broker *proxy_broker)
 
CORBA::Boolean is_evaluated (void) const
 
TAO_ORB_Coreorb_core (void) const
 
IOP::IORsteal_ior (void)
 
const IOP::IORior (void) const
 
virtual bool can_convert_to_ior (void) const
 
virtual char * convert_to_ior (bool use_omg_ior_format, const char *ior_prefix) const
 
void _decr_refcount (void)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList_get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::DataReaderImpl
typedef OPENDDS_MAP (DDS::InstanceHandle_t, SubscriptionInstance_rch) SubscriptionInstanceMapType
 
typedef OPENDDS_SET (DDS::InstanceHandle_t) InstanceSet
 
typedef OPENDDS_SET (SubscriptionInstance_rch) SubscriptionInstanceSet
 
typedef OPENDDS_MAP_CMP (GUID_t, WriterStats, GUID_tKeyLessThan) StatsMapType
 Type of collection of statistics for writers to this reader. More...
 
 DataReaderImpl ()
 
virtual ~DataReaderImpl ()
 
virtual DDS::InstanceHandle_t get_instance_handle ()
 
virtual void add_association (const GUID_t &yourId, const WriterAssociation &writer, bool active)
 
virtual void transport_assoc_done (int flags, const GUID_t &remote_id)
 
virtual void remove_associations (const WriterIdSeq &writers, bool callback)
 
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
 
virtual void signal_liveliness (const GUID_t &remote_participant)
 
DDS::DataReaderListener_ptr listener_for (DDS::StatusKind kind)
 
void writer_became_alive (WriterInfo &info, const MonotonicTimePoint &when)
 
void writer_became_dead (WriterInfo &info)
 
void writer_removed (WriterInfo &info)
 
virtual void cleanup ()
 
void init (TopicDescriptionImpl *a_topic_desc, const DDS::DataReaderQos &qos, DDS::DataReaderListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant, SubscriberImpl *subscriber)
 
virtual DDS::ReadCondition_ptr create_readcondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
virtual DDS::QueryCondition_ptr create_querycondition (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, const char *query_expression, const DDS::StringSeq &query_parameters)
 
virtual DDS::ReturnCode_t delete_readcondition (DDS::ReadCondition_ptr a_condition)
 
virtual DDS::ReturnCode_t delete_contained_entities ()
 
virtual DDS::ReturnCode_t set_qos (const DDS::DataReaderQos &qos)
 
virtual DDS::ReturnCode_t get_qos (DDS::DataReaderQos &qos)
 
virtual DDS::ReturnCode_t set_listener (DDS::DataReaderListener_ptr a_listener, DDS::StatusMask mask)
 
virtual DDS::DataReaderListener_ptr get_listener ()
 
virtual DDS::TopicDescription_ptr get_topicdescription ()
 
virtual DDS::Subscriber_ptr get_subscriber ()
 
virtual DDS::ReturnCode_t get_sample_rejected_status (DDS::SampleRejectedStatus &status)
 
virtual DDS::ReturnCode_t get_liveliness_changed_status (DDS::LivelinessChangedStatus &status)
 
virtual DDS::ReturnCode_t get_requested_deadline_missed_status (DDS::RequestedDeadlineMissedStatus &status)
 
virtual DDS::ReturnCode_t get_requested_incompatible_qos_status (DDS::RequestedIncompatibleQosStatus &status)
 
virtual DDS::ReturnCode_t get_subscription_matched_status (DDS::SubscriptionMatchedStatus &status)
 
virtual DDS::ReturnCode_t get_sample_lost_status (DDS::SampleLostStatus &status)
 
virtual DDS::ReturnCode_t wait_for_historical_data (const DDS::Duration_t &max_wait)
 
virtual DDS::ReturnCode_t get_matched_publications (DDS::InstanceHandleSeq &publication_handles)
 
virtual DDS::ReturnCode_t get_matched_publication_data (DDS::PublicationBuiltinTopicData &publication_data, DDS::InstanceHandle_t publication_handle)
 
virtual DDS::ReturnCode_t enable ()
 
virtual void get_latency_stats (LatencyStatisticsSeq &stats)
 
virtual void reset_latency_stats ()
 Clear any intermediate statistical values. More...
 
virtual CORBA::Boolean statistics_enabled ()
 
virtual void statistics_enabled (CORBA::Boolean statistics_enabled)
 
void writer_activity (const DataSampleHeader &header)
 update liveliness info for this writer. More...
 
virtual void data_received (const ReceivedDataSample &sample)
 process a message that has been received - could be control or a data sample. More...
 
void transport_discovery_change ()
 
virtual bool check_transport_qos (const TransportInst &inst)
 
bool have_sample_states (DDS::SampleStateMask sample_states) const
 
bool have_view_states (DDS::ViewStateMask view_states) const
 
bool have_instance_states (DDS::InstanceStateMask instance_states) const
 
bool contains_sample (DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
void process_latency (const ReceivedDataSample &sample)
 
void notify_latency (GUID_t writer)
 
size_t get_depth () const
 
size_t get_n_chunks () const
 
void liveliness_lost ()
 
void remove_all_associations ()
 
void notify_subscription_disconnected (const WriterIdSeq &pubids)
 
void notify_subscription_reconnected (const WriterIdSeq &pubids)
 
void notify_subscription_lost (const WriterIdSeq &pubids)
 
void notify_liveliness_change ()
 
bool is_bit () const
 
bool has_zero_copies ()
 
void release_instance (DDS::InstanceHandle_t handle)
 Release the instance with the handle. More...
 
void state_updated (DDS::InstanceHandle_t handle)
 
ACE_Reactor_Timer_Interfaceget_reactor ()
 
GUID_t get_topic_id ()
 
GUID_t get_dp_id ()
 
typedef OPENDDS_VECTOR (DDS::InstanceHandle_t) InstanceHandleVec
 
void get_instance_handles (InstanceHandleVec &instance_handles)
 
typedef OPENDDS_VECTOR (WriterStatePair) WriterStatePairVec
 
void get_writer_states (WriterStatePairVec &writer_states)
 
void update_ownership_strength (const GUID_t &pub_id, const CORBA::Long &ownership_strength)
 
OwnershipManagerPtr ownership_manager ()
 
void enable_filtering (ContentFilteredTopicImpl *cft)
 
DDS::ContentFilteredTopic_ptr get_cf_topic () const
 
void enable_multi_topic (MultiTopicImpl *mt)
 
void update_subscription_params (const DDS::StringSeq &params) const
 
typedef OPENDDS_VECTOR (void *) GenericSeq
 
void set_instance_state (DDS::InstanceHandle_t instance, DDS::InstanceStateKind state, const SystemTimePoint &timestamp=SystemTimePoint::now(), const GUID_t &guid=GUID_UNKNOWN)
 
void begin_access ()
 
void end_access ()
 
void get_ordered_data (GroupRakeData &data, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states)
 
void accept_coherent (const GUID_t &writer_id, const GUID_t &publisher_id)
 
void reject_coherent (const GUID_t &writer_id, const GUID_t &publisher_id)
 
void coherent_change_received (const GUID_t &publisher_id, Coherent_State &result)
 
void coherent_changes_completed (DataReaderImpl *reader)
 
void reset_coherent_info (const GUID_t &writer_id, const GUID_t &publisher_id)
 
void set_subscriber_qos (const DDS::SubscriberQos &qos)
 
void reset_ownership (DDS::InstanceHandle_t instance)
 
virtual RcHandle< EntityImplparent () const
 
void disable_transport ()
 
virtual void register_for_writer (const GUID_t &, const GUID_t &, const GUID_t &, const TransportLocatorSeq &, DiscoveryListener *)
 
virtual void unregister_for_writer (const GUID_t &, const GUID_t &, const GUID_t &)
 
virtual void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
virtual DCPS::WeakRcHandle< ICE::Endpointget_ice_endpoint ()
 
GUID_t get_guid () const
 
void return_handle (DDS::InstanceHandle_t handle)
 
const ValueDispatcherget_value_dispatcher () const
 
const StatsMapType & raw_latency_statistics () const
 Expose the statistics container. More...
 
unsigned int & raw_latency_buffer_size ()
 Configure the size of the raw data collection buffer. More...
 
DataCollector< double >::OnFull & raw_latency_buffer_type ()
 Configure the type of the raw data collection buffer. More...
 
- Public Member Functions inherited from OpenDDS::DCPS::DataReaderEx
void get_latency_stats (inout LatencyStatisticsSeq stats)
 Obtain a sequence of statistics summaries. More...
 
- Public Member Functions inherited from DDS::DataReader
ReadCondition create_readcondition (in SampleStateMask sample_states, in ViewStateMask view_states, in InstanceStateMask instance_states)
 
QueryCondition create_querycondition (in SampleStateMask sample_states, in ViewStateMask view_states, in InstanceStateMask instance_states, in string query_expression, in StringSeq query_parameters)
 
ReturnCode_t delete_readcondition (in ReadCondition a_condition)
 
ReturnCode_t set_qos (in DataReaderQos qos)
 
ReturnCode_t get_qos (inout DataReaderQos qos)
 
ReturnCode_t set_listener (in DataReaderListener a_listener, in StatusMask mask)
 
ReturnCode_t get_sample_rejected_status (inout SampleRejectedStatus status)
 
ReturnCode_t get_liveliness_changed_status (inout LivelinessChangedStatus status)
 
ReturnCode_t get_requested_deadline_missed_status (inout RequestedDeadlineMissedStatus status)
 
ReturnCode_t get_requested_incompatible_qos_status (inout RequestedIncompatibleQosStatus status)
 
ReturnCode_t get_subscription_matched_status (inout SubscriptionMatchedStatus status)
 
ReturnCode_t get_sample_lost_status (inout SampleLostStatus status)
 
ReturnCode_t wait_for_historical_data (in Duration_t max_wait)
 
ReturnCode_t get_matched_publications (inout InstanceHandleSeq publication_handles)
 
ReturnCode_t get_matched_publication_data (inout PublicationBuiltinTopicData publication_data, in InstanceHandle_t publication_handle)
 
- Public Member Functions inherited from OpenDDS::DCPS::DataReaderCallbacks
 DataReaderCallbacks ()
 
virtual ~DataReaderCallbacks ()
 
- Public Member Functions inherited from OpenDDS::DCPS::EntityImpl
 EntityImpl ()
 
virtual ~EntityImpl ()
 
bool is_enabled () const
 
virtual DDS::StatusCondition_ptr get_statuscondition ()
 
virtual DDS::StatusMask get_status_changes ()
 
virtual DDS::DomainId_t get_domain_id ()
 
virtual GUID_t get_id () const
 
void set_status_changed_flag (DDS::StatusKind status, bool status_changed_flag)
 
void notify_status_condition ()
 
virtual void transport_config (const TransportConfig_rch &cfg)
 
TransportConfig_rch transport_config () const
 
void set_observer (Observer_rch observer, Observer::Event e)
 
Observer_rch get_observer (Observer::Event e)
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportClient
void use_datalink (const GUID_t &remote_id, const DataLink_rch &link)
 
 TransportClient ()
 
virtual ~TransportClient ()
 
void enable_transport (bool reliable, bool durable)
 
void enable_transport_using_config (bool reliable, bool durable, const TransportConfig_rch &tc)
 
bool swap_bytes () const
 
bool cdr_encapsulation () const
 
const TransportLocatorSeqconnection_info () const
 
void populate_connection_info ()
 
bool is_reliable () const
 
bool associate (const AssociationData &peer, bool active)
 
void disassociate (const GUID_t &peerId)
 
void stop_associating ()
 
void stop_associating (const GUID_t *repos, CORBA::ULong length)
 
void send_final_acks ()
 
void transport_stop ()
 
void register_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
 
void unregister_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
 
void register_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
 
void unregister_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
 
void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
WeakRcHandle< ICE::Endpointget_ice_endpoint ()
 
bool send_response (const GUID_t &peer, const DataSampleHeader &header, Message_Block_Ptr payload)
 
void send (SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
 
SendControlStatus send_w_control (SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr msg)
 
SendControlStatus send_control_to (const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
bool remove_sample (const DataSampleElement *sample)
 
bool remove_all_msgs ()
 
void terminate_send_if_suspended ()
 
bool associated_with (const GUID_t &remote) const
 
bool pending_association_with (const GUID_t &remote) const
 
GUID_t repo_id () const
 
void data_acked (const GUID_t &remote)
 
bool is_leading (const GUID_t &reader_id) const
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportReceiveListener
virtual ~TransportReceiveListener ()
 

Protected Member Functions

virtual RcHandle< MessageHolderdds_demarshal (const OpenDDS::DCPS::ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, OpenDDS::DCPS::SubscriptionInstance_rch &instance, bool &just_registered, bool &filtered, OpenDDS::DCPS::MarshalingType marshaling_type, bool full_copy)
 
virtual void dispose_unregister (const OpenDDS::DCPS::ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, OpenDDS::DCPS::SubscriptionInstance_rch &instance)
 
virtual void purge_data (OpenDDS::DCPS::SubscriptionInstance_rch instance)
 
virtual void release_instance_i (DDS::InstanceHandle_t handle)
 
virtual void state_updated_i (DDS::InstanceHandle_t handle)
 
- Protected Member Functions inherited from CORBA::LocalObject
 LocalObject (void)
 
- Protected Member Functions inherited from CORBA::Object
 Object (int dummy=0)
 
TAO::Object_Proxy_Brokerproxy_broker () const
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::DataReaderImpl
typedef OPENDDS_SET (DDS::InstanceHandle_t) HandleSet
 
typedef OPENDDS_MAP (CORBA::ULong, HandleSet) LookupMap
 
void initialize_lookup_maps ()
 
void update_lookup_maps (const SubscriptionInstanceMapType::iterator &input)
 
void remove_from_lookup_maps (DDS::InstanceHandle_t handle)
 
const HandleSet & lookup_matching_instances (CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
 
DataReaderListener_ptr get_ext_listener ()
 
virtual void remove_associations_i (const WriterIdSeq &writers, bool callback)
 
void prepare_to_delete ()
 
DDS::ReturnCode_t setup_deserialization ()
 Setup deserialization options. More...
 
RcHandle< SubscriberImplget_subscriber_servant ()
 
void post_read_or_take ()
 
void sample_info (DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
 
CORBA::Long total_samples () const
 
void set_sample_lost_status (const DDS::SampleLostStatus &status)
 
void set_sample_rejected_status (const DDS::SampleRejectedStatus &status)
 
SubscriptionInstance_rch get_handle_instance (DDS::InstanceHandle_t handle)
 
DDS::InstanceHandle_t get_next_handle (const DDS::BuiltinTopicKey_t &key)
 
bool has_readcondition (DDS::ReadCondition_ptr a_condition)
 
bool filter_sample (const DataSampleHeader &header)
 
bool ownership_filter_instance (const SubscriptionInstance_rch &instance, const GUID_t &pubid)
 
bool time_based_filter_instance (const SubscriptionInstance_rch &instance, MonotonicTimePoint &now, MonotonicTimePoint &deadline)
 
void accept_sample_processing (const SubscriptionInstance_rch &instance, const DataSampleHeader &header, bool is_new_instance)
 
void notify_read_conditions ()
 Data has arrived into the cache, unblock waiting ReadConditions. More...
 
virtual void add_link (const DataLink_rch &link, const GUID_t &peer)
 
typedef OPENDDS_SET (Encoding::Kind) EncodingKinds
 
- Protected Member Functions inherited from OpenDDS::DCPS::EntityImpl
DDS::ReturnCode_t set_enabled ()
 
void set_deleted (bool state)
 
bool get_deleted () const
 
DDS::InstanceHandle_t get_entity_instance_handle (const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportClient
void cdr_encapsulation (bool encap)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportReceiveListener
 TransportReceiveListener ()
 

Private Types

typedef DCPS::PmfSporadicTask< DataReaderImpl_TDRISporadicTask
 
typedef ACE_Strong_Bound_Ptr< const OpenDDS::DCPS::DataSampleHeader, ACE_Null_MutexDataSampleHeader_ptr
 

Private Member Functions

void dynamic_hook (MessageType &)
 
bool store_instance_data_check (unique_ptr< MessageTypeWithAllocator > &instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr)
 
DDS::ReturnCode_t read_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
 
DDS::ReturnCode_t take_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
 
DDS::ReturnCode_t read_instance_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
 
DDS::ReturnCode_t take_instance_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
 
DDS::ReturnCode_t read_next_instance_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
 
DDS::ReturnCode_t take_next_instance_i (MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
 
void store_instance_data (unique_ptr< MessageTypeWithAllocator > instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr, bool &just_registered, bool &filtered)
 
void finish_store_instance_data (unique_ptr< MessageTypeWithAllocator > instance_data, const DataSampleHeader &header, SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg)
 
void notify_status_condition_no_sample_lock ()
 
DDS::ReturnCode_t check_inputs (const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
 Common input read* & take* input processing and precondition checks. More...
 
void delay_sample (DDS::InstanceHandle_t handle, unique_ptr< MessageTypeWithAllocator > data, const OpenDDS::DCPS::DataSampleHeader &header, const bool just_registered, const MonotonicTimePoint &now, const MonotonicTimePoint &deadline)
 
void clear_sample (DDS::InstanceHandle_t handle)
 
void drop_sample (DDS::InstanceHandle_t handle)
 
void filter_delayed (const MonotonicTimePoint &now)
 
unique_ptr< DataAllocator > & data_allocator ()
 
typedef OPENDDS_MAP (DDS::InstanceHandle_t, FilterDelayedSample) FilterDelayedSampleMap
 
typedef OPENDDS_MULTIMAP (MonotonicTimePoint, DDS::InstanceHandle_t) FilterDelayedSampleQueue
 
template<>
void dynamic_hook (XTypes::DynamicSample &sample)
 
template<>
void dynamic_hook (XTypes::DynamicSample &sample)
 

Private Attributes

unique_ptr< DataAllocatordata_allocator_
 
InstanceMap instance_map_
 
ReverseInstanceMap reverse_instance_map_
 
RcHandle< DRISporadicTaskfilter_delayed_sample_task_
 
FilterDelayedSampleMap filter_delayed_sample_map_
 
FilterDelayedSampleQueue filter_delayed_sample_queue_
 
bool marshal_skip_serialize_
 

Additional Inherited Members

- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDSTraits< MessageType >::DataReaderType >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::LocalObject
static LocalObject_ptr _duplicate (LocalObject_ptr obj)
 
static LocalObject_ptr _nil (void)
 
static LocalObject_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::Object
static CORBA::Boolean marshal (const Object_ptr x, TAO_OutputCDR &cdr)
 
static void _tao_any_destructor (void *)
 
static CORBA::Boolean is_nil_i (CORBA::Object_ptr obj)
 
static void tao_object_initialize (Object *)
 
static CORBA::Object_ptr _duplicate (CORBA::Object_ptr obj)
 
static CORBA::Object_ptr _nil (void)
 
static CORBA::Object_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DataReaderEx >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Public Attributes inherited from OpenDDS::DCPS::DataReaderEx
attribute boolean statistics_enabled
 Statistics gathering enable state. More...
 
- Protected Types inherited from OpenDDS::DCPS::DataReaderImpl
typedef ACE_Reverse_Lock< ACE_Recursive_Thread_MutexReverse_Lock_t
 
- Static Protected Member Functions inherited from OpenDDS::DCPS::DataReaderImpl
static CORBA::ULong to_combined_states (CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states)
 
static void split_combined_states (CORBA::ULong combined, CORBA::ULong &sample_states, CORBA::ULong &view_states, CORBA::ULong &instance_states)
 
- Protected Attributes inherited from CORBA::Object
ACE_Atomic_Op< TAO_SYNCH_MUTEX, unsigned long > refcount_
 
- Protected Attributes inherited from OpenDDS::DCPS::DataReaderImpl
LookupMap combined_state_lookup_
 
SubscriptionInstanceMapType instances_
 : document why the instances_ container is mutable. More...
 
ACE_Recursive_Thread_Mutex instances_lock_
 
bool has_subscription_id_
 
ACE_Thread_Mutex subscription_id_mutex_
 
ConditionVariable< ACE_Thread_Mutexsubscription_id_condition_
 
unique_ptr< ReceivedDataAllocatorrd_allocator_
 
DDS::DataReaderQos qos_
 
DDS::DataReaderQos passed_qos_
 
DDS::SampleRejectedStatus sample_rejected_status_
 
DDS::SampleLostStatus sample_lost_status_
 
ACE_Recursive_Thread_Mutex sample_lock_
 lock protecting sample container as well as statuses. More...
 
Reverse_Lock_t reverse_sample_lock_
 
WeakRcHandle< DomainParticipantImplparticipant_servant_
 
TopicDescriptionPtr< TopicImpltopic_servant_
 
TypeSupportImpltype_support_
 
GUID_t topic_id_
 
bool is_exclusive_ownership_
 
ACE_Thread_Mutex content_filtered_topic_mutex_
 
TopicDescriptionPtr< ContentFilteredTopicImplcontent_filtered_topic_
 
TopicDescriptionPtr< MultiTopicImplmulti_topic_
 
bool coherent_
 Is accessing to Group coherent changes ? More...
 
GroupRakeData group_coherent_ordered_data_
 Ordered group samples. More...
 
DDS::SubscriberQos subqos_
 
EncodingKinds decoding_modes_
 
Security::SecurityConfig_rch security_config_
 
DDS::DynamicType_var dynamic_type_
 
TransportMessageBlockAllocator mb_alloc_
 
- Protected Attributes inherited from OpenDDS::DCPS::EntityImpl
AtomicBool enabled_
 The flag indicates the entity is enabled. More...
 
AtomicBool entity_deleted_
 The flag indicates the entity is being deleted. More...
 
- Static Protected Attributes inherited from OpenDDS::DCPS::DataReaderImpl
static const CORBA::ULong MAX_SAMPLE_STATE_FLAG = DDS::NOT_READ_SAMPLE_STATE
 
static const CORBA::ULong MAX_SAMPLE_STATE_MASK = (MAX_SAMPLE_STATE_FLAG << 1) - 1
 
static const CORBA::ULong MAX_SAMPLE_STATE_BITS = 2u
 
static const CORBA::ULong MAX_VIEW_STATE_FLAG = DDS::NOT_NEW_VIEW_STATE
 
static const CORBA::ULong MAX_VIEW_STATE_MASK = (MAX_VIEW_STATE_FLAG << 1) - 1
 
static const CORBA::ULong MAX_VIEW_STATE_BITS = 2u
 
static const CORBA::ULong MAX_INSTANCE_STATE_FLAG = DDS::NOT_ALIVE_NO_WRITERS_INSTANCE_STATE
 
static const CORBA::ULong MAX_INSTANCE_STATE_MASK = (MAX_INSTANCE_STATE_FLAG << 1) - 1
 
static const CORBA::ULong MAX_INSTANCE_STATE_BITS = 3u
 
static const CORBA::ULong COMBINED_VIEW_STATE_SHIFT = MAX_INSTANCE_STATE_BITS
 
static const CORBA::ULong COMBINED_SAMPLE_STATE_SHIFT = COMBINED_VIEW_STATE_SHIFT + MAX_VIEW_STATE_BITS
 

Detailed Description

template<typename MessageType>
class OpenDDS::DCPS::DataReaderImpl_T< MessageType >

Servant for DataReader interface of Traits::MessageType data type.

See the DDS specification, OMG formal/2015-04-10, for a description of this interface.

Definition at line 41 of file DataReaderImpl_T.h.

Member Typedef Documentation

◆ DataAllocator

Definition at line 118 of file DataReaderImpl_T.h.

◆ DataSampleHeader_ptr

Definition at line 2426 of file DataReaderImpl_T.h.

◆ DRISporadicTask

template<typename MessageType>
typedef DCPS::PmfSporadicTask<DataReaderImpl_T> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::DRISporadicTask
private

Definition at line 2420 of file DataReaderImpl_T.h.

◆ Interface

template<typename MessageType>
typedef TraitsType::DataReaderType OpenDDS::DCPS::DataReaderImpl_T< MessageType >::Interface

Definition at line 66 of file DataReaderImpl_T.h.

◆ MarshalTraitsType

template<typename MessageType>
typedef MarshalTraits<MessageType> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::MarshalTraitsType

Definition at line 51 of file DataReaderImpl_T.h.

◆ MessageSequenceType

template<typename MessageType>
typedef TraitsType::MessageSequenceType OpenDDS::DCPS::DataReaderImpl_T< MessageType >::MessageSequenceType

Definition at line 52 of file DataReaderImpl_T.h.

◆ SharedInstanceMap_rch

template<typename MessageType>
typedef RcHandle<SharedInstanceMap> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::SharedInstanceMap_rch

Definition at line 64 of file DataReaderImpl_T.h.

◆ TraitsType

template<typename MessageType>
typedef DDSTraits<MessageType> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::TraitsType

Definition at line 50 of file DataReaderImpl_T.h.

Constructor & Destructor Documentation

◆ DataReaderImpl_T()

template<typename MessageType>
OpenDDS::DCPS::DataReaderImpl_T< MessageType >::DataReaderImpl_T ( )
inline

Definition at line 120 of file DataReaderImpl_T.h.

121  : filter_delayed_sample_task_(make_rch<DRISporadicTask>(TheServiceParticipant->time_source(), TheServiceParticipant->interceptor(), rchandle_from(this), &DataReaderImpl_T::filter_delayed))
122  , marshal_skip_serialize_(false)
123  {
125  }
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
void filter_delayed(const MonotonicTimePoint &now)
RcHandle< DRISporadicTask > filter_delayed_sample_task_
#define TheServiceParticipant

◆ ~DataReaderImpl_T()

template<typename MessageType>
virtual OpenDDS::DCPS::DataReaderImpl_T< MessageType >::~DataReaderImpl_T ( )
inlinevirtual

Definition at line 127 of file DataReaderImpl_T.h.

128  {
129  filter_delayed_sample_task_->cancel();
130 
131  for (typename InstanceMap::iterator it = instance_map_.begin();
132  it != instance_map_.end(); ++it)
133  {
135  if (!ptr) continue;
136  purge_data(ptr);
137  }
138  //X SHH release the data samples in the instance_map_.
139  }
virtual void purge_data(OpenDDS::DCPS::SubscriptionInstance_rch instance)
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
RcHandle< DRISporadicTask > filter_delayed_sample_task_

Member Function Documentation

◆ _interface_repository_id()

template<typename MessageType>
const char* OpenDDS::DCPS::DataReaderImpl_T< MessageType >::_interface_repository_id ( void  ) const
inlinevirtual

Reimplemented from CORBA::Object.

Definition at line 73 of file DataReaderImpl_T.h.

74  {
75  return Interface::_interface_repository_id();
76  }

◆ _is_a()

template<typename MessageType>
CORBA::Boolean OpenDDS::DCPS::DataReaderImpl_T< MessageType >::_is_a ( const char *  type_id)
inlinevirtual

Reimplemented from CORBA::Object.

Definition at line 68 of file DataReaderImpl_T.h.

69  {
70  return Interface::_is_a(type_id);
71  }

◆ auto_return_loan()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::auto_return_loan ( void *  seq)
inlinevirtual

Definition at line 682 of file DataReaderImpl_T.h.

683  {
684  MessageSequenceType& received_data =
685  *static_cast< MessageSequenceType*> (seq);
686 
687  if (!received_data.release())
688  {
689  // release_loan(received_data);
690  received_data.length(0);
691  }
692  return DDS::RETCODE_OK;
693  }
TraitsType::MessageSequenceType MessageSequenceType
const ReturnCode_t RETCODE_OK

◆ check_inputs()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::check_inputs ( const char *  method_name,
MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples 
)
inlineprivate

Common input read* & take* input processing and precondition checks.

Definition at line 2180 of file DataReaderImpl_T.h.

2184 {
2185  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
2186 
2187  // ---- start of preconditions common to read and take -----
2188  // SPEC ref v1.2 7.1.2.5.3.8 #1
2189  // NOTE: We can't check maximum() or release() here since those are
2190  // implementation details of the sequences. In general, the
2191  // info_seq will have release() == true and maximum() == 0.
2192  // If we're in zero-copy mode, the received_data will have
2193  // release() == false and maximum() == 0. If it's not
2194  // zero-copy then received_data will have release == true()
2195  // and maximum() == anything.
2196  if (received_data.length() != info_seq.length())
2197  {
2198  ACE_DEBUG((LM_DEBUG,
2199  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
2200  ACE_TEXT("PRECONDITION_NOT_MET sample and info input ")
2201  ACE_TEXT("sequences do not match.\n"),
2202  TraitsType::type_name(),
2203  method_name ));
2205  }
2206 
2207  //SPEC ref v1.2 7.1.2.5.3.8 #4
2208  if ((received_data.maximum() > 0) && (received_data.release() == false))
2209  {
2210  ACE_DEBUG((LM_DEBUG,
2211  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
2212  ACE_TEXT("PRECONDITION_NOT_MET mismatch of ")
2213  ACE_TEXT("maximum %d and owns %d\n"),
2214  TraitsType::type_name(),
2215  method_name,
2216  received_data.maximum(),
2217  received_data.release() ));
2218 
2220  }
2221 
2222  if (received_data.maximum() == 0)
2223  {
2224  // not in SPEC but needed.
2226  {
2227  max_samples =
2228  static_cast< ::CORBA::Long> (received_data_p.max_slots());
2229  }
2230  }
2231  else
2232  {
2234  {
2235  //SPEC ref v1.2 7.1.2.5.3.8 #5a
2236  max_samples = received_data.maximum();
2237  }
2238  else if (
2239  max_samples > static_cast< ::CORBA::Long> (received_data.maximum()))
2240  {
2241  //SPEC ref v1.2 7.1.2.5.3.8 #5c
2242  ACE_DEBUG((LM_DEBUG,
2243  ACE_TEXT("(%P|%t) %CDataReaderImpl::%C ")
2244  ACE_TEXT("PRECONDITION_NOT_MET max_samples %d > maximum %d\n"),
2245  TraitsType::type_name(),
2246  method_name,
2247  max_samples,
2248  received_data.maximum()));
2250  }
2251  //else
2252  //SPEC ref v1.2 7.1.2.5.3.8 #5b - is true by impl below.
2253  }
2254 
2255  // The spec does not say what to do in this case but it appears to be a good thing.
2256  // Note: max_slots is the greater of the sequence's maximum and init_size.
2257  if (static_cast< ::CORBA::Long> (received_data_p.max_slots()) < max_samples)
2258  {
2259  max_samples = static_cast< ::CORBA::Long> (received_data_p.max_slots());
2260  }
2261  //---- end of preconditions common to read and take -----
2262 
2263  return DDS::RETCODE_OK;
2264 }
#define ACE_DEBUG(X)
ACE_CDR::Long Long
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
ACE_TEXT("TCP_Factory")
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_OK
const long LENGTH_UNLIMITED

◆ clear_sample()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::clear_sample ( DDS::InstanceHandle_t  handle)
inlineprivate

Definition at line 2310 of file DataReaderImpl_T.h.

2311 {
2312  // sample_lock_ should already be held
2313 
2314  typename FilterDelayedSampleMap::iterator sample = filter_delayed_sample_map_.find(handle);
2315  if (sample != filter_delayed_sample_map_.end()) {
2316  // leave the entry in the container, so that the key remains valid if the reactor is waiting on this lock while this is occurring
2317  sample->second.message.reset();
2318  }
2319 }
FilterDelayedSampleMap filter_delayed_sample_map_

◆ contains_sample_filtered()

template<typename MessageType>
bool OpenDDS::DCPS::DataReaderImpl_T< MessageType >::contains_sample_filtered ( DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
const FilterEvaluator evaluator,
const DDS::StringSeq params 
)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 701 of file DataReaderImpl_T.h.

706  {
709 
711  TypeSupport* const ts = topic->get_type_support();
712  TypeSupportImpl* const type_support = dynamic_cast<TypeSupportImpl*>(ts);
713  const bool filter_has_non_key_fields = type_support ? evaluator.has_non_key_fields(*type_support) : true;
714 
716  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
717  ++next; // pre-increment iterator, in case updates cause changes to match set
718  const DDS::InstanceHandle_t handle = *it;
719  const SubscriptionInstance_rch inst = get_handle_instance(handle);
720  if (!inst) continue;
721 
722  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
723  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
724  if (!item->registered_data_ || (!item->valid_data_ && filter_has_non_key_fields)) {
725  continue;
726  }
727  if (evaluator.eval(*static_cast<MessageType*>(item->registered_data_), params)) {
728  return true;
729  }
730  }
731  }
732 
733  return false;
734  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
bool eval(const T &sample, const DDS::StringSeq &params) const
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
ACE_Recursive_Thread_Mutex instances_lock_
TopicDescriptionPtr< TopicImpl > topic_servant_
bool has_non_key_fields(const TypeSupportImpl &ts) const

◆ data_allocator()

template<typename MessageType>
unique_ptr<DataAllocator>& OpenDDS::DCPS::DataReaderImpl_T< MessageType >::data_allocator ( )
inlineprivate

Definition at line 2413 of file DataReaderImpl_T.h.

2413 { return data_allocator_; }
unique_ptr< DataAllocator > data_allocator_

◆ dds_demarshal()

template<typename MessageType>
virtual RcHandle<MessageHolder> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dds_demarshal ( const OpenDDS::DCPS::ReceivedDataSample sample,
DDS::InstanceHandle_t  publication_handle,
OpenDDS::DCPS::SubscriptionInstance_rch instance,
bool &  just_registered,
bool &  filtered,
OpenDDS::DCPS::MarshalingType  marshaling_type,
bool  full_copy 
)
inlineprotectedvirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1067 of file DataReaderImpl_T.h.

1074  {
1075  unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
1076  dynamic_hook(*data);
1077  RcHandle<MessageHolder> message_holder;
1078 
1079  Message_Block_Ptr payload(sample.data(&mb_alloc_));
1081  if (!MarshalTraitsType::from_message_block(*data, *payload)) {
1082  if (DCPS_debug_level > 0) {
1083  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataReaderImpl::dds_demarshal: ")
1084  ACE_TEXT("attempting to skip serialize but bad from_message_block. Returning from demarshal.\n")));
1085  }
1086  return message_holder;
1087  }
1088  store_instance_data(move(data), publication_handle, sample.header_, instance, just_registered, filtered);
1089  return message_holder;
1090  }
1091  const bool encapsulated = sample.header_.cdr_encapsulation_;
1092 
1094  payload.get(),
1096  static_cast<Endianness>(sample.header_.byte_order_));
1097 
1098  if (encapsulated) {
1099  EncapsulationHeader encap;
1100  if (!(ser >> encap)) {
1101  if (DCPS_debug_level > 0) {
1102  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
1103  ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1104  ACE_TEXT("deserialization of encapsulation header failed.\n"),
1105  TraitsType::type_name()));
1106  }
1107  return message_holder;
1108  }
1110  if (!encap.to_encoding(encoding, type_support_->base_extensibility())) {
1111  return message_holder;
1112  }
1113 
1114  if (decoding_modes_.find(encoding.kind()) == decoding_modes_.end()) {
1115  if (DCPS_debug_level >= 1) {
1116  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING ")
1117  ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1118  ACE_TEXT("Encoding kind %C of the received sample does not ")
1119  ACE_TEXT("match the ones specified by DataReader.\n"),
1120  TraitsType::type_name(),
1121  Encoding::kind_to_string(encoding.kind()).c_str()));
1122  }
1123  return message_holder;
1124  }
1125  if (DCPS_debug_level >= 8) {
1126  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ")
1127  ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1128  ACE_TEXT("Deserializing with encoding kind %C.\n"),
1129  TraitsType::type_name(),
1130  Encoding::kind_to_string(encoding.kind()).c_str()));
1131  }
1132 
1133  ser.encoding(encoding);
1134  }
1135 
1136  const bool key_only_marshaling =
1137  marshaling_type == OpenDDS::DCPS::KEY_ONLY_MARSHALING;
1138 
1139  bool ser_ret = true;
1140  if (key_only_marshaling) {
1141  ser_ret = ser >> OpenDDS::DCPS::KeyOnly<MessageType>(*data);
1142  } else {
1143  ser_ret = ser >> *data;
1144  if (full_copy) {
1145  message_holder = make_rch<MessageHolder_T<MessageType> >(*data);
1146  }
1147  }
1148  if (!ser_ret) {
1149  if (ser.get_construction_status() != Serializer::ConstructionSuccessful) {
1150  if (DCPS_debug_level > 1) {
1151  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) %CDataReaderImpl::dds_demarshal ")
1152  ACE_TEXT("object construction failure, dropping sample.\n"),
1153  TraitsType::type_name()));
1154  }
1155  } else {
1156  if (DCPS_debug_level > 0) {
1157  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR %CDataReaderImpl::dds_demarshal ")
1158  ACE_TEXT("deserialization failed, dropping sample.\n"),
1159  TraitsType::type_name()));
1160  }
1161  }
1162  return message_holder;
1163  }
1164 
1165 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1166  /*
1167  * If sample.header_.content_filter_ is true, the writer has already
1168  * filtered.
1169  */
1170  if (!sample.header_.content_filter_) {
1173  const bool sample_only_has_key_fields = !sample.header_.valid_data();
1174  if (key_only_marshaling != sample_only_has_key_fields) {
1175  if (DCPS_debug_level > 0) {
1176  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
1177  ACE_TEXT("%CDataReaderImpl::dds_demarshal: ")
1178  ACE_TEXT("Mismatch between the key only and valid data properties ")
1179  ACE_TEXT("of a %C message of a content filtered topic!\n"),
1180  TraitsType::type_name(),
1181  to_string(static_cast<MessageId>(sample.header_.message_id_))));
1182  }
1183  filtered = true;
1184  message_holder.reset();
1185  return message_holder;
1186  }
1187  const MessageType& type = static_cast<MessageType&>(*data);
1188  if (!content_filtered_topic_->filter(type, sample_only_has_key_fields)) {
1189  filtered = true;
1190  message_holder.reset();
1191  return message_holder;
1192  }
1193  }
1194  }
1195 #endif
1196 
1197  store_instance_data(move(data), publication_handle, sample.header_, instance, just_registered, filtered);
1198  return message_holder;
1199  }
DataSampleHeader header_
The demarshalled sample header.
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
static String kind_to_string(Kind value)
Definition: Serializer.cpp:326
char message_id_
The enum MessageId.
TransportMessageBlockAllocator mb_alloc_
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
bool valid_data() const
Returns true if the sample has a complete serialized payload.
TopicDescriptionPtr< ContentFilteredTopicImpl > content_filtered_topic_
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
TypeSupportImpl * type_support_
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
const char * to_string(MessageId value)
virtual Extensibility base_extensibility() const =0
Returns the extensibility of just the topic type.
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
void store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr, bool &just_registered, bool &filtered)
unique_ptr< DataAllocator > & data_allocator()
ACE_Thread_Mutex content_filtered_topic_mutex_
bool to_encoding(Encoding &encoding, Extensibility expected_extensibility)
Definition: Serializer.cpp:153

◆ delay_sample()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::delay_sample ( DDS::InstanceHandle_t  handle,
unique_ptr< MessageTypeWithAllocator data,
const OpenDDS::DCPS::DataSampleHeader header,
const bool  just_registered,
const MonotonicTimePoint now,
const MonotonicTimePoint deadline 
)
inlineprivate

Definition at line 2266 of file DataReaderImpl_T.h.

2272 {
2273  // sample_lock_ should already be held
2275 
2276  typename FilterDelayedSampleMap::iterator i = filter_delayed_sample_map_.find(handle);
2277  if (i == filter_delayed_sample_map_.end()) {
2278 
2279  // emplace()/insert() only if the sample is going to be
2280  // new (otherwise we call move(data) twice).
2281  std::pair<typename FilterDelayedSampleMap::iterator, bool> result =
2282 #ifdef ACE_HAS_CPP11
2283  filter_delayed_sample_map_.emplace(std::piecewise_construct,
2284  std::forward_as_tuple(handle),
2285  std::forward_as_tuple(move(data), hdr, just_registered));
2286 #else
2287  filter_delayed_sample_map_.insert(std::make_pair(handle, FilterDelayedSample(move(data), hdr, just_registered)));
2288 #endif
2289  FilterDelayedSample& sample = result.first->second;
2290  sample.expiration_time = deadline;
2291  const bool schedule = filter_delayed_sample_queue_.empty();
2292  filter_delayed_sample_queue_.insert(std::make_pair(deadline, handle));
2293  if (schedule) {
2294  filter_delayed_sample_task_->schedule(now - deadline);
2295  } else if (filter_delayed_sample_queue_.begin()->second == handle) {
2296  filter_delayed_sample_task_->cancel();
2297  filter_delayed_sample_task_->schedule(now - deadline);
2298  }
2299  } else {
2300  FilterDelayedSample& sample = i->second;
2301  // we only care about the most recently filtered sample, so clean up the last one
2302 
2303  sample.message = move(data);
2304  sample.header = hdr;
2305  sample.new_instance = just_registered;
2306  // already scheduled for timeout at the desired time
2307  }
2308 }
FilterDelayedSampleMap filter_delayed_sample_map_
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
ACE_Strong_Bound_Ptr< const OpenDDS::DCPS::DataSampleHeader, ACE_Null_Mutex > DataSampleHeader_ptr
RcHandle< DRISporadicTask > filter_delayed_sample_task_
FilterDelayedSampleQueue filter_delayed_sample_queue_

◆ dispose_unregister()

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dispose_unregister ( const OpenDDS::DCPS::ReceivedDataSample sample,
DDS::InstanceHandle_t  publication_handle,
OpenDDS::DCPS::SubscriptionInstance_rch instance 
)
inlineprotectedvirtual

!! caller should already have the sample_lock_

Reimplemented from OpenDDS::DCPS::DataReaderImpl.

Definition at line 1201 of file DataReaderImpl_T.h.

1204  {
1205  //!!! caller should already have the sample_lock_
1206 
1207  // The data sample in this dispose message does not contain any valid data.
1208  // What it needs here is the key value to identify the instance to dispose.
1209  // The demarshal push this "sample" to received sample list so the user
1210  // can be notified the dispose event.
1211  bool just_registered = false;
1212  bool filtered = false;
1214  if (sample.header_.key_fields_only_) {
1216  }
1217  dds_demarshal(sample, publication_handle, instance, just_registered, filtered, marshaling, false);
1218  }
DataSampleHeader header_
The demarshalled sample header.
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
virtual RcHandle< MessageHolder > dds_demarshal(const OpenDDS::DCPS::ReceivedDataSample &sample, DDS::InstanceHandle_t publication_handle, OpenDDS::DCPS::SubscriptionInstance_rch &instance, bool &just_registered, bool &filtered, OpenDDS::DCPS::MarshalingType marshaling_type, bool full_copy)

◆ drop_sample()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::drop_sample ( DDS::InstanceHandle_t  handle)
inlineprivate

Definition at line 2321 of file DataReaderImpl_T.h.

2322 {
2323  // sample_lock_ should already be held
2324 
2325  typename FilterDelayedSampleMap::iterator sample = filter_delayed_sample_map_.find(handle);
2326  if (sample != filter_delayed_sample_map_.end()) {
2327  for (FilterDelayedSampleQueue::iterator pos = filter_delayed_sample_queue_.lower_bound(sample->second.expiration_time), limit = filter_delayed_sample_queue_.upper_bound(sample->second.expiration_time); pos != limit; ++pos) {
2328  if (pos->second == handle) {
2329  filter_delayed_sample_queue_.erase(pos);
2330  break;
2331  }
2332  }
2333 
2334  // use the handle to erase, since the sample lock was released
2335  filter_delayed_sample_map_.erase(handle);
2336  }
2337 }
FilterDelayedSampleMap filter_delayed_sample_map_
FilterDelayedSampleQueue filter_delayed_sample_queue_

◆ dynamic_hook() [1/3]

template<>
void OpenDDS::DCPS::DataReaderImpl_T< XTypes::DynamicSample >::dynamic_hook ( XTypes::DynamicSample sample)
private

◆ dynamic_hook() [2/3]

template<>
void OpenDDS::DCPS::DataReaderImpl_T< XTypes::DynamicSample >::dynamic_hook ( XTypes::DynamicSample sample)
inlineprivate

Definition at line 114 of file DynamicDataReaderImpl.h.

115  {
116  XTypes::DynamicDataReaderImpl* const self = dynamic_cast<XTypes::DynamicDataReaderImpl*>(this);
117  if (self) {
118  self->imbue_type(sample);
119  }
120  }

◆ dynamic_hook() [3/3]

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::dynamic_hook ( MessageType &  )
inlineprivate

Available for specialization so that some types of MessageType can observe and change the sample before dds_demarshal deserializes into it

Definition at line 1257 of file DataReaderImpl_T.h.

1257 {}

◆ enable_specific()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::enable_specific ( )
inlinevirtual

Do parts of enable specific to the datatype. Called by DataReaderImpl::enable().

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 145 of file DataReaderImpl_T.h.

146  {
147  data_allocator().reset(new DataAllocator(get_n_chunks ()));
149  ACE_DEBUG((LM_DEBUG,
150  ACE_TEXT("(%P|%t) %CDataReaderImpl::")
151  ACE_TEXT("enable_specific-data")
152  ACE_TEXT(" Cached_Allocator_With_Overflow ")
153  ACE_TEXT("%x with %d chunks\n"),
154  TraitsType::type_name(),
155  data_allocator().get(),
156  get_n_chunks ()));
157 
158  return DDS::RETCODE_OK;
159  }
#define ACE_DEBUG(X)
OpenDDS::DCPS::Cached_Allocator_With_Overflow< MessageTypeMemoryBlock, ACE_Thread_Mutex > DataAllocator
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const ReturnCode_t RETCODE_OK
unique_ptr< DataAllocator > & data_allocator()

◆ filter_delayed()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::filter_delayed ( const MonotonicTimePoint now)
inlineprivate

Definition at line 2339 of file DataReaderImpl_T.h.

2340 {
2341  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2342 
2343  // Make a copy because finish_store_instance_data will release the sample lock.
2344  typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) Handles;
2345  Handles handles;
2346 
2348 
2349  for (FilterDelayedSampleQueue::iterator pos = filter_delayed_sample_queue_.begin(), limit = filter_delayed_sample_queue_.end(); pos != limit && pos->first <= now;) {
2350  handles.push_back(pos->second);
2351  filter_delayed_sample_queue_.erase(pos++);
2352  }
2353 
2355 
2356  for (Handles::const_iterator pos = handles.begin(), limit = handles.end(); pos != limit; ++pos) {
2357  const DDS::InstanceHandle_t handle = *pos;
2358 
2359  SubscriptionInstance_rch instance = get_handle_instance(handle);
2360  if (!instance) {
2361  continue;
2362  }
2363 
2364  typename FilterDelayedSampleMap::iterator data = filter_delayed_sample_map_.find(handle);
2365  if (data == filter_delayed_sample_map_.end()) {
2366  continue;
2367  }
2368 
2369  if (data->second.message) {
2370  const bool NOT_DISPOSE_MSG = false;
2371  const bool NOT_UNREGISTER_MSG = false;
2372  // clear the message, since ownership is being transferred to finish_store_instance_data.
2373 
2374  instance->last_accepted_.set_to_now();
2375  const DataSampleHeader_ptr header = data->second.header;
2376  const bool new_instance = data->second.new_instance;
2377 
2378  // should not use data iterator anymore, since finish_store_instance_data releases sample_lock_
2379  finish_store_instance_data(move(data->second.message),
2380  *header,
2381  instance,
2382  NOT_DISPOSE_MSG,
2383  NOT_UNREGISTER_MSG);
2384 
2385  accept_sample_processing(instance, *header, new_instance);
2386 
2387  // Refresh the iterator.
2388  data = filter_delayed_sample_map_.find(handle);
2389  if (data == filter_delayed_sample_map_.end()) {
2390  continue;
2391  }
2392 
2393  // Reschedule.
2394  data->second.expiration_time = now + interval;
2395  filter_delayed_sample_queue_.insert(std::make_pair(data->second.expiration_time, handle));
2396 
2397  } else {
2398  // this check is performed to handle the corner case where
2399  // store_instance_data received and delivered a sample, while this
2400  // method was waiting for the lock
2401  if (MonotonicTimePoint::now() - instance->last_sample_tv_ >= interval) {
2402  // no new data to process, so remove from container
2403  filter_delayed_sample_map_.erase(data);
2404  }
2405  }
2406  }
2407 
2408  if (!filter_delayed_sample_queue_.empty()) {
2409  filter_delayed_sample_task_->schedule(filter_delayed_sample_queue_.begin()->first - now);
2410  }
2411 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void accept_sample_processing(const SubscriptionInstance_rch &instance, const DataSampleHeader &header, bool is_new_instance)
FilterDelayedSampleMap filter_delayed_sample_map_
TimeBasedFilterQosPolicy time_based_filter
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
void finish_store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, const DataSampleHeader &header, SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg)
typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_Strong_Bound_Ptr< const OpenDDS::DCPS::DataSampleHeader, ACE_Null_Mutex > DataSampleHeader_ptr
RcHandle< DRISporadicTask > filter_delayed_sample_task_
FilterDelayedSampleQueue filter_delayed_sample_queue_
#define TheServiceParticipant

◆ finish_store_instance_data()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::finish_store_instance_data ( unique_ptr< MessageTypeWithAllocator instance_data,
const DataSampleHeader header,
SubscriptionInstance_rch  instance_ptr,
bool  is_dispose_msg,
bool  is_unregister_msg 
)
inlineprivate

Definition at line 1945 of file DataReaderImpl_T.h.

1947 {
1950  (instance_ptr->rcvd_samples_.size() >=
1951  static_cast<size_t>(qos_.resource_limits.max_samples_per_instance))) {
1952 
1953  // According to spec 1.2, Samples that contain no data do not
1954  // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
1955  // so do not remove the oldest sample when unregister/dispose
1956  // message arrives.
1957 
1958  if (!is_dispose_msg && !is_unregister_msg
1959  && !instance_ptr->rcvd_samples_.matches(DDS::READ_SAMPLE_STATE))
1960  {
1961  DDS::DataReaderListener_var listener
1963 
1965 
1971 
1972  if (!CORBA::is_nil(listener.in()))
1973  {
1975 
1976  listener->on_sample_rejected(this, sample_rejected_status_);
1978  } // do we want to do something if listener is nil???
1980  return;
1981  }
1982  else if (!is_dispose_msg && !is_unregister_msg)
1983  {
1984  // Discard the oldest previously-read sample
1986  instance_ptr->rcvd_samples_.remove_head();
1987  item->dec_ref();
1988  }
1989  }
1991  {
1993  {
1995  for (OpenDDS::DCPS::DataReaderImpl::SubscriptionInstanceMapType::iterator iter = instances_.begin();
1996  iter != instances_.end();
1997  ++iter) {
1998  OpenDDS::DCPS::SubscriptionInstance_rch ptr = iter->second;
1999 
2000  total_samples += (CORBA::Long) ptr->rcvd_samples_.size();
2001  }
2002  }
2003 
2004  if (total_samples >= qos_.resource_limits.max_samples)
2005  {
2006  // According to spec 1.2, Samples that contain no data do not
2007  // count towards the limits imposed by the RESOURCE_LIMITS QoS policy
2008  // so do not remove the oldest sample when unregister/dispose
2009  // message arrives.
2010 
2011  if (!is_dispose_msg && !is_unregister_msg
2012  && !instance_ptr->rcvd_samples_.matches(DDS::READ_SAMPLE_STATE))
2013  {
2014  DDS::DataReaderListener_var listener
2016 
2018 
2024  if (!CORBA::is_nil(listener.in()))
2025  {
2027 
2028  listener->on_sample_rejected(this, sample_rejected_status_);
2030  } // do we want to do something if listener is nil???
2032 
2033  return;
2034  }
2035  else if (!is_dispose_msg && !is_unregister_msg)
2036  {
2037  // Discard the oldest previously-read sample
2039  instance_ptr->rcvd_samples_.remove_head();
2040  item->dec_ref();
2041  }
2042  }
2043  }
2044 
2045  bool event_notify = false;
2046 
2047  if (is_dispose_msg) {
2048  event_notify = instance_ptr->instance_state_->dispose_was_received(header.publication_id_);
2049  }
2050 
2051  if (is_unregister_msg) {
2052  if (instance_ptr->instance_state_->unregister_was_received(header.publication_id_)) {
2053  event_notify = true;
2054  }
2055  }
2056 
2057  if (!is_dispose_msg && !is_unregister_msg) {
2058  event_notify = true;
2059  instance_ptr->instance_state_->data_was_received(header.publication_id_);
2060  }
2061 
2062  if (!event_notify) {
2063  return;
2064  }
2065 
2066  ReceivedDataElement* const ptr =
2068  header, instance_data.release(), &sample_lock_);
2069 
2071  instance_ptr->instance_state_->disposed_generation_count();
2074 
2075  instance_ptr->last_sequence_ = header.sequence_;
2076 
2077  instance_ptr->rcvd_strategy_->add(ptr);
2078 
2079  if (! is_dispose_msg && ! is_unregister_msg
2080  && instance_ptr->rcvd_samples_.size() > get_depth())
2081  {
2083  instance_ptr->rcvd_samples_.remove_head();
2084 
2085  if (head_ptr->sample_state_ == DDS::NOT_READ_SAMPLE_STATE)
2086  {
2087  DDS::DataReaderListener_var listener
2089 
2092 
2094 
2095  if (!CORBA::is_nil(listener.in()))
2096  {
2098 
2099  listener->on_sample_lost(this, sample_lost_status_);
2100 
2102  }
2103 
2105  }
2106 
2107  head_ptr->dec_ref();
2108  }
2109 
2110 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2111  if (! ptr->coherent_change_) {
2112 #endif
2114  if (!sub || get_deleted())
2115  return;
2116 
2118 
2120 
2121  DDS::SubscriberListener_var sub_listener =
2123  if (!CORBA::is_nil(sub_listener.in()) && !coherent_) {
2124  if (!is_bit()) {
2127  sub_listener->on_data_on_readers(sub.in());
2128  } else {
2129  TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataOnReaders>(sub, sub_listener, rchandle_from(static_cast<DataReaderImpl*>(this)), true, false));
2130  }
2131  } else {
2132  sub->notify_status_condition();
2133 
2134  DDS::DataReaderListener_var listener =
2136 
2137  if (!CORBA::is_nil(listener.in())) {
2138  if (!is_bit()) {
2141  sub.reset();
2143  listener->on_data_available(this);
2144  } else {
2145  TheServiceParticipant->job_queue()->enqueue(make_rch<OnDataAvailable>(listener, rchandle_from(static_cast<DataReaderImpl*>(this)), true, true, true));
2146  }
2147  } else {
2149  }
2150  }
2151 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2152  }
2153 #endif
2154 }
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
ACE_CDR::Long Long
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const StatusKind SAMPLE_LOST_STATUS
CORBA::Long total_samples() const
bool dispose_was_received(const GUID_t &writer_id)
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
DDS::SubscriberListener_ptr listener_for(DDS::StatusKind kind)
unique_ptr< ReceivedDataAllocator > rd_allocator_
bool unregister_was_received(const GUID_t &writer_id)
unique_ptr< ReceivedDataStrategy > rcvd_strategy_
ReceivedDataElementList strategy.
DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind)
const StatusKind DATA_ON_READERS_STATUS
SequenceNumber last_sequence_
Sequence number of the move recent data sample received.
RcHandle< SubscriberImpl > get_subscriber_servant()
bool coherent_
Is accessing to Group coherent changes ?
SampleRejectedStatusKind last_reason
void data_was_received(const GUID_t &writer_id)
Data sample received for this instance.
const StatusKind DATA_AVAILABLE_STATUS
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
DDS::SampleLostStatus sample_lost_status_
const SampleStateKind READ_SAMPLE_STATE
DDS::SampleRejectedStatus sample_rejected_status_
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const SampleStateKind NOT_READ_SAMPLE_STATE
size_t disposed_generation_count() const
Access disposed generation count.
const long LENGTH_UNLIMITED
#define TheServiceParticipant
ACE_Recursive_Thread_Mutex instances_lock_
bool matches(CORBA::ULong sample_states) const
const InstanceState_rch instance_state_
Instance state for this instance.
ResourceLimitsQosPolicy resource_limits
bool coherent_change_
Sample belongs to an active coherent change set.
Boolean is_nil(T x)
const StatusKind SAMPLE_REJECTED_STATUS
const DDS::InstanceHandle_t instance_handle_
The instance handle for the registered object.
size_t no_writers_generation_count() const
Access no writers generation count.

◆ get_key_value()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::get_key_value ( MessageType &  key_holder,
DDS::InstanceHandle_t  handle 
)
inlinevirtual

Definition at line 657 of file DataReaderImpl_T.h.

659  {
661 
662  const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(handle);
663  if (pos != reverse_instance_map_.end()) {
664  key_holder = pos->second->first;
665  return DDS::RETCODE_OK;
666  }
667 
669  }
ReverseInstanceMap reverse_instance_map_
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_BAD_PARAMETER

◆ get_marshal_skip_serialize()

template<typename MessageType>
bool OpenDDS::DCPS::DataReaderImpl_T< MessageType >::get_marshal_skip_serialize ( ) const
inline

Definition at line 1047 of file DataReaderImpl_T.h.

1048  {
1049  return marshal_skip_serialize_;
1050  }

◆ lookup_instance() [1/2]

template<typename MessageType>
virtual DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance ( const MessageType &  instance_data)
inlinevirtual

Definition at line 671 of file DataReaderImpl_T.h.

Referenced by OpenDDS::DCPS::BitSubscriber::remove_connection_record(), and OpenDDS::DCPS::BitSubscriber::remove_thread_status().

672  {
674 
675  const typename InstanceMap::const_iterator it = instance_map_.find(instance_data);
676  if (it != instance_map_.end()) {
677  return it->second;
678  }
679  return DDS::HANDLE_NIL;
680  }
const InstanceHandle_t HANDLE_NIL
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.

◆ lookup_instance() [2/2]

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance ( const OpenDDS::DCPS::ReceivedDataSample sample,
OpenDDS::DCPS::SubscriptionInstance_rch instance 
)
inlinevirtual

!! caller should already have the sample_lock_

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 916 of file DataReaderImpl_T.h.

918  {
919  //!!! caller should already have the sample_lock_
920  const bool encapsulated = sample.header_.cdr_encapsulation_;
921  Message_Block_Ptr payload(sample.data(&mb_alloc_));
923  payload.get(),
925  static_cast<Endianness>(sample.header_.byte_order_));
926 
927  if (encapsulated) {
928  EncapsulationHeader encap;
929  if (!(ser >> encap)) {
930  if (DCPS_debug_level > 0) {
931  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR ")
932  ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
933  ACE_TEXT("deserialization of encapsulation header failed.\n"),
934  TraitsType::type_name()));
935  }
936  return;
937  }
939  if (!encap.to_encoding(encoding, type_support_->base_extensibility())) {
940  return;
941  }
942 
943  if (decoding_modes_.find(encoding.kind()) == decoding_modes_.end()) {
944  if (DCPS_debug_level >= 1) {
945  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) WARNING ")
946  ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
947  ACE_TEXT("Encoding kind of the received sample (%C) does not ")
948  ACE_TEXT("match the ones specified by DataReader.\n"),
949  TraitsType::type_name(),
950  Encoding::kind_to_string(encoding.kind()).c_str()));
951  }
952  return;
953  }
954  if (DCPS_debug_level >= 8) {
955  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) ")
956  ACE_TEXT("%CDataReaderImpl::lookup_instance: ")
957  ACE_TEXT("Deserializing with encoding kind %C.\n"),
958  TraitsType::type_name(),
959  Encoding::kind_to_string(encoding.kind()).c_str()));
960  }
961 
962  ser.encoding(encoding);
963  }
964 
965  bool ser_ret = true;
966  MessageType data;
967  if (sample.header_.key_fields_only_) {
968  ser_ret = ser >> OpenDDS::DCPS::KeyOnly<MessageType>(data);
969  } else {
970  ser_ret = ser >> data;
971  }
972  if (!ser_ret) {
973  if (ser.get_construction_status() != Serializer::ConstructionSuccessful) {
974  if (DCPS_debug_level > 1) {
975  ACE_DEBUG((LM_WARNING, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
976  ACE_TEXT("object construction failure, dropping sample.\n"),
977  TraitsType::type_name()));
978  }
979  } else {
980  if (DCPS_debug_level > 0) {
981  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) %CDataReaderImpl::lookup_instance ")
982  ACE_TEXT("deserialization failed.\n"),
983  TraitsType::type_name()));
984  }
985  }
986  return;
987  }
988 
990  typename InstanceMap::const_iterator const it = instance_map_.find(data);
991  if (it != instance_map_.end()) {
992  handle = it->second;
993  }
994 
995  if (handle == DDS::HANDLE_NIL) {
996  instance.reset();
997  } else {
998  instance = get_handle_instance(handle);
999  }
1000  }
DataSampleHeader header_
The demarshalled sample header.
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
static String kind_to_string(Kind value)
Definition: Serializer.cpp:326
const InstanceHandle_t HANDLE_NIL
bool key_fields_only_
Only the key fields of the data sample are present in the payload.
TransportMessageBlockAllocator mb_alloc_
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
TypeSupportImpl * type_support_
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
virtual Extensibility base_extensibility() const =0
Returns the extensibility of just the topic type.
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
bool to_encoding(Encoding &encoding, Extensibility expected_extensibility)
Definition: Serializer.cpp:153

◆ lookup_instance_generic()

template<typename MessageType>
DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::lookup_instance_generic ( const void *  data)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 760 of file DataReaderImpl_T.h.

761  {
762  return lookup_instance(*static_cast<const MessageType*>(data));
763  }
virtual DDS::InstanceHandle_t lookup_instance(const MessageType &instance_data)

◆ marshal()

template<typename MessageType>
CORBA::Boolean OpenDDS::DCPS::DataReaderImpl_T< MessageType >::marshal ( TAO_OutputCDR )
inlinevirtual

Reimplemented from CORBA::Object.

Definition at line 78 of file DataReaderImpl_T.h.

79  {
80  return false;
81  }

◆ notify_status_condition_no_sample_lock()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::notify_status_condition_no_sample_lock ( )
inlineprivate

Release sample_lock_ during status notifications in store_instance_data() as the lock is not needed and could cause deadlock condition. See comments in member function implementation for details.

Definition at line 2159 of file DataReaderImpl_T.h.

2160 {
2161  // This member function avoids a deadlock condition which otherwise
2162  // could occur as follows:
2163  // Thread 1: Call to WaitSet::wait() causes WaitSet::lock_ to lock and
2164  // eventually DataReaderImpl::sample_lock_ to lock in call to
2165  // DataReaderImpl::contains_samples().
2166  // Thread2: Call to DataReaderImpl::data_received()
2167  // causes DataReaderImpl::sample_lock_ to lock and eventually
2168  // during notify of status condition a call to WaitSet::signal()
2169  // causes WaitSet::lock_ to lock.
2170  // Because the DataReaderImpl::sample_lock_ is not needed during
2171  // status notification this member function is used in
2172  // store_instance_data() to release sample_lock_ before making
2173  // the notification.
2176 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)

◆ OPENDDS_MAP() [1/2]

template<typename MessageType>
typedef OpenDDS::DCPS::DataReaderImpl_T< MessageType >::OPENDDS_MAP ( DDS::InstanceHandle_t  ,
typename InstanceMap::iterator   
)

◆ OPENDDS_MAP() [2/2]

template<typename MessageType>
typedef OpenDDS::DCPS::DataReaderImpl_T< MessageType >::OPENDDS_MAP ( DDS::InstanceHandle_t  ,
FilterDelayedSample   
)
private

◆ OPENDDS_MAP_CMP_T()

template<typename MessageType>
typedef OpenDDS::DCPS::DataReaderImpl_T< MessageType >::OPENDDS_MAP_CMP_T ( MessageType  ,
DDS::InstanceHandle_t  ,
typename TraitsType::LessThanType   
)

◆ OPENDDS_MULTIMAP()

template<typename MessageType>
typedef OpenDDS::DCPS::DataReaderImpl_T< MessageType >::OPENDDS_MULTIMAP ( MonotonicTimePoint  ,
DDS::InstanceHandle_t   
)
private

◆ purge_data()

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::purge_data ( OpenDDS::DCPS::SubscriptionInstance_rch  instance)
inlineprotectedvirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1220 of file DataReaderImpl_T.h.

1221  {
1222  drop_sample(instance->instance_handle_);
1223 
1224 
1225  instance->instance_state_->cancel_release();
1226 
1227  while (instance->rcvd_samples_.size() > 0)
1228  {
1230  instance->rcvd_samples_.remove_head();
1231  head->dec_ref();
1232  }
1233  }
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
void cancel_release()
Cancel a scheduled or pending release of resources.
const InstanceState_rch instance_state_
Instance state for this instance.
const DDS::InstanceHandle_t instance_handle_
The instance handle for the registered object.
void drop_sample(DDS::InstanceHandle_t handle)

◆ qos_change()

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::qos_change ( const DDS::DataReaderQos qos)
inlinevirtual

Reimplemented from OpenDDS::DCPS::DataReaderImpl.

Definition at line 1002 of file DataReaderImpl_T.h.

1003  {
1004  // reliability is not changeable, just time_based_filter
1009  if (qos.time_based_filter.minimum_separation != zero) {
1012  FilterDelayedSampleQueue queue;
1013 
1015  for (typename FilterDelayedSampleMap::iterator pos = filter_delayed_sample_map_.begin(), limit = filter_delayed_sample_map_.end(); pos != limit; ++pos) {
1016  FilterDelayedSample& sample = pos->second;
1017  sample.expiration_time = now + (interval - (sample.expiration_time - now));
1018  queue.insert(std::make_pair(sample.expiration_time, pos->first));
1019  }
1021 
1022  if (!filter_delayed_sample_queue_.empty()) {
1023  filter_delayed_sample_task_->cancel();
1024  filter_delayed_sample_task_->schedule(interval);
1025  }
1026 
1027  } else {
1028  filter_delayed_sample_task_->cancel();
1032  }
1033  }
1034  // else no existing timers to change/cancel
1035  }
1036  // else no qos change so nothing to change
1037  }
1038 
1040  }
void swap(MessageBlock &lhs, MessageBlock &rhs)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
FilterDelayedSampleMap filter_delayed_sample_map_
TimeBasedFilterQosPolicy time_based_filter
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
ReliabilityQosPolicyKind kind
const unsigned long DURATION_ZERO_NSEC
Definition: DdsDcpsCore.idl:76
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
ReliabilityQosPolicy reliability
const long DURATION_ZERO_SEC
Definition: DdsDcpsCore.idl:75
RcHandle< DRISporadicTask > filter_delayed_sample_task_
FilterDelayedSampleQueue filter_delayed_sample_queue_
virtual void qos_change(const DDS::DataReaderQos &qos)

◆ read()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Definition at line 161 of file DataReaderImpl_T.h.

168  {
169  DDS::ReturnCode_t const precond =
170  check_inputs("read", received_data, info_seq, max_samples);
171  if (DDS::RETCODE_OK != precond)
172  {
173  return precond;
174  }
175 
177  guard,
178  sample_lock_,
180 
181  return read_i(received_data, info_seq, max_samples, sample_states,
183  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
DDS::ReturnCode_t read_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK

◆ read_generic() [1/3]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 21 of file DynamicDataReaderImpl.cpp.

26  {
28  }
const ReturnCode_t RETCODE_UNSUPPORTED

◆ read_generic() [2/3]

template<>
OpenDDS_Dcps_Export DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< XTypes::DynamicSample >::read_generic ( GenericBundle gen,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
bool  adjust_ref_count 
)
virtual

◆ read_generic() [3/3]

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_generic ( GenericBundle gen,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
bool  adjust_ref_count = false 
)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 736 of file DataReaderImpl_T.h.

741  {
742  MessageSequenceType data;
744  {
746  rc = read_i(data, gen.info_, DDS::LENGTH_UNLIMITED,
748  if (adjust_ref_count) {
749  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(data);
750  received_data_p.increment_references();
751  }
752  }
753  gen.samples_.reserve(data.length());
754  for (CORBA::ULong i = 0; i < data.length(); ++i) {
755  gen.samples_.push_back(&data[i]);
756  }
757  return rc;
758  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
ACE_CDR::ULong ULong
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
DDS::ReturnCode_t read_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const ReturnCode_t RETCODE_ERROR
TraitsType::MessageSequenceType MessageSequenceType
const long LENGTH_UNLIMITED

◆ read_i()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
)
inlineprivate

Definition at line 1328 of file DataReaderImpl_T.h.

1339 {
1340 
1341  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1342 
1343 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1346  }
1347 
1348  const bool group_coherent_ordered =
1352 
1353  if (group_coherent_ordered && coherent_) {
1354  max_samples = 1;
1355  }
1356 #endif
1357 
1358  RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1359 #ifndef OPENDDS_NO_QUERY_CONDITION
1360  a_condition,
1361 #endif
1363 
1365 
1366 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1367  if (!group_coherent_ordered) {
1368 #endif
1370  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
1371  ++next; // pre-increment iterator, in case updates cause changes to match set
1372  const DDS::InstanceHandle_t handle = *it;
1373  const SubscriptionInstance_rch inst = get_handle_instance(handle);
1374  if (!inst) continue;
1375 
1376  size_t i(0);
1377  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1378  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1379  results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1380 
1381  const ValueDispatcher* vd = get_value_dispatcher();
1382  if (observer && item->registered_data_ && vd) {
1383  Observer::Sample s(handle, inst->instance_state_->instance_state(), *item, *vd);
1384  observer->on_sample_read(this, s);
1385  }
1386  }
1387  }
1388 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1389  } else {
1391  results.insert_sample(item.rde_, item.rdel_, item.si_, item.index_in_instance_);
1392  const ValueDispatcher* vd = get_value_dispatcher();
1393  if (observer && item.rde_->registered_data_ && vd) {
1394  typename InstanceMap::iterator i = instance_map_.begin();
1395  const DDS::InstanceHandle_t handle = (i != instance_map_.end()) ? i->second : DDS::HANDLE_NIL;
1396  Observer::Sample s(handle, item.si_->instance_state_->instance_state(), *item.rde_, *vd);
1397  observer->on_sample_read(this, s);
1398  }
1399  }
1400 #endif
1401 
1402  results.copy_to_user();
1403 
1405  if (received_data.length()) {
1406  ret = DDS::RETCODE_OK;
1407  if (received_data.maximum() == 0) { // using ZeroCopy
1408  received_data_p.set_loaner(this);
1409  }
1410  }
1411 
1413  return ret;
1414 }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
const ValueDispatcher * get_value_dispatcher() const
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
const InstanceHandle_t HANDLE_NIL
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
PresentationQosPolicyAccessScopeKind access_scope
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
SubscriptionInstance_rch si_
Definition: RakeData.h:27
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
bool coherent_
Is accessing to Group coherent changes ?
ReceivedDataElementList * rdel_
Definition: RakeData.h:26
virtual void on_sample_read(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:93
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
DDS::InstanceStateKind instance_state() const
Access instance state.
GroupRakeData group_coherent_ordered_data_
Ordered group samples.
const ReturnCode_t RETCODE_NO_DATA
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
const ReturnCode_t RETCODE_OK
ReceivedDataElement * rde_
Definition: RakeData.h:25
const InstanceState_rch instance_state_
Instance state for this instance.
PresentationQosPolicy presentation

◆ read_instance()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Definition at line 385 of file DataReaderImpl_T.h.

393  {
394  DDS::ReturnCode_t const precond =
395  check_inputs("read_instance", received_data, info_seq, max_samples);
396  if (DDS::RETCODE_OK != precond)
397  {
398  return precond;
399  }
400 
402  guard,
403  sample_lock_,
405  return read_instance_i(received_data, info_seq, max_samples, a_handle,
407  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t read_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)

◆ read_instance_generic()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_generic ( void *&  data,
DDS::SampleInfo info,
DDS::InstanceHandle_t  instance,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 788 of file DataReaderImpl_T.h.

792  {
793  MessageSequenceType dataseq;
794  DDS::SampleInfoSeq infoseq;
795  const DDS::ReturnCode_t rc = read_instance_i(dataseq, infoseq,
797  instance_states, 0);
798  if (rc != DDS::RETCODE_NO_DATA)
799  {
800  const CORBA::ULong last = dataseq.length() - 1;
801  data = new MessageType(dataseq[last]);
802  info = infoseq[last];
803  }
804  return rc;
805  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
sequence< SampleInfo > SampleInfoSeq
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
ACE_CDR::ULong ULong
const ReturnCode_t RETCODE_NO_DATA
TraitsType::MessageSequenceType MessageSequenceType
const long LENGTH_UNLIMITED
DDS::ReturnCode_t read_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)

◆ read_instance_i()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
)
inlineprivate

Definition at line 1496 of file DataReaderImpl_T.h.

1508 {
1509  const SubscriptionInstance_rch inst = get_handle_instance(a_handle);
1510  if (!inst) return DDS::RETCODE_BAD_PARAMETER;
1511 
1512  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1513 
1514  RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1515 #ifndef OPENDDS_NO_QUERY_CONDITION
1516  a_condition,
1517 #endif
1519 
1520  const InstanceState_rch state_obj = inst->instance_state_;
1521  if (state_obj->match(view_states, instance_states)) {
1523  size_t i(0);
1524  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1525  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1526  results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1527  const ValueDispatcher* vd = get_value_dispatcher();
1528  if (observer && item->registered_data_ && vd) {
1529  Observer::Sample s(a_handle, inst->instance_state_->instance_state(), *item, *vd);
1530  observer->on_sample_read(this, s);
1531  }
1532  }
1533  } else if (DCPS_debug_level >= 8) {
1534  OPENDDS_STRING msg;
1535  if ((state_obj->view_state() & view_states) == 0) {
1536  msg = "view state is not valid";
1537  }
1538  if ((state_obj->instance_state() & instance_states) == 0) {
1539  if (!msg.empty()) msg += " and ";
1540  msg += "instance state is ";
1541  msg += state_obj->instance_state_string();
1542  msg += " while the validity mask is " + InstanceState::instance_state_mask_string(instance_states);
1543  }
1544  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataReaderImpl_T::read_instance_i: ")
1545  ACE_TEXT("will return no data reading sub %C because:\n %C\n"),
1546  LogGuid(get_guid()).c_str(), msg.c_str()));
1547  }
1548 
1549  results.copy_to_user();
1550 
1552  if (received_data.length()) {
1553  ret = DDS::RETCODE_OK;
1554  if (received_data.maximum() == 0) { // using ZeroCopy
1555  received_data_p.set_loaner(this);
1556  }
1557  }
1558 
1560  return ret;
1561 }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
#define ACE_DEBUG(X)
const ValueDispatcher * get_value_dispatcher() const
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
const char * c_str() const
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
static OPENDDS_STRING instance_state_mask_string(DDS::InstanceStateMask mask)
Return string representation of the instance state mask passed.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define OPENDDS_STRING
const char * instance_state_string() const
Return string of the name of the current instance state.
virtual void on_sample_read(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:93
ACE_TEXT("TCP_Factory")
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
DDS::InstanceStateKind instance_state() const
Access instance state.
const ReturnCode_t RETCODE_NO_DATA
bool match(DDS::ViewStateMask view, DDS::InstanceStateMask inst) const
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const ReturnCode_t RETCODE_OK
DDS::ViewStateKind view_state() const
Access view state.
const InstanceState_rch instance_state_
Instance state for this instance.
PresentationQosPolicy presentation
const ReturnCode_t RETCODE_BAD_PARAMETER

◆ read_instance_w_condition()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_instance_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::ReadCondition_ptr  a_condition 
)
inlinevirtual

Definition at line 433 of file DataReaderImpl_T.h.

439  {
440  DDS::ReturnCode_t const precond =
441  check_inputs("read_instance_w_condition", received_data, info_seq,
442  max_samples);
443  if (DDS::RETCODE_OK != precond)
444  {
445  return precond;
446  }
447 
450 
451  if (!has_readcondition(a_condition))
452  {
454  }
455 
456 #ifndef OPENDDS_NO_QUERY_CONDITION
457  DDS::QueryCondition_ptr query_condition =
458  dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
459 #endif
460 
461  return read_instance_i(received_data, info_seq, max_samples, a_handle,
462  a_condition->get_sample_state_mask(),
463  a_condition->get_view_state_mask(),
464  a_condition->get_instance_state_mask(),
465 #ifndef OPENDDS_NO_QUERY_CONDITION
466  query_condition
467 #else
468  0
469 #endif
470  );
471  }
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
bool has_readcondition(DDS::ReadCondition_ptr a_condition)
DDS::ReturnCode_t read_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)

◆ read_next_instance()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Definition at line 513 of file DataReaderImpl_T.h.

521  {
522  DDS::ReturnCode_t const precond =
523  check_inputs("read_next_instance", received_data, info_seq, max_samples);
524  if (DDS::RETCODE_OK != precond)
525  {
526  return precond;
527  }
528 
529  return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
531  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t read_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const ReturnCode_t RETCODE_OK

◆ read_next_instance_generic()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_generic ( void *&  data,
DDS::SampleInfo info,
DDS::InstanceHandle_t  previous_instance,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 807 of file DataReaderImpl_T.h.

811  {
812  MessageSequenceType dataseq;
813  DDS::SampleInfoSeq infoseq;
814  const DDS::ReturnCode_t rc = read_next_instance_i(dataseq, infoseq,
815  DDS::LENGTH_UNLIMITED, previous_instance, sample_states, view_states,
816  instance_states, 0);
817  if (rc != DDS::RETCODE_NO_DATA)
818  {
819  const CORBA::ULong last = dataseq.length() - 1;
820  data = new MessageType(dataseq[last]);
821  info = infoseq[last];
822  }
823  return rc;
824  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
sequence< SampleInfo > SampleInfoSeq
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
ACE_CDR::ULong ULong
const ReturnCode_t RETCODE_NO_DATA
DDS::ReturnCode_t read_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
TraitsType::MessageSequenceType MessageSequenceType
const long LENGTH_UNLIMITED

◆ read_next_instance_i()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
)
inlineprivate

Definition at line 1616 of file DataReaderImpl_T.h.

1628 {
1630 
1631  typename InstanceMap::iterator it = instance_map_.begin();
1632  const typename InstanceMap::iterator the_end = instance_map_.end();
1633  if (a_handle != DDS::HANDLE_NIL) {
1634  const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(a_handle);
1635  if (pos != reverse_instance_map_.end()) {
1636  it = pos->second;
1637  ++it;
1638  } else {
1639  it = the_end;
1640  }
1641  }
1642 
1644  for (; it != the_end; ++it) {
1645  handle = it->second;
1646  const DDS::ReturnCode_t status =
1647  read_instance_i(received_data, info_seq, max_samples, handle,
1649 #ifndef OPENDDS_NO_QUERY_CONDITION
1650  a_condition);
1651 #else
1652  0);
1653 #endif
1654  if (status != DDS::RETCODE_NO_DATA) {
1656  return status;
1657  }
1658  }
1659 
1661  return DDS::RETCODE_NO_DATA;
1662 }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
const InstanceHandle_t HANDLE_NIL
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
ReverseInstanceMap reverse_instance_map_
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_NO_DATA
const ReturnCode_t RETCODE_ERROR
DDS::ReturnCode_t read_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)

◆ read_next_instance_w_condition()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_instance_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::ReadCondition_ptr  a_condition 
)
inlinevirtual

Definition at line 553 of file DataReaderImpl_T.h.

559  {
560  DDS::ReturnCode_t const precond =
561  check_inputs("read_next_instance_w_condition", received_data, info_seq,
562  max_samples);
563  if (DDS::RETCODE_OK != precond)
564  {
565  return precond;
566  }
567 
570 
571  if (!has_readcondition(a_condition))
572  {
574  }
575 
576 #ifndef OPENDDS_NO_QUERY_CONDITION
577  DDS::QueryCondition_ptr query_condition =
578  dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
579 #endif
580 
581  return read_next_instance_i(received_data, info_seq, max_samples, a_handle,
582  a_condition->get_sample_state_mask(),
583  a_condition->get_view_state_mask(),
584  a_condition->get_instance_state_mask(),
585 #ifndef OPENDDS_NO_QUERY_CONDITION
586  query_condition
587 #else
588  0
589 #endif
590  );
591  }
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t read_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
bool has_readcondition(DDS::ReadCondition_ptr a_condition)

◆ read_next_sample()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_next_sample ( MessageType &  received_data,
DDS::SampleInfo sample_info_ref 
)
inlinevirtual

Definition at line 274 of file DataReaderImpl_T.h.

276  {
277  bool found_data = false;
279 
281 
283  const HandleSet& matches = lookup_matching_instances(sample_states, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
284  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
285  ++next; // pre-increment iterator, in case updates cause changes to match set
286  const DDS::InstanceHandle_t handle = *it;
287  const SubscriptionInstance_rch inst = get_handle_instance(handle);
288  if (!inst) continue;
289 
290  bool most_recent_generation = false;
291  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
292  !found_data && item; item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
293  if (item->registered_data_) {
294  received_data = *static_cast<MessageType*>(item->registered_data_);
295  }
296  inst->instance_state_->sample_info(sample_info_ref, item);
297  inst->rcvd_samples_.mark_read(item);
298 
300  if (observer && item->registered_data_ && vd) {
301  Observer::Sample s(sample_info_ref.instance_handle, sample_info_ref.instance_state, *item, *vd);
302  observer->on_sample_read(this, s);
303  }
304 
305  if (!most_recent_generation) {
306  most_recent_generation = inst->instance_state_->most_recent_generation(item);
307  }
308 
309  found_data = true;
310  }
311 
312  if (found_data) {
313  if (most_recent_generation) {
314  inst->instance_state_->accessed();
315  }
316  // Get the sample_ranks, generation_ranks, and
317  // absolute_generation_ranks for this info_seq
318  sample_info(sample_info_ref, inst->rcvd_samples_.peek_tail());
319 
320  break;
321  }
322  }
323 
325  return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
326  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
const ValueDispatcher * get_value_dispatcher() const
InstanceStateKind instance_state
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
void accessed()
A read or take operation has been performed on this instance.
void sample_info(DDS::SampleInfo &si, const ReceivedDataElement *de)
Populate the SampleInfo structure.
ACE_CDR::ULong ULong
InstanceHandle_t instance_handle
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const InstanceStateMask ANY_INSTANCE_STATE
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ViewStateMask ANY_VIEW_STATE
bool most_recent_generation(ReceivedDataElement *item) const
virtual void on_sample_read(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:93
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
const ReturnCode_t RETCODE_NO_DATA
const ReturnCode_t RETCODE_ERROR
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
const SampleStateKind NOT_READ_SAMPLE_STATE
const ReturnCode_t RETCODE_OK
void sample_info(DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
const InstanceState_rch instance_state_
Instance state for this instance.

◆ read_w_condition()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::read_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq sample_info,
::CORBA::Long  max_samples,
DDS::ReadCondition_ptr  a_condition 
)
inlinevirtual

Definition at line 209 of file DataReaderImpl_T.h.

214  {
215  DDS::ReturnCode_t const precond =
216  check_inputs("read_w_condition", received_data, sample_info, max_samples);
217  if (DDS::RETCODE_OK != precond)
218  {
219  return precond;
220  }
221 
224 
225  if (!has_readcondition(a_condition))
226  {
228  }
229 
230  return read_i(received_data, sample_info, max_samples,
231  a_condition->get_sample_state_mask(),
232  a_condition->get_view_state_mask(),
233  a_condition->get_instance_state_mask(),
234 #ifndef OPENDDS_NO_QUERY_CONDITION
235  dynamic_cast< DDS::QueryCondition_ptr >(a_condition));
236 #else
237  0);
238 #endif
239  }
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
DDS::ReturnCode_t read_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
bool has_readcondition(DDS::ReadCondition_ptr a_condition)
void sample_info(DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)

◆ release_all_instances()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_all_instances ( )
inlinevirtual

Release all instances held by the reader.

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1052 of file DataReaderImpl_T.h.

1053  {
1055 
1056  const typename InstanceMap::iterator end = instance_map_.end();
1057  typename InstanceMap::iterator it = instance_map_.begin();
1058  while (it != end) {
1059  const DDS::InstanceHandle_t handle = it->second;
1060  ++it; // it will be invalid, so iterate now.
1061  release_instance(handle);
1062  }
1063  }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void release_instance(DDS::InstanceHandle_t handle)
Release the instance with the handle.
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51

◆ release_instance_i()

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_instance_i ( DDS::InstanceHandle_t  handle)
inlineprotectedvirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1235 of file DataReaderImpl_T.h.

1236  {
1237  const typename ReverseInstanceMap::iterator pos = reverse_instance_map_.find(handle);
1238  if (pos != reverse_instance_map_.end()) {
1239  remove_from_lookup_maps(handle);
1240  instance_map_.erase(pos->second);
1241  reverse_instance_map_.erase(pos);
1242  }
1243  }
void remove_from_lookup_maps(DDS::InstanceHandle_t handle)
ReverseInstanceMap reverse_instance_map_

◆ release_loan()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::release_loan ( MessageSequenceType received_data)
inline

Definition at line 695 of file DataReaderImpl_T.h.

696  {
697  received_data.length(0);
698  }

◆ return_loan()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::return_loan ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq 
)
inlinevirtual

Definition at line 633 of file DataReaderImpl_T.h.

636  {
637  // Some incomplete tests to see that the data and info are from the
638  // same read.
639  if (received_data.length() != info_seq.length())
640  {
642  }
643 
644  if (received_data.release())
645  {
646  // nothing to do because this is not zero-copy data
647  return DDS::RETCODE_OK;
648  }
649  else
650  {
651  info_seq.length(0);
652  received_data.length(0);
653  }
654  return DDS::RETCODE_OK;
655  }
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_OK

◆ set_instance_state_i()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::set_instance_state_i ( DDS::InstanceHandle_t  instance,
DDS::InstanceHandle_t  publication_handle,
DDS::InstanceStateKind  state,
const SystemTimePoint timestamp,
const GUID_t publication_id 
)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 887 of file DataReaderImpl_T.h.

892  {
893  // sample_lock_ must be held.
894  using namespace OpenDDS::DCPS;
895 
897  if (si && state != DDS::ALIVE_INSTANCE_STATE) {
898  const DDS::Time_t now = timestamp.to_dds_time();
900  header.publication_id_ = publication_id;
901  header.source_timestamp_sec_ = now.sec;
902  header.source_timestamp_nanosec_ = now.nanosec;
903  const int msg = (state == DDS::NOT_ALIVE_DISPOSED_INSTANCE_STATE)
905  header.message_id_ = static_cast<char>(msg);
906  bool just_registered, filtered;
907  unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator);
908  get_key_value(*data, instance);
909  store_instance_data(move(data), publication_handle, header, si, just_registered, filtered);
910  if (!filtered) {
912  }
913  }
914  }
DDS::Time_t to_dds_time() const
Definition: TimePoint_T.inl:89
char message_id_
The enum MessageId.
YARD is all original work While it may rely on standard YARD does not include code from other sources We have chosen to release our work as public domain code This means that YARD has been released outside the copyright system Feel free to use the code in any way you wish especially in an academic plagiarism has very little to do with copyright In an academic or in any situation where you are expected to give credit to other people s you will need to cite YARD as a source The author is Christopher and the appropriate date is December the release date for we can t make any promises regarding whether YARD will do what you or whether we will make any changes you ask for You are free to hire your own expert for that If you choose to distribute YARD you ll probably want to read up on the laws covering warranties in your state
Definition: COPYING.txt:14
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
virtual DDS::ReturnCode_t get_key_value(MessageType &key_holder, DDS::InstanceHandle_t handle)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
void notify_read_conditions()
Data has arrived into the cache, unblock waiting ReadConditions.
unsigned long nanosec
const InstanceStateKind NOT_ALIVE_DISPOSED_INSTANCE_STATE
void store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr, bool &just_registered, bool &filtered)
unique_ptr< DataAllocator > & data_allocator()
const InstanceStateKind ALIVE_INSTANCE_STATE

◆ set_marshal_skip_serialize()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::set_marshal_skip_serialize ( bool  value)
inline

Definition at line 1042 of file DataReaderImpl_T.h.

1043  {
1045  }
const LogLevel::Value value
Definition: debug.cpp:61

◆ state_updated_i()

template<typename MessageType>
virtual void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::state_updated_i ( DDS::InstanceHandle_t  handle)
inlineprotectedvirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 1245 of file DataReaderImpl_T.h.

1246  {
1247  const typename SubscriptionInstanceMapType::iterator pos = instances_.find(handle);
1248  if (pos != instances_.end()) {
1249  update_lookup_maps(pos);
1250  }
1251  }
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
void update_lookup_maps(const SubscriptionInstanceMapType::iterator &input)

◆ store_instance_data()

template<typename MessageType>
void OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_instance_data ( unique_ptr< MessageTypeWithAllocator instance_data,
DDS::InstanceHandle_t  publication_handle,
const OpenDDS::DCPS::DataSampleHeader header,
OpenDDS::DCPS::SubscriptionInstance_rch instance_ptr,
bool &  just_registered,
bool &  filtered 
)
inlineprivate

!! caller should already have the sample_lock_

Definition at line 1713 of file DataReaderImpl_T.h.

1719 {
1720  ACE_UNUSED_ARG(publication_handle);
1721 
1722  const bool is_dispose_msg =
1725  const bool is_unregister_msg =
1728 
1729  if (!store_instance_data_check(instance_data, publication_handle, header, instance_ptr)) {
1730  return;
1731  }
1732 
1733  // not filtering any data, except what is specifically identified as filtered below
1734  filtered = false;
1735 
1737 
1738  //!!! caller should already have the sample_lock_
1739  //We will unlock it before calling into listeners
1740 
1741  typename InstanceMap::const_iterator const it = instance_map_.find(*instance_data);
1742 
1743  if (it == instance_map_.end()) {
1744  if (is_dispose_msg || is_unregister_msg) {
1745  return;
1746  }
1747 
1748  std::size_t instances_size = 0;
1749  {
1751  instances_size = instances_.size();
1752  }
1754  ((::CORBA::Long) instances_size >= qos_.resource_limits.max_instances))
1755  {
1756  DDS::DataReaderListener_var listener
1758 
1760 
1765 
1766  if (!CORBA::is_nil(listener.in()))
1767  {
1769 
1770  listener->on_sample_rejected(this, sample_rejected_status_);
1772  } // do we want to do something if listener is nil???
1774 
1775  return;
1776  }
1777 
1778  {
1780 
1781 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1782  SharedInstanceMap_rch inst;
1783  OwnershipManagerScopedAccess ownership_scoped_access;
1784  OwnershipManagerPtr owner_manager = ownership_manager();
1785 
1786  bool new_handle = true;
1788  OwnershipManagerScopedAccess temp(owner_manager);
1789  temp.swap(ownership_scoped_access);
1790  if (!owner_manager || ownership_scoped_access.lock_result_ != 0) {
1791  if (DCPS_debug_level > 0) {
1792  ACE_ERROR ((LM_ERROR,
1793  ACE_TEXT("(%P|%t) ")
1794  ACE_TEXT("%CDataReaderImpl::")
1795  ACE_TEXT("store_instance_data, ")
1796  ACE_TEXT("acquire instance_lock failed.\n"), TraitsType::type_name()));
1797  }
1798  return;
1799  }
1800 
1801  inst = dynamic_rchandle_cast<SharedInstanceMap>(
1802  owner_manager->get_instance_map(topic_servant_->type_name(), this));
1803  if (inst != 0) {
1804  typename InstanceMap::const_iterator const iter = inst->find(*instance_data);
1805  if (iter != inst->end ()) {
1806  handle = iter->second;
1807  new_handle = false;
1808  }
1809  }
1810  }
1811 #endif
1812 
1813  just_registered = true;
1814  DDS::BuiltinTopicKey_t key = OpenDDS::DCPS::keyFromSample(static_cast<MessageType*>(instance_data.get()));
1815  bool owns_handle = false;
1816  if (handle == DDS::HANDLE_NIL) {
1817  handle = get_next_handle(key);
1818  owns_handle = true;
1819  }
1821  OpenDDS::DCPS::make_rch<OpenDDS::DCPS::SubscriptionInstance>(
1822  rchandle_from(this),
1823  qos_,
1825  handle, owns_handle);
1826 
1827  const std::pair<typename SubscriptionInstanceMapType::iterator, bool> bpair =
1828  instances_.insert(typename SubscriptionInstanceMapType::value_type(handle, instance));
1829 
1830  if (bpair.second == false) {
1831  if (DCPS_debug_level > 0) {
1832  ACE_ERROR((LM_ERROR,
1833  ACE_TEXT("(%P|%t) ")
1834  ACE_TEXT("%CDataReaderImpl::")
1835  ACE_TEXT("store_instance_data, ")
1836  ACE_TEXT("insert handle failed.\n"), TraitsType::type_name()));
1837  }
1838  return;
1839  }
1840  update_lookup_maps(bpair.first);
1841 
1842 #ifndef OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE
1843  if (owner_manager) {
1844  if (!inst) {
1845  inst = make_rch<SharedInstanceMap>();
1846  owner_manager->set_instance_map(
1847  topic_servant_->type_name(),
1848  inst,
1849  this);
1850  }
1851 
1852  if (new_handle) {
1853  const std::pair<typename InstanceMap::iterator, bool> bpair =
1854  inst->insert(typename InstanceMap::value_type(*instance_data, handle));
1855  if (!bpair.second) {
1856  if (DCPS_debug_level > 0) {
1857  ACE_ERROR ((LM_ERROR,
1858  ACE_TEXT("(%P|%t) ")
1859  ACE_TEXT("%CDataReaderImpl::")
1860  ACE_TEXT("store_instance_data, ")
1861  ACE_TEXT("insert to participant scope %C failed.\n"), TraitsType::type_name(), TraitsType::type_name()));
1862  }
1863  return;
1864  }
1865  }
1866 
1867  OwnershipManagerScopedAccess temp;
1868  temp.swap(ownership_scoped_access);
1869  if (temp.release() != 0) {
1870  if (DCPS_debug_level > 0) {
1871  ACE_ERROR ((LM_ERROR,
1872  ACE_TEXT("(%P|%t) ")
1873  ACE_TEXT("%CDataReaderImpl::")
1874  ACE_TEXT("store_instance_data, ")
1875  ACE_TEXT("release instance_lock failed.\n"), TraitsType::type_name()));
1876  }
1877  return;
1878  }
1879  }
1880 #endif
1881  } // scope for instances_lock_
1882 
1883  std::pair<typename InstanceMap::iterator, bool> bpair =
1884  instance_map_.insert(typename InstanceMap::value_type(*instance_data,
1885  handle));
1886  if (bpair.second == false)
1887  {
1888  if (DCPS_debug_level > 0) {
1889  ACE_ERROR ((LM_ERROR,
1890  ACE_TEXT("(%P|%t) ")
1891  ACE_TEXT("%CDataReaderImpl::")
1892  ACE_TEXT("store_instance_data, ")
1893  ACE_TEXT("insert %C failed.\n"), TraitsType::type_name(), TraitsType::type_name()));
1894  }
1895  return;
1896  }
1897  reverse_instance_map_[handle] = bpair.first;
1898  }
1899  else
1900  {
1901  just_registered = false;
1902  handle = it->second;
1903  }
1904 
1906  {
1907  instance_ptr = get_handle_instance(handle);
1908  OPENDDS_ASSERT(instance_ptr);
1909 
1911  {
1912  {
1914  filtered = ownership_filter_instance(instance_ptr, header.publication_id_);
1915  }
1916 
1917  MonotonicTimePoint now;
1918  MonotonicTimePoint deadline;
1919  if (!filtered && time_based_filter_instance(instance_ptr, now, deadline)) {
1920  filtered = true;
1921  if (qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS) {
1922  delay_sample(handle, move(instance_data), header, just_registered, now, deadline);
1923  }
1924  } else {
1925  // nothing time based filtered now
1926  clear_sample(handle);
1927 
1928  }
1929 
1930  if (filtered) {
1931  return;
1932  }
1933  }
1934 
1935  finish_store_instance_data(move(instance_data), header, instance_ptr, is_dispose_msg, is_unregister_msg);
1936  }
1937  else
1938  {
1939  instance_ptr = get_handle_instance(handle);
1940  OPENDDS_ASSERT(instance_ptr);
1941  instance_ptr->instance_state_->lively(header.publication_id_);
1942  }
1943 }
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
ACE_CDR::Long Long
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
const InstanceHandle_t HANDLE_NIL
char message_id_
The enum MessageId.
void lively(const GUID_t &writer_id)
LIVELINESS message received for this DataWriter.
RcHandle< SharedInstanceMap > SharedInstanceMap_rch
sequence< octet > key
bool time_based_filter_instance(const SubscriptionInstance_rch &instance, MonotonicTimePoint &now, MonotonicTimePoint &deadline)
ReverseInstanceMap reverse_instance_map_
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
SubscriptionInstanceMapType instances_
: document why the instances_ container is mutable.
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
void finish_store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, const DataSampleHeader &header, SubscriptionInstance_rch instance_ptr, bool is_dispose_msg, bool is_unregister_msg)
DDS::DataReaderListener_ptr listener_for(DDS::StatusKind kind)
SampleRejectedStatusKind last_reason
bool ownership_filter_instance(const SubscriptionInstance_rch &instance, const GUID_t &pubid)
bool store_instance_data_check(unique_ptr< MessageTypeWithAllocator > &instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr)
DDS::InstanceHandle_t get_next_handle(const DDS::BuiltinTopicKey_t &key)
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
void delay_sample(DDS::InstanceHandle_t handle, unique_ptr< MessageTypeWithAllocator > data, const OpenDDS::DCPS::DataSampleHeader &header, const bool just_registered, const MonotonicTimePoint &now, const MonotonicTimePoint &deadline)
DDS::BuiltinTopicKey_t keyFromSample(TopicType *sample)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DDS::SampleRejectedStatus sample_rejected_status_
void clear_sample(DDS::InstanceHandle_t handle)
void update_lookup_maps(const SubscriptionInstanceMapType::iterator &input)
const long LENGTH_UNLIMITED
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
OwnershipManagerPtr ownership_manager()
ACE_Recursive_Thread_Mutex instances_lock_
const InstanceState_rch instance_state_
Instance state for this instance.
TopicDescriptionPtr< TopicImpl > topic_servant_
ResourceLimitsQosPolicy resource_limits
Boolean is_nil(T x)
const StatusKind SAMPLE_REJECTED_STATUS

◆ store_instance_data_check()

template<typename MessageType>
bool OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_instance_data_check ( unique_ptr< MessageTypeWithAllocator > &  instance_data,
DDS::InstanceHandle_t  publication_handle,
const OpenDDS::DCPS::DataSampleHeader header,
OpenDDS::DCPS::SubscriptionInstance_rch instance_ptr 
)
inlineprivate

Definition at line 1259 of file DataReaderImpl_T.h.

1263  {
1264 #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
1265  const bool is_dispose_msg =
1268 
1269  if (!is_bit() && security_config_) {
1270  if (header.message_id_ == SAMPLE_DATA ||
1271  header.message_id_ == INSTANCE_REGISTRATION) {
1272 
1273  // Pubulisher has already gone through the check.
1274  if (instance_ptr &&
1275  instance_ptr->instance_state_ &&
1276  instance_ptr->instance_state_->writes_instance(header.publication_id_)) {
1277  return true;
1278  }
1279 
1281  const GUID_t local_participant = make_part_guid(get_guid());
1282  const GUID_t remote_participant = make_part_guid(header.publication_id_);
1283  const DDS::Security::ParticipantCryptoHandle remote_participant_permissions_handle = security_config_->get_handle_registry(local_participant)->get_remote_participant_permissions_handle(remote_participant);
1284  // Construct a DynamicData around the deserialized sample.
1285  DDS::DynamicData_var dda =
1287  // The remote participant might not be using security.
1288  if (remote_participant_permissions_handle != DDS::HANDLE_NIL &&
1289  !security_config_->get_access_control()->check_remote_datawriter_register_instance(remote_participant_permissions_handle, this, publication_handle, dda, ex)) {
1290  if (log_level >= LogLevel::Warning) {
1291  ACE_ERROR((LM_WARNING,
1292  "(%P|%t) WARNING: DataReaderImpl_T::store_instance_data_check: unable to register instance SecurityException[%d.%d]: %C\n",
1293  ex.code, ex.minor_code, ex.message.in()));
1294  }
1295  return false;
1296  }
1297  } else if (is_dispose_msg) {
1298 
1300  const GUID_t local_participant = make_part_guid(get_guid());
1301  const GUID_t remote_participant = make_part_guid(header.publication_id_);
1302  const DDS::Security::ParticipantCryptoHandle remote_participant_permissions_handle = security_config_->get_handle_registry(local_participant)->get_remote_participant_permissions_handle(remote_participant);
1303  // Construct a DynamicData around the deserialized sample.
1304  DDS::DynamicData_var dda =
1306  // The remote participant might not be using security.
1307  if (remote_participant_permissions_handle != DDS::HANDLE_NIL &&
1308  !security_config_->get_access_control()->check_remote_datawriter_dispose_instance(remote_participant_permissions_handle, this, publication_handle, dda, ex)) {
1309  if (log_level >= LogLevel::Warning) {
1310  ACE_ERROR((LM_WARNING,
1311  "(%P|%t) WARNING: DataReaderImpl_T::store_instance_data_check: unable to dispose instance SecurityException[%d.%d]: %C\n",
1312  ex.code, ex.minor_code, ex.message.in()));
1313  }
1314  return false;
1315  }
1316  }
1317  }
1318 #else
1319  ACE_UNUSED_ARG(instance_data);
1320  ACE_UNUSED_ARG(publication_handle);
1321  ACE_UNUSED_ARG(header);
1322  ACE_UNUSED_ARG(instance_ptr);
1323 #endif
1324 
1325  return true;
1326  }
#define ACE_ERROR(X)
const InstanceHandle_t HANDLE_NIL
char message_id_
The enum MessageId.
DDS::DynamicType_var dynamic_type_
bool writes_instance(const GUID_t &writer_id) const
Returns true if the writer is a writer of this instance.
OpenDDS_Dcps_Export GUID_t make_part_guid(const GuidPrefix_t &prefix)
Definition: GuidUtils.h:216
OpenDDS_Dcps_Export LogLevel log_level
Security::SecurityConfig_rch security_config_
DDS::DynamicData_ptr get_dynamic_data_adapter(DDS::DynamicType_ptr type, const T &value)
const InstanceState_rch instance_state_
Instance state for this instance.

◆ store_synthetic_data()

template<typename MessageType>
DDS::InstanceHandle_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::store_synthetic_data ( const MessageType &  sample,
DDS::ViewStateKind  view,
const SystemTimePoint timestamp = SystemTimePoint::now() 
)
inline

Definition at line 828 of file DataReaderImpl_T.h.

Referenced by OpenDDS::DCPS::BitSubscriber::add_thread_status(), and OpenDDS::DCPS::StaticEndpointManager::init_bit().

831  {
832  using namespace OpenDDS::DCPS;
835 #ifndef OPENDDS_NO_MULTI_TOPIC
836  DDS::TopicDescription_var descr = get_topicdescription();
837  if (MultiTopicImpl* mt = dynamic_cast<MultiTopicImpl*>(descr.in())) {
838  if (!mt->filter(sample)) {
839  return DDS::HANDLE_NIL;
840  }
841  }
842 #endif
843 
844  get_subscriber_servant()->data_received(this);
845 
846  DDS::InstanceHandle_t inst = lookup_instance(sample);
847  bool filtered = false;
848  SubscriptionInstance_rch instance;
849 
850  const DDS::Time_t now = timestamp.to_dds_time();
852  header.source_timestamp_sec_ = now.sec;
853  header.source_timestamp_nanosec_ = now.nanosec;
854 
855  // Call store_instance_data() once or twice, depending on if we need to
856  // process the INSTANCE_REGISTRATION. In either case, store_instance_data()
857  // owns the memory for the sample and it must come from the correct allocator.
858  for (int i = 0; i < 2; ++i) {
859  if (i == 0 && inst != DDS::HANDLE_NIL) continue;
860 
861  const int msg = i ? SAMPLE_DATA : INSTANCE_REGISTRATION;
862  header.message_id_ = static_cast<char>(msg);
863 
864  bool just_registered;
865  unique_ptr<MessageTypeWithAllocator> data(new (*data_allocator()) MessageTypeWithAllocator(sample));
866  store_instance_data(move(data), DDS::HANDLE_NIL, header, instance, just_registered, filtered);
867  if (instance) inst = instance->instance_handle_;
868  }
869 
870  if (!filtered) {
871  if (view == DDS::NOT_NEW_VIEW_STATE) {
872  if (instance) instance->instance_state_->accessed();
873  }
875  }
876 
879  if (observer && vd) {
880  Observer::Sample s(instance ? instance->instance_handle_ : DDS::HANDLE_NIL, header.instance_state(), now, header.sequence_, &sample, *vd);
881  observer->on_sample_received(this, s);
882  }
883 
884  return inst;
885  }
virtual DDS::InstanceHandle_t lookup_instance(const MessageType &instance_data)
DDS::Time_t to_dds_time() const
Definition: TimePoint_T.inl:89
const ValueDispatcher * get_value_dispatcher() const
const InstanceHandle_t HANDLE_NIL
char message_id_
The enum MessageId.
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
virtual DDS::TopicDescription_ptr get_topicdescription()
virtual void on_sample_received(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:92
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
void accessed()
A read or take operation has been performed on this instance.
DDS::InstanceStateKind instance_state() const
RcHandle< SubscriberImpl > get_subscriber_servant()
const ViewStateKind NOT_NEW_VIEW_STATE
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
void notify_read_conditions()
Data has arrived into the cache, unblock waiting ReadConditions.
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
unsigned long nanosec
void store_instance_data(unique_ptr< MessageTypeWithAllocator > instance_data, DDS::InstanceHandle_t publication_handle, const OpenDDS::DCPS::DataSampleHeader &header, OpenDDS::DCPS::SubscriptionInstance_rch &instance_ptr, bool &just_registered, bool &filtered)
unique_ptr< DataAllocator > & data_allocator()
const InstanceState_rch instance_state_
Instance state for this instance.
const DDS::InstanceHandle_t instance_handle_
The instance handle for the registered object.

◆ take() [1/4]

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 32 of file DynamicDataReaderImpl.cpp.

36  {
38  }
const ReturnCode_t RETCODE_UNSUPPORTED

◆ take() [2/4]

◆ take() [3/4]

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Definition at line 185 of file DataReaderImpl_T.h.

192  {
193  DDS::ReturnCode_t const precond =
194  check_inputs("take", received_data, info_seq, max_samples);
195  if (DDS::RETCODE_OK != precond)
196  {
197  return precond;
198  }
199 
201  guard,
202  sample_lock_,
204 
205  return take_i(received_data, info_seq, max_samples, sample_states,
207  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t take_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)

◆ take() [4/4]

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take ( AbstractSamples samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Implements OpenDDS::DCPS::DataReaderImpl.

Definition at line 765 of file DataReaderImpl_T.h.

768  {
770  guard,
771  sample_lock_,
773 
774  MessageSequenceType data;
775  DDS::SampleInfoSeq infos;
776  const DDS::ReturnCode_t rc = take_i(data, infos, DDS::LENGTH_UNLIMITED,
778 
779  samples.reserve(data.length());
780 
781  for (CORBA::ULong i = 0; i < data.length(); ++i) {
782  samples.push_back(infos[i], &data[i]);
783  }
784 
785  return rc;
786  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
sequence< SampleInfo > SampleInfoSeq
virtual void push_back(const DDS::SampleInfo &info, const void *sample)=0
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
virtual void reserve(CORBA::ULong size)=0
ACE_CDR::ULong ULong
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ReturnCode_t RETCODE_ERROR
TraitsType::MessageSequenceType MessageSequenceType
const long LENGTH_UNLIMITED
DDS::ReturnCode_t take_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)

◆ take_i()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
)
inlineprivate

Definition at line 1416 of file DataReaderImpl_T.h.

1427 {
1428  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1429 
1430 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1433  }
1434 
1435  const bool group_coherent_ordered =
1439 
1440  if (group_coherent_ordered && coherent_) {
1441  max_samples = 1;
1442  }
1443 #endif
1444 
1445  RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1446 #ifndef OPENDDS_NO_QUERY_CONDITION
1447  a_condition,
1448 #endif
1450 
1452 
1453 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1454  if (!group_coherent_ordered) {
1455 #endif
1457  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
1458  ++next; // pre-increment iterator, in case updates cause changes to match set
1459  const DDS::InstanceHandle_t handle = *it;
1460  const SubscriptionInstance_rch inst = get_handle_instance(handle);
1461  if (!inst) continue;
1462 
1463  size_t i(0);
1464  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1465  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1466  results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1467 
1468  const ValueDispatcher* vd = get_value_dispatcher();
1469  if (observer && item->registered_data_ && vd) {
1470  Observer::Sample s(handle, inst->instance_state_->instance_state(), *item, *vd);
1471  observer->on_sample_taken(this, s);
1472  }
1473  }
1474  }
1475 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
1476  } else {
1478  results.insert_sample(item.rde_, item.rdel_, item.si_, item.index_in_instance_);
1479  }
1480 #endif
1481 
1482  results.copy_to_user();
1483 
1485  if (received_data.length()) {
1486  ret = DDS::RETCODE_OK;
1487  if (received_data.maximum() == 0) { // using ZeroCopy
1488  received_data_p.set_loaner(this);
1489  }
1490  }
1491 
1493  return ret;
1494 }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
const ValueDispatcher * get_value_dispatcher() const
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
virtual void on_sample_taken(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:94
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
PresentationQosPolicyAccessScopeKind access_scope
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
SubscriptionInstance_rch si_
Definition: RakeData.h:27
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
bool coherent_
Is accessing to Group coherent changes ?
ReceivedDataElementList * rdel_
Definition: RakeData.h:26
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
DDS::InstanceStateKind instance_state() const
Access instance state.
GroupRakeData group_coherent_ordered_data_
Ordered group samples.
const ReturnCode_t RETCODE_NO_DATA
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
const ReturnCode_t RETCODE_OK
ReceivedDataElement * rde_
Definition: RakeData.h:25
const InstanceState_rch instance_state_
Instance state for this instance.
PresentationQosPolicy presentation

◆ take_instance()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Definition at line 409 of file DataReaderImpl_T.h.

417  {
418  DDS::ReturnCode_t const precond =
419  check_inputs("take_instance", received_data, info_seq, max_samples);
420  if (DDS::RETCODE_OK != precond)
421  {
422  return precond;
423  }
424 
426  guard,
427  sample_lock_,
429  return take_instance_i(received_data, info_seq, max_samples, a_handle,
431  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t take_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK

◆ take_instance_i()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
)
inlineprivate

Definition at line 1563 of file DataReaderImpl_T.h.

1575 {
1576  const SubscriptionInstance_rch inst = get_handle_instance(a_handle);
1577  if (!inst) return DDS::RETCODE_BAD_PARAMETER;
1578 
1579  typename DDSTraits<MessageType>::MessageSequenceAdapterType received_data_p(received_data);
1580 
1581  RakeResults<MessageType> results(this, received_data, info_seq, max_samples, subqos_.presentation,
1582 #ifndef OPENDDS_NO_QUERY_CONDITION
1583  a_condition,
1584 #endif
1586 
1587  const InstanceState_rch state_obj = inst->instance_state_;
1588  if (state_obj->match(view_states, instance_states)) {
1590  size_t i(0);
1591  for (ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0); item;
1592  item = inst->rcvd_samples_.get_next_match(sample_states, item)) {
1593  results.insert_sample(item, &inst->rcvd_samples_, inst, ++i);
1594  const ValueDispatcher* vd = get_value_dispatcher();
1595  if (observer && item->registered_data_ && vd) {
1596  Observer::Sample s(a_handle, inst->instance_state_->instance_state(), *item, *vd);
1597  observer->on_sample_taken(this, s);
1598  }
1599  }
1600  }
1601 
1602  results.copy_to_user();
1603 
1605  if (received_data.length()) {
1606  ret = DDS::RETCODE_OK;
1607  if (received_data.maximum() == 0) { // using ZeroCopy
1608  received_data_p.set_loaner(this);
1609  }
1610  }
1611 
1613  return ret;
1614 }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
const ValueDispatcher * get_value_dispatcher() const
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
virtual void on_sample_taken(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:94
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
DDS::InstanceStateKind instance_state() const
Access instance state.
const ReturnCode_t RETCODE_NO_DATA
bool match(DDS::ViewStateMask view, DDS::InstanceStateMask inst) const
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const ReturnCode_t RETCODE_OK
const InstanceState_rch instance_state_
Instance state for this instance.
PresentationQosPolicy presentation
const ReturnCode_t RETCODE_BAD_PARAMETER

◆ take_instance_w_condition()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_instance_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::ReadCondition_ptr  a_condition 
)
inlinevirtual

Definition at line 473 of file DataReaderImpl_T.h.

479  {
480  DDS::ReturnCode_t const precond =
481  check_inputs("take_instance_w_condition", received_data, info_seq,
482  max_samples);
483  if (DDS::RETCODE_OK != precond)
484  {
485  return precond;
486  }
487 
490 
491  if (!has_readcondition(a_condition))
492  {
494  }
495 
496 #ifndef OPENDDS_NO_QUERY_CONDITION
497  DDS::QueryCondition_ptr query_condition =
498  dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
499 #endif
500 
501  return take_instance_i(received_data, info_seq, max_samples, a_handle,
502  a_condition->get_sample_state_mask(),
503  a_condition->get_view_state_mask(),
504  a_condition->get_instance_state_mask(),
505 #ifndef OPENDDS_NO_QUERY_CONDITION
506  query_condition
507 #else
508  0
509 #endif
510  );
511  }
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
DDS::ReturnCode_t take_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
bool has_readcondition(DDS::ReadCondition_ptr a_condition)

◆ take_next_instance()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states 
)
inlinevirtual

Definition at line 533 of file DataReaderImpl_T.h.

541  {
542  DDS::ReturnCode_t const precond =
543  check_inputs("take_next_instance", received_data, info_seq, max_samples);
544  if (DDS::RETCODE_OK != precond)
545  {
546  return precond;
547  }
548 
549  return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
551  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t take_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const ReturnCode_t RETCODE_OK

◆ take_next_instance_i()

template<typename MessageType>
DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance_i ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::SampleStateMask  sample_states,
DDS::ViewStateMask  view_states,
DDS::InstanceStateMask  instance_states,
DDS::QueryCondition_ptr  a_condition 
)
inlineprivate

Definition at line 1664 of file DataReaderImpl_T.h.

1676 {
1678 
1679  typename InstanceMap::iterator it = instance_map_.begin();
1680  const typename InstanceMap::iterator the_end = instance_map_.end();
1681  if (a_handle != DDS::HANDLE_NIL) {
1682  const typename ReverseInstanceMap::const_iterator pos = reverse_instance_map_.find(a_handle);
1683  if (pos != reverse_instance_map_.end()) {
1684  it = pos->second;
1685  ++it;
1686  } else {
1687  it = the_end;
1688  }
1689  }
1690 
1692  for (; it != the_end; ++it) {
1693  handle = it->second;
1694  const DDS::ReturnCode_t status =
1695  take_instance_i(received_data, info_seq, max_samples, handle,
1697 #ifndef OPENDDS_NO_QUERY_CONDITION
1698  a_condition);
1699 #else
1700  0);
1701 #endif
1702  if (status != DDS::RETCODE_NO_DATA) {
1703  total_samples(); // see if we are empty
1705  return status;
1706  }
1707  }
1708 
1710  return DDS::RETCODE_NO_DATA;
1711 }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
const InstanceHandle_t HANDLE_NIL
CORBA::Long total_samples() const
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask in ::DDS::InstanceStateMask instance_states
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t take_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
ReverseInstanceMap reverse_instance_map_
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask in ::DDS::ViewStateMask view_states
Definition: IDLTemplate.txt:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_NO_DATA
const ReturnCode_t RETCODE_ERROR

◆ take_next_instance_w_condition()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_instance_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq info_seq,
::CORBA::Long  max_samples,
DDS::InstanceHandle_t  a_handle,
DDS::ReadCondition_ptr  a_condition 
)
inlinevirtual

Definition at line 593 of file DataReaderImpl_T.h.

599  {
600  DDS::ReturnCode_t const precond =
601  check_inputs("take_next_instance_w_condition", received_data, info_seq,
602  max_samples);
603  if (DDS::RETCODE_OK != precond)
604  {
605  return precond;
606  }
607 
610 
611  if (!has_readcondition(a_condition))
612  {
614  }
615 
616 #ifndef OPENDDS_NO_QUERY_CONDITION
617  DDS::QueryCondition_ptr query_condition =
618  dynamic_cast< DDS::QueryCondition_ptr >(a_condition);
619 #endif
620 
621  return take_next_instance_i(received_data, info_seq, max_samples, a_handle,
622  a_condition->get_sample_state_mask(),
623  a_condition->get_view_state_mask(),
624  a_condition->get_instance_state_mask(),
625 #ifndef OPENDDS_NO_QUERY_CONDITION
626  query_condition
627 #else
628  0
629 #endif
630  );
631  }
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq info_seq
Definition: IDLTemplate.txt:72
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
DDS::ReturnCode_t take_next_instance_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::InstanceHandle_t a_handle, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
bool has_readcondition(DDS::ReadCondition_ptr a_condition)

◆ take_next_sample()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_next_sample ( MessageType &  received_data,
DDS::SampleInfo sample_info_ref 
)
inlinevirtual

Definition at line 328 of file DataReaderImpl_T.h.

330  {
331  bool found_data = false;
333 
335 
337  const HandleSet& matches = lookup_matching_instances(sample_states, DDS::ANY_VIEW_STATE, DDS::ANY_INSTANCE_STATE);
338  for (HandleSet::const_iterator it = matches.begin(), next = it; it != matches.end(); it = next) {
339  ++next; // pre-increment iterator, in case updates cause changes to match set
340  const DDS::InstanceHandle_t handle = *it;
341  const SubscriptionInstance_rch inst = get_handle_instance(handle);
342  if (!inst) continue;
343 
344  bool most_recent_generation = false;
345  ReceivedDataElement* item = inst->rcvd_samples_.get_next_match(sample_states, 0);
346  if (item) {
347  if (item->registered_data_) {
348  received_data = *static_cast<MessageType*>(item->registered_data_);
349  }
350  inst->instance_state_->sample_info(sample_info_ref, item);
351  inst->rcvd_samples_.mark_read(item);
352 
354  if (observer && item->registered_data_ && vd) {
355  Observer::Sample s(sample_info_ref.instance_handle, sample_info_ref.instance_state, *item, *vd);
356  observer->on_sample_taken(this, s);
357  }
358 
359  if (!most_recent_generation) {
360  most_recent_generation = inst->instance_state_->most_recent_generation(item);
361  }
362 
363  if (most_recent_generation) {
364  inst->instance_state_->accessed();
365  }
366 
367  // Get the sample_ranks, generation_ranks, and
368  // absolute_generation_ranks for this info_seq
369  sample_info(sample_info_ref, inst->rcvd_samples_.peek_tail());
370 
371  inst->rcvd_samples_.remove(item);
372  item->dec_ref();
373  item = 0;
374 
375  found_data = true;
376 
377  break;
378  }
379  }
380 
382  return found_data ? DDS::RETCODE_OK : DDS::RETCODE_NO_DATA;
383  }
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long in ::DDS::SampleStateMask sample_states
Definition: IDLTemplate.txt:72
const ValueDispatcher * get_value_dispatcher() const
InstanceStateKind instance_state
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
virtual void on_sample_taken(DDS::DataReader_ptr, const Sample &)
Definition: Observer.h:94
ReceivedDataElement * get_next_match(CORBA::ULong sample_states, ReceivedDataElement *prev)
SubscriptionInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
void accessed()
A read or take operation has been performed on this instance.
void sample_info(DDS::SampleInfo &si, const ReceivedDataElement *de)
Populate the SampleInfo structure.
ACE_CDR::ULong ULong
InstanceHandle_t instance_handle
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const InstanceStateMask ANY_INSTANCE_STATE
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
const ViewStateMask ANY_VIEW_STATE
bool most_recent_generation(ReceivedDataElement *item) const
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
const ReturnCode_t RETCODE_NO_DATA
bool remove(ReceivedDataElement *data_sample)
const ReturnCode_t RETCODE_ERROR
ReceivedDataElementList rcvd_samples_
Data sample(s) in this instance.
const HandleSet & lookup_matching_instances(CORBA::ULong sample_states, CORBA::ULong view_states, CORBA::ULong instance_states) const
const SampleStateKind NOT_READ_SAMPLE_STATE
const ReturnCode_t RETCODE_OK
void sample_info(DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
const InstanceState_rch instance_state_
Instance state for this instance.

◆ take_w_condition()

template<typename MessageType>
virtual DDS::ReturnCode_t OpenDDS::DCPS::DataReaderImpl_T< MessageType >::take_w_condition ( MessageSequenceType received_data,
DDS::SampleInfoSeq sample_info,
::CORBA::Long  max_samples,
DDS::ReadCondition_ptr  a_condition 
)
inlinevirtual

Definition at line 241 of file DataReaderImpl_T.h.

246  {
247  DDS::ReturnCode_t const precond =
248  check_inputs("take_w_condition", received_data, sample_info, max_samples);
249  if (DDS::RETCODE_OK != precond)
250  {
251  return precond;
252  }
253 
256 
257  if (!has_readcondition(a_condition))
258  {
260  }
261 
262  return take_i(received_data, sample_info, max_samples,
263  a_condition->get_sample_state_mask(),
264  a_condition->get_view_state_mask(),
265  a_condition->get_instance_state_mask(),
266 #ifndef OPENDDS_NO_QUERY_CONDITION
267  dynamic_cast< DDS::QueryCondition_ptr >(a_condition)
268 #else
269  0
270 #endif
271  );
272  }
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
DDS::ReturnCode_t check_inputs(const char *method_name, MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, ::CORBA::Long max_samples)
Common input read* & take* input processing and precondition checks.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex sample_lock_
lock protecting sample container as well as statuses.
local interface<%TYPE%> inout ::DDS::SampleInfoSeq in long max_samples
Definition: IDLTemplate.txt:72
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
bool has_readcondition(DDS::ReadCondition_ptr a_condition)
void sample_info(DDS::SampleInfo &sample_info, const ReceivedDataElement *ptr)
DDS::ReturnCode_t take_i(MessageSequenceType &received_data, DDS::SampleInfoSeq &info_seq, CORBA::Long max_samples, DDS::SampleStateMask sample_states, DDS::ViewStateMask view_states, DDS::InstanceStateMask instance_states, DDS::QueryCondition_ptr a_condition)

Member Data Documentation

◆ data_allocator_

template<typename MessageType>
unique_ptr<DataAllocator> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::data_allocator_
private

Definition at line 2415 of file DataReaderImpl_T.h.

◆ filter_delayed_sample_map_

template<typename MessageType>
FilterDelayedSampleMap OpenDDS::DCPS::DataReaderImpl_T< MessageType >::filter_delayed_sample_map_
private

Definition at line 2440 of file DataReaderImpl_T.h.

◆ filter_delayed_sample_queue_

template<typename MessageType>
FilterDelayedSampleQueue OpenDDS::DCPS::DataReaderImpl_T< MessageType >::filter_delayed_sample_queue_
private

Definition at line 2442 of file DataReaderImpl_T.h.

◆ filter_delayed_sample_task_

template<typename MessageType>
RcHandle<DRISporadicTask> OpenDDS::DCPS::DataReaderImpl_T< MessageType >::filter_delayed_sample_task_
private

Definition at line 2422 of file DataReaderImpl_T.h.

◆ instance_map_

template<typename MessageType>
InstanceMap OpenDDS::DCPS::DataReaderImpl_T< MessageType >::instance_map_
private

Definition at line 2417 of file DataReaderImpl_T.h.

◆ marshal_skip_serialize_

template<typename MessageType>
bool OpenDDS::DCPS::DataReaderImpl_T< MessageType >::marshal_skip_serialize_
private

Definition at line 2444 of file DataReaderImpl_T.h.

◆ reverse_instance_map_

template<typename MessageType>
ReverseInstanceMap OpenDDS::DCPS::DataReaderImpl_T< MessageType >::reverse_instance_map_
private

Definition at line 2418 of file DataReaderImpl_T.h.


The documentation for this class was generated from the following file: