OpenDDS  Snapshot(2023/04/28-20:55)
Classes | Public Types | Public Member Functions | Public Attributes | Protected Member Functions | Protected Attributes | Private Member Functions | Private Attributes | Friends | List of all members
OpenDDS::DCPS::DataWriterImpl Class Reference

Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces. More...

#include <DataWriterImpl.h>

Inheritance diagram for OpenDDS::DCPS::DataWriterImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::DataWriterImpl:
Collaboration graph
[legend]

Classes

struct  AckCustomization
 
struct  AckToken
 
class  EncodingMode
 
struct  ReaderInfo
 

Public Types

typedef Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > DataAllocator
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::DataWriter >
typedef DDS::DataWriter ::_ptr_type _ptr_type
 
typedef DDS::DataWriter ::_var_type _var_type
 
- Public Types inherited from CORBA::LocalObject
typedef LocalObject_ptr _ptr_type
 
typedef LocalObject_var _var_type
 
typedef LocalObject_out _out_type
 
- Public Types inherited from CORBA::Object
typedef Object_ptr _ptr_type
 
typedef Object_var _var_type
 
typedef Object_out _out_type
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
typedef DDS::Entity ::_ptr_type _ptr_type
 
typedef DDS::Entity ::_var_type _var_type
 
- Public Types inherited from OpenDDS::DCPS::TransportClient
enum  { ASSOC_OK = 1, ASSOC_ACTIVE = 2 }
 

Public Member Functions

typedef OPENDDS_MAP_CMP (GUID_t, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap
 
 DataWriterImpl ()
 
virtual ~DataWriterImpl ()
 
void set_marshal_skip_serialize (bool value)
 
bool get_marshal_skip_serialize () const
 
DataAllocator * data_allocator () const
 
virtual DDS::InstanceHandle_t get_instance_handle ()
 
virtual DDS::ReturnCode_t set_qos (const DDS::DataWriterQos &qos)
 
virtual DDS::ReturnCode_t get_qos (DDS::DataWriterQos &qos)
 
virtual DDS::ReturnCode_t set_listener (DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask)
 
virtual DDS::DataWriterListener_ptr get_listener ()
 
virtual DDS::Topic_ptr get_topic ()
 
virtual DDS::ReturnCode_t wait_for_acknowledgments (const DDS::Duration_t &max_wait)
 
virtual DDS::Publisher_ptr get_publisher ()
 
virtual DDS::ReturnCode_t get_liveliness_lost_status (DDS::LivelinessLostStatus &status)
 
virtual DDS::ReturnCode_t get_offered_deadline_missed_status (DDS::OfferedDeadlineMissedStatus &status)
 
virtual DDS::ReturnCode_t get_offered_incompatible_qos_status (DDS::OfferedIncompatibleQosStatus &status)
 
virtual DDS::ReturnCode_t get_publication_matched_status (DDS::PublicationMatchedStatus &status)
 
TimeDuration liveliness_check_interval (DDS::LivelinessQosPolicyKind kind)
 
bool participant_liveliness_activity_after (const MonotonicTimePoint &tv)
 
virtual DDS::ReturnCode_t assert_liveliness ()
 
DDS::ReturnCode_t assert_liveliness_by_participant ()
 
typedef OPENDDS_VECTOR (DDS::InstanceHandle_t) InstanceHandleVec
 
void get_instance_handles (InstanceHandleVec &instance_handles)
 
void get_readers (RepoIdSet &readers)
 
virtual DDS::ReturnCode_t get_matched_subscriptions (DDS::InstanceHandleSeq &subscription_handles)
 
virtual DDS::ReturnCode_t get_matched_subscription_data (DDS::SubscriptionBuiltinTopicData &subscription_data, DDS::InstanceHandle_t subscription_handle)
 
virtual DDS::ReturnCode_t enable ()
 
virtual void add_association (const GUID_t &yourId, const ReaderAssociation &reader, bool active)
 
virtual void transport_assoc_done (int flags, const GUID_t &remote_id)
 
virtual void remove_associations (const ReaderIdSeq &readers, bool callback)
 
virtual void replay_durable_data_for (const GUID_t &remote_sub_id)
 
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
 
virtual void update_subscription_params (const GUID_t &readerId, const DDS::StringSeq &params)
 
void cleanup ()
 
void init (TopicImpl *topic_servant, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, const DDS::StatusMask &mask, WeakRcHandle< DomainParticipantImpl > participant_servant, PublisherImpl *publisher_servant)
 
void send_all_to_flush_control (ACE_Guard< ACE_Recursive_Thread_Mutex > &guard)
 
DDS::ReturnCode_t register_instance_i (DDS::InstanceHandle_t &handle, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
 
DDS::ReturnCode_t register_instance_from_durable_data (DDS::InstanceHandle_t &handle, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
 
DDS::ReturnCode_t unregister_instance_i (DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
 
void unregister_instances (const DDS::Time_t &source_timestamp)
 
DDS::ReturnCode_t write (Message_Block_Ptr sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out, const void *real_data)
 
DDS::ReturnCode_t write_sample (const Sample &sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out)
 
DDS::ReturnCode_t dispose (DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
 
DDS::ReturnCode_t num_samples (DDS::InstanceHandle_t handle, size_t &size)
 
ACE_UINT64 get_unsent_data (SendStateDataSampleList &list)
 
SendStateDataSampleList get_resend_data ()
 
GUID_t get_dp_id ()
 
void unregister_all ()
 
void data_delivered (const DataSampleElement *sample)
 
void transport_discovery_change ()
 
void control_delivered (const Message_Block_Ptr &sample)
 
bool should_ack () const
 Does this writer have samples to be acknowledged? More...
 
AckToken create_ack_token (DDS::Duration_t max_wait) const
 Create an AckToken for ack operations. More...
 
virtual void retrieve_inline_qos_data (TransportSendListener::InlineQosData &qos_data) const
 
virtual bool check_transport_qos (const TransportInst &inst)
 
bool coherent_changes_pending ()
 Are coherent changes pending? More...
 
void begin_coherent_changes ()
 Starts a coherent change set; should only be called once. More...
 
void end_coherent_changes (const GroupCoherentSamples &group_samples)
 Ends a coherent change set; should only be called once. More...
 
char const * get_type_name () const
 
void data_dropped (const DataSampleElement *element, bool dropped_by_transport)
 
void control_dropped (const Message_Block_Ptr &sample, bool dropped_by_transport)
 
ACE_Recursive_Thread_Mutex & get_lock () const
 
DDS::DataWriterListener_ptr listener_for (DDS::StatusKind kind)
 
virtual int handle_timeout (const ACE_Time_Value &tv, const void *arg)
 Handle the assert liveliness timeout. More...
 
void send_suspended_data ()
 
void remove_all_associations ()
 
virtual void register_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
 
virtual void unregister_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
 
virtual void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
void notify_publication_disconnected (const ReaderIdSeq &subids)
 
void notify_publication_reconnected (const ReaderIdSeq &subids)
 
void notify_publication_lost (const ReaderIdSeq &subids)
 
DDS::ReturnCode_t create_sample_data_message (Message_Block_Ptr data, DDS::InstanceHandle_t instance_handle, DataSampleHeader &header_data, Message_Block_Ptr &message, const DDS::Time_t &source_timestamp, bool content_filter)
 
bool persist_data ()
 
void wait_pending ()
 Wait for pending data and control messages to drain. More...
 
void set_wait_pending_deadline (const MonotonicTimePoint &deadline)
 
DDS::InstanceHandle_t get_next_handle ()
 
virtual RcHandle< EntityImpl > parent () const
 
bool filter_out (const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const
 
DataBlockLockPool::DataBlockLock * get_db_lock ()
 
PublicationInstance_rch get_handle_instance (DDS::InstanceHandle_t handle)
 
virtual WeakRcHandle< ICE::Endpoint > get_ice_endpoint ()
 
GUID_t get_guid () const
 
SequenceNumber get_max_sn () const
 
const ValueDispatcher * get_value_dispatcher () const
 
DDS::ReturnCode_t get_key_value (Sample_rch &sample, DDS::InstanceHandle_t handle)
 
DDS::InstanceHandle_t lookup_instance (const Sample &sample)
 
DDS::InstanceHandle_t register_instance_w_timestamp (const Sample &sample, const DDS::Time_t &timestamp)
 
DDS::ReturnCode_t unregister_instance_w_timestamp (const Sample &sample, DDS::InstanceHandle_t instance_handle, const DDS::Time_t &timestamp)
 
DDS::ReturnCode_t dispose_w_timestamp (const Sample &sample, DDS::InstanceHandle_t instance_handle, const DDS::Time_t &source_timestamp)
 
- Public Member Functions inherited from DDS::DataWriter
ReturnCode_t set_qos (in DataWriterQos qos)
 
ReturnCode_t get_qos (inout DataWriterQos qos)
 
ReturnCode_t set_listener (in DataWriterListener a_listener, in StatusMask mask)
 
ReturnCode_t wait_for_acknowledgments (in Duration_t max_wait)
 
ReturnCode_t get_liveliness_lost_status (inout LivelinessLostStatus status)
 
ReturnCode_t get_offered_deadline_missed_status (inout OfferedDeadlineMissedStatus status)
 
ReturnCode_t get_offered_incompatible_qos_status (inout OfferedIncompatibleQosStatus status)
 
ReturnCode_t get_publication_matched_status (inout PublicationMatchedStatus status)
 
ReturnCode_t get_matched_subscriptions (inout InstanceHandleSeq subscription_handles)
 
ReturnCode_t get_matched_subscription_data (inout SubscriptionBuiltinTopicData subscription_data, in InstanceHandle_t subscription_handle)
 
- Public Member Functions inherited from OpenDDS::DCPS::LocalObjectBase
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
virtual CORBA::ULong _refcount_value () const
 
- Public Member Functions inherited from CORBA::LocalObject
virtual ~LocalObject (void)
 
virtual CORBA::Boolean _non_existent (void)
 
virtual char * _repository_id (void)
 
virtual CORBA::InterfaceDef_ptr _get_interface (void)
 
virtual CORBA::Object_ptr _get_component (void)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::ExceptionList_ptr exclist, CORBA::ContextList_ptr ctxtlist, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual CORBA::Request_ptr _request (const char *operation)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
virtual CORBA::ULong _hash (CORBA::ULong maximum)
 
virtual CORBA::Boolean _is_equivalent (CORBA::Object_ptr other_obj)
 
virtual CORBA::ORB_ptr _get_orb (void)
 
virtual TAO::ObjectKey * _key (void)
 
- Public Member Functions inherited from CORBA::Object
virtual ~Object (void)
 
virtual TAO_Abstract_ServantBase * _servant (void) const
 
virtual CORBA::Boolean _is_collocated (void) const
 
virtual CORBA::Boolean _is_local (void) const
 
 Object (TAO_Stub *p, CORBA::Boolean collocated=false, TAO_Abstract_ServantBase *servant=0, TAO_ORB_Core *orb_core=0)
 
 Object (IOP::IOR *ior, TAO_ORB_Core *orb_core)
 
virtual TAO_Stub * _stubobj (void) const
 
virtual TAO_Stub * _stubobj (void)
 
virtual void _proxy_broker (TAO::Object_Proxy_Broker *proxy_broker)
 
virtual CORBA::Boolean marshal (TAO_OutputCDR &cdr)
 
CORBA::Boolean is_evaluated (void) const
 
TAO_ORB_Core * orb_core (void) const
 
IOP::IOR * steal_ior (void)
 
const IOP::IOR & ior (void) const
 
virtual bool can_convert_to_ior (void) const
 
virtual char * convert_to_ior (bool use_omg_ior_format, const char *ior_prefix) const
 
void _decr_refcount (void)
 
virtual CORBA::Boolean _is_a (const char *logical_type_id)
 
virtual const char * _interface_repository_id (void) const
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::DataWriterCallbacks
 DataWriterCallbacks ()
 
virtual ~DataWriterCallbacks ()
 
- Public Member Functions inherited from OpenDDS::DCPS::EntityImpl
 EntityImpl ()
 
virtual ~EntityImpl ()
 
bool is_enabled () const
 
virtual DDS::StatusCondition_ptr get_statuscondition ()
 
virtual DDS::StatusMask get_status_changes ()
 
virtual DDS::DomainId_t get_domain_id ()
 
virtual GUID_t get_id () const
 
void set_status_changed_flag (DDS::StatusKind status, bool status_changed_flag)
 
void notify_status_condition ()
 
virtual void transport_config (const TransportConfig_rch &cfg)
 
TransportConfig_rch transport_config () const
 
void set_observer (Observer_rch observer, Observer::Event e)
 
Observer_rch get_observer (Observer::Event e)
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportClient
void use_datalink (const GUID_t &remote_id, const DataLink_rch &link)
 
 TransportClient ()
 
virtual ~TransportClient ()
 
void enable_transport (bool reliable, bool durable)
 
void enable_transport_using_config (bool reliable, bool durable, const TransportConfig_rch &tc)
 
bool swap_bytes () const
 
bool cdr_encapsulation () const
 
const TransportLocatorSeq & connection_info () const
 
void populate_connection_info ()
 
bool is_reliable () const
 
bool associate (const AssociationData &peer, bool active)
 
void disassociate (const GUID_t &peerId)
 
void stop_associating ()
 
void stop_associating (const GUID_t *repos, CORBA::ULong length)
 
void send_final_acks ()
 
void transport_stop ()
 
void register_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
 
void unregister_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
 
void register_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
 
void unregister_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
 
void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
WeakRcHandle< ICE::Endpoint > get_ice_endpoint ()
 
bool send_response (const GUID_t &peer, const DataSampleHeader &header, Message_Block_Ptr payload)
 
void send (SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
 
SendControlStatus send_w_control (SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr msg)
 
SendControlStatus send_control_to (const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
bool remove_sample (const DataSampleElement *sample)
 
bool remove_all_msgs ()
 
virtual void add_link (const DataLink_rch &link, const GUID_t &peer)
 
void terminate_send_if_suspended ()
 
bool associated_with (const GUID_t &remote) const
 
bool pending_association_with (const GUID_t &remote) const
 
GUID_t repo_id () const
 
void data_acked (const GUID_t &remote)
 
bool is_leading (const GUID_t &reader_id) const
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportSendListener
virtual ~TransportSendListener ()
 
virtual void data_acked (const GUID_t &)
 
virtual SendControlStatus send_control_customized (const DataLinkSet_rch &links, const DataSampleHeader &header, ACE_Message_Block *msg, void *extra)
 

Public Attributes

Atomic< int > data_dropped_count_
 Statistics counter. More...
 
Atomic< int > data_delivered_count_
 
MessageTracker controlTracker
 

Protected Member Functions

void check_and_set_repo_id (const GUID_t &id)
 
SequenceNumber get_next_sn ()
 
SequenceNumber get_next_sn_i ()
 
DataWriterListener_ptr get_ext_listener ()
 
DDS::ReturnCode_t wait_for_specific_ack (const AckToken &token)
 
void prepare_to_delete ()
 
DDS::ReturnCode_t setup_serialization ()
 
ACE_Message_Block * serialize_sample (const Sample &sample)
 
typedef OPENDDS_MAP_CMP (GUID_t, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap
 
virtual SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr msg)
 
TypeSupportImpl * get_type_support () const
 
DDS::ReturnCode_t instance_must_exist (const char *method_name, const Sample &sample, DDS::InstanceHandle_t &instance_handle, bool remove=false)
 
DDS::ReturnCode_t get_or_create_instance_handle (DDS::InstanceHandle_t &handle, const Sample &sample, const DDS::Time_t &source_timestamp)
 
DDS::ReturnCode_t write_w_timestamp (const Sample &sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
 
- Protected Member Functions inherited from CORBA::LocalObject
 LocalObject (void)
 
- Protected Member Functions inherited from CORBA::Object
 Object (int dummy=0)
 
TAO::Object_Proxy_Broker * proxy_broker () const
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::EntityImpl
DDS::ReturnCode_t set_enabled ()
 
void set_deleted (bool state)
 
bool get_deleted () const
 
DDS::InstanceHandle_t get_entity_instance_handle (const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportClient
void cdr_encapsulation (bool encap)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportSendListener
 TransportSendListener ()
 

Protected Attributes

size_t n_chunks_
 The number of chunks for the cached allocator. More...
 
size_t association_chunk_multiplier_
 The multiplier for allocators affected by associations. More...
 
CORBA::String_var type_name_
 The type name of associated topic. More...
 
DDS::DataWriterQos qos_
 The qos policy list of this datawriter. More...
 
DDS::DataWriterQos passed_qos_
 
WeakRcHandle< DomainParticipantImpl > participant_servant_
 
ACE_Thread_Mutex reader_info_lock_
 
RepoIdToReaderInfoMap reader_info_
 
bool skip_serialize_
 
class OpenDDS::DCPS::DataWriterImpl::EncodingMode encoding_mode_
 
Security::SecurityConfig_rch security_config_
 
DDS::Security::PermissionsHandle participant_permissions_handle_
 
DDS::DynamicType_var dynamic_type_
 
- Protected Attributes inherited from CORBA::Object
ACE_Atomic_Op< TAO_SYNCH_MUTEX, unsigned long > refcount_
 
- Protected Attributes inherited from OpenDDS::DCPS::EntityImpl
AtomicBool enabled_
 The flag indicates the entity is enabled. More...
 
AtomicBool entity_deleted_
 The flag indicates the entity is being deleted. More...
 

Private Member Functions

void track_sequence_number (GUIDSeq *filter_out)
 
void notify_publication_lost (const DDS::InstanceHandleSeq &handles)
 
DDS::ReturnCode_t dispose_and_unregister (DDS::InstanceHandle_t handle, const DDS::Time_t &timestamp)
 
ACE_Message_Block * create_control_message (MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
 
bool send_liveliness (const MonotonicTimePoint &now)
 Send the liveliness message. More...
 
void lookup_instance_handles (const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
 Lookup the instance handles by the subscription repo ids. More...
 
RcHandle< BitSubscriber > get_builtin_subscriber_proxy () const
 
DDS::DomainId_t domain_id () const
 
CORBA::Long get_priority_value (const AssociationData &) const
 
DDS::Security::ParticipantCryptoHandle get_crypto_handle () const
 
void association_complete_i (const GUID_t &remote_id)
 
void return_handle (DDS::InstanceHandle_t handle)
 
typedef OPENDDS_MAP_CMP (GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap
 
bool need_sequence_repair ()
 
bool need_sequence_repair_i () const
 
DDS::ReturnCode_t send_end_historic_samples (const GUID_t &readerId)
 
DDS::ReturnCode_t send_request_ack ()
 
typedef OPENDDS_MAP (DDS::InstanceHandle_t, Sample_rch) InstanceHandlesToValues
 
typedef OPENDDS_MAP_CMP (Sample_rch, DDS::InstanceHandle_t, SampleRchCmp) InstanceValuesToHandles
 
bool insert_instance (DDS::InstanceHandle_t handle, Sample_rch &sample)
 
InstanceValuesToHandles::iterator find_instance (const Sample &sample)
 

Private Attributes

unique_ptr< DataBlockLockPool > db_lock_pool_
 
CORBA::String_var topic_name_
 The name of associated topic. More...
 
GUID_t topic_id_
 The associated topic repository id. More...
 
TopicDescriptionPtr< TopicImpl > topic_servant_
 The topic servant. More...
 
TypeSupportImpl * type_support_
 
ACE_Thread_Mutex listener_mutex_
 Mutex to protect listener info. More...
 
DDS::StatusMask listener_mask_
 
DDS::DataWriterListener_var listener_
 Used to notify the entity for relevant events. More...
 
DDS::DomainId_t domain_id_
 The domain id. More...
 
GUID_t dp_id_
 
WeakRcHandle< PublisherImpl > publisher_servant_
 The publisher servant which creates this datawriter. More...
 
GUID_t publication_id_
 The repository id of this datawriter/publication. More...
 
SequenceNumber sequence_number_
 The sequence number unique in DataWriter scope. More...
 
ACE_Thread_Mutex sn_lock_
 Mutex for sequence_number_. More...
 
bool coherent_
 
ACE_UINT32 coherent_samples_
 
RcHandle< WriteDataContainer > data_container_
 The sample data container. More...
 
ACE_Recursive_Thread_Mutex lock_
 
RepoIdToHandleMap id_to_handle_map_
 
RepoIdSet readers_
 
DDS::LivelinessLostStatus liveliness_lost_status_
 Status conditions. More...
 
DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_
 
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
 
DDS::PublicationMatchedStatus publication_match_status_
 
bool liveliness_lost_
 
unique_ptr< MessageBlockAllocator > mb_allocator_
 The message block allocator. More...
 
unique_ptr< DataBlockAllocator > db_allocator_
 The data block allocator. More...
 
unique_ptr< DataSampleHeaderAllocator > header_allocator_
 The header data allocator. More...
 
unique_ptr< DataAllocator > data_allocator_
 
ACE_Reactor_Timer_Interface * reactor_
 
TimeDuration liveliness_check_interval_
 The time interval for sending liveliness message. More...
 
MonotonicTimePoint last_liveliness_activity_time_
 Timestamp of last write/dispose/assert_liveliness. More...
 
CORBA::Long last_deadline_missed_total_count_
 
bool is_bit_
 
ACE_UINT64 min_suspended_transaction_id_
 The cached available data while suspending and associated transaction ids. More...
 
ACE_UINT64 max_suspended_transaction_id_
 
SendStateDataSampleList available_data_list_
 
unique_ptr< Monitor > monitor_
 Monitor object for this entity. More...
 
unique_ptr< Monitor > periodic_monitor_
 Periodic Monitor object for this entity. More...
 
bool liveliness_asserted_
 
ACE_Thread_Mutex sync_unreg_rem_assocs_lock_
 
RcHandle< LivenessTimer > liveness_timer_
 
MonotonicTimePoint wait_pending_deadline_
 
InstanceHandlesToValues instance_handles_to_values_
 
InstanceValuesToHandles instance_values_to_handles_
 

Friends

class WriteDataContainer
 
class PublisherImpl
 
class ::DDS_TEST
 

Additional Inherited Members

- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::DataWriter >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::LocalObject
static LocalObject_ptr _duplicate (LocalObject_ptr obj)
 
static LocalObject_ptr _nil (void)
 
static LocalObject_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::Object
static CORBA::Boolean marshal (const Object_ptr x, TAO_OutputCDR &cdr)
 
static void _tao_any_destructor (void *)
 
static CORBA::Boolean is_nil_i (CORBA::Object_ptr obj)
 
static void tao_object_initialize (Object *)
 
static CORBA::Object_ptr _duplicate (CORBA::Object_ptr obj)
 
static CORBA::Object_ptr _nil (void)
 
static CORBA::Object_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 

Detailed Description

Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.

See the DDS specification, OMG formal/2015-04-10, for a description of the interface this class is implementing.

This class must be inherited by the type-specific datawriter which is specific to the data-type associated with the topic.

Note: This class is responsible for allocating memory for the header message block (MessageBlock + DataBlock + DataSampleHeader) and the DataSampleElement. The data-type datawriter is responsible for allocating memory for the sample data message block (e.g. MessageBlock + DataBlock + Foo data), but it gives up ownership of that block to the WriteDataContainer.

Definition at line 80 of file DataWriterImpl.h.

Member Typedef Documentation

◆ DataAllocator

Definition at line 92 of file DataWriterImpl.h.

Constructor & Destructor Documentation

◆ DataWriterImpl()

OpenDDS::DCPS::DataWriterImpl::DataWriterImpl ( )

Definition at line 58 of file DataWriterImpl.cpp.

References DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::OfferedDeadlineMissedStatus::last_instance_handle, DDS::OfferedIncompatibleQosStatus::last_policy_id, DDS::PublicationMatchedStatus::last_subscription_handle, liveliness_lost_status_, monitor_, offered_deadline_missed_status_, offered_incompatible_qos_status_, periodic_monitor_, DDS::OfferedIncompatibleQosStatus::policies, publication_match_status_, TheServiceParticipant, DDS::LivelinessLostStatus::total_count, DDS::OfferedDeadlineMissedStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, DDS::PublicationMatchedStatus::total_count, DDS::LivelinessLostStatus::total_count_change, DDS::OfferedDeadlineMissedStatus::total_count_change, DDS::OfferedIncompatibleQosStatus::total_count_change, and DDS::PublicationMatchedStatus::total_count_change.

61  , controlTracker("DataWriterImpl")
62  , n_chunks_(TheServiceParticipant->n_chunks())
63  , association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier())
64  , qos_(TheServiceParticipant->initial_DataWriterQos())
65  , skip_serialize_(false)
66  , db_lock_pool_(new DataBlockLockPool((unsigned long)TheServiceParticipant->n_chunks()))
68  , topic_servant_(0)
69  , type_support_(0)
71  , domain_id_(0)
74  , coherent_(false)
76  , liveliness_lost_(false)
77  , reactor_(0)
80  , is_bit_(false)
83  , liveliness_asserted_(false)
84  , liveness_timer_(make_rch<LivenessTimer>(ref(*this)))
85 {
88 
92 
97 
103 
104  monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_writer_monitor(this));
105  periodic_monitor_.reset(TheServiceParticipant->monitor_factory_->create_data_writer_periodic_monitor(this));
106 }
size_t n_chunks_
The number of chunks for the cached allocator.
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
const InstanceHandle_t HANDLE_NIL
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
DDS::LivelinessLostStatus liveliness_lost_status_
Status conditions.
Holds and distributes locks to be used for locking ACE_Data_Blocks. Currently it does not require ret...
reference_wrapper< T > ref(T &r)
Definition: RcHandle_T.h:237
ACE_Reactor_Timer_Interface * reactor_
unique_ptr< Monitor > monitor_
Monitor object for this entity.
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_
const DDS::StatusMask DEFAULT_STATUS_MASK
RcHandle< LivenessTimer > liveness_timer_
GUID_t topic_id_
The associated topic repository id.
Atomic< int > data_dropped_count_
Statistics counter.
TypeSupportImpl * type_support_
DDS::DomainId_t domain_id_
The domain id.
ACE_UINT64 min_suspended_transaction_id_
The cached available data while suspending and associated transaction ids.
size_t association_chunk_multiplier_
The multiplier for allocators affected by associations.
unique_ptr< Monitor > periodic_monitor_
Periodic Monitor object for this entity.
DDS::PublicationMatchedStatus publication_match_status_
TimeDuration liveliness_check_interval_
The time interval for sending liveliness message.
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
unique_ptr< DataBlockLockPool > db_lock_pool_
#define TheServiceParticipant
CORBA::Long last_deadline_missed_total_count_
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
GUID_t publication_id_
The repository id of this datawriter/publication.
static const TimeDuration max_value
Definition: TimeDuration.h:32

◆ ~DataWriterImpl()

OpenDDS::DCPS::DataWriterImpl::~DataWriterImpl ( )
virtual

Definition at line 110 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, participant_servant_, and publication_id_.

111 {
112  DBG_ENTRY_LVL("DataWriterImpl", "~DataWriterImpl", 6);
113 #ifndef OPENDDS_SAFETY_PROFILE
114  RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
115  if (participant) {
116  XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
117  if (type_lookup_service) {
118  type_lookup_service->remove_guid_from_dynamic_map(publication_id_);
119  }
120  }
121 #endif
122 }
WeakRcHandle< DomainParticipantImpl > participant_servant_
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
DCPS::RcHandle< TypeLookupService > TypeLookupService_rch
GUID_t publication_id_
The repository id of this datawriter/publication.

Member Function Documentation

◆ add_association()

void OpenDDS::DCPS::DataWriterImpl::add_association ( const GUID_t & yourId,
const ReaderAssociation & reader,
bool  active 
)
virtual

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 212 of file DataWriterImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::TransportClient::associate(), OpenDDS::DCPS::LogGuid::c_str(), check_and_set_repo_id(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::AssociationData::discovery_locator_, DDS::DataReaderQos::durability, OpenDDS::DCPS::Observer::e_ASSOCIATED, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterClassName, OpenDDS::DCPS::ReaderAssociation::filterExpression, OpenDDS::DCPS::EntityImpl::get_deleted(), get_guid(), OpenDDS::DCPS::EntityImpl::get_observer(), is_bit_, DDS::DurabilityQosPolicy::kind, DDS::ReliabilityQosPolicy::kind, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::Observer::on_associated(), OpenDDS::DCPS::AssociationData::participant_discovered_at_, participant_servant_, OpenDDS::DCPS::ReaderAssociation::participantDiscoveredAt, qos_, reader_info_, reader_info_lock_, OpenDDS::DCPS::ReaderAssociation::readerDiscInfo, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, OpenDDS::DCPS::AssociationData::remote_transport_context_, TheServiceParticipant, DDS::DataWriterQos::transport_priority, OpenDDS::DCPS::ReaderAssociation::transportContext, DDS::TransportPriorityQosPolicy::value, and DDS::VOLATILE_DURABILITY_QOS.

215 {
216  DBG_ENTRY_LVL("DataWriterImpl", "add_association", 6);
217 
218  if (DCPS_debug_level) {
219  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association - ")
220  ACE_TEXT("bit %d local %C remote %C\n"), is_bit_,
221  LogGuid(yourId).c_str(),
222  LogGuid(reader.readerId).c_str()));
223  }
224 
225  if (get_deleted()) {
226  if (DCPS_debug_level)
227  ACE_DEBUG((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::add_association")
228  ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
229 
230  return;
231  }
232 
233  check_and_set_repo_id(yourId);
234 
235  {
236  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
237  reader_info_.insert(std::make_pair(reader.readerId,
238  ReaderInfo(reader.filterClassName,
239  TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
240  reader.exprParams, participant_servant_,
241  reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
242  }
243 
244  if (DCPS_debug_level > 4) {
245  ACE_DEBUG((LM_DEBUG,
246  ACE_TEXT("(%P|%t) DataWriterImpl::add_association(): ")
247  ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
248  LogGuid(get_guid()).c_str(),
250  }
251 
252  AssociationData data;
253  data.remote_id_ = reader.readerId;
254  data.remote_data_ = reader.readerTransInfo;
255  data.discovery_locator_ = reader.readerDiscInfo;
256  data.participant_discovered_at_ = reader.participantDiscoveredAt;
257  data.remote_transport_context_ = reader.transportContext;
258  data.remote_reliable_ =
259  (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
260  data.remote_durable_ =
261  (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
262 
263  if (associate(data, active)) {
265  if (observer) {
266  observer->on_associated(this, data.remote_id_);
267  }
268  } else {
269  //FUTURE: inform inforepo and try again as passive peer
270  if (DCPS_debug_level) {
271  ACE_ERROR((LM_ERROR,
272  ACE_TEXT("(%P|%t) DataWriterImpl::add_association: ")
273  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
274  }
275  }
276 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
RepoIdToReaderInfoMap reader_info_
WeakRcHandle< DomainParticipantImpl > participant_servant_
RcHandle< Observer > Observer_rch
Definition: Observer.h:101
ACE_TEXT("TCP_Factory")
bool associate(const AssociationData &peer, bool active)
TransportPriorityQosPolicy transport_priority
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
ACE_Thread_Mutex reader_info_lock_
void check_and_set_repo_id(const GUID_t &id)
#define TheServiceParticipant

◆ assert_liveliness()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::assert_liveliness ( )
virtual

Implements DDS::DataWriter.

Definition at line 1190 of file DataWriterImpl.cpp.

References DDS::AUTOMATIC_LIVELINESS_QOS, DDS::LivelinessQosPolicy::kind, DDS::DataWriterQos::liveliness, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, OpenDDS::DCPS::TimePoint_T< MonotonicClock >::now(), participant_servant_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and send_liveliness().

1191 {
1192  switch (this->qos_.liveliness.kind) {
1194  // Do nothing.
1195  break;
1197  {
1198  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
1199  if (participant) {
1200  return participant->assert_liveliness();
1201  }
1202  }
1203  break;
1206  return DDS::RETCODE_ERROR;
1207  }
1208  break;
1209  }
1210 
1211  return DDS::RETCODE_OK;
1212 }
bool send_liveliness(const MonotonicTimePoint &now)
Send the liveliness message.
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
static TimePoint_T< MonotonicClock > now()
Definition: TimePoint_T.inl:41
WeakRcHandle< DomainParticipantImpl > participant_servant_
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
LivelinessQosPolicyKind kind
LivelinessQosPolicy liveliness

◆ assert_liveliness_by_participant()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::assert_liveliness_by_participant ( )

Definition at line 1215 of file DataWriterImpl.cpp.

References DDS::LivelinessQosPolicy::kind, DDS::DataWriterQos::liveliness, liveliness_asserted_, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, qos_, and DDS::RETCODE_OK.

1216 {
1217  // This operation is called by participant.
1219  // Set a flag indicating that we should send a liveliness message on the timer if necessary.
1220  liveliness_asserted_ = true;
1221  }
1222 
1223  return DDS::RETCODE_OK;
1224 }
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
const ReturnCode_t RETCODE_OK
LivelinessQosPolicyKind kind
LivelinessQosPolicy liveliness

◆ association_complete_i()

void OpenDDS::DCPS::DataWriterImpl::association_complete_i ( const GUID_t remote_id)
private

Definition at line 370 of file DataWriterImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), available_data_list_, OpenDDS::DCPS::SendStateDataSampleList::begin(), OpenDDS::DCPS::bind(), controlTracker, create_control_message(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::STUN::encoding(), OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_db_lock(), get_lock(), get_max_sn(), get_resend_data(), header, id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::insert(), is_bit_, CORBA::is_nil(), OpenDDS::DCPS::Encoding::KIND_UNALIGNED_CDR, DDS::PublicationMatchedStatus::last_subscription_handle, DDS::DataWriterQos::lifespan, listener_for(), LM_DEBUG, LM_ERROR, LM_INFO, LM_WARNING, lock_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::MessageTracker::message_sent(), monitor_, OpenDDS::DCPS::move(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), OpenDDS::DCPS::TimePoint_T< SystemClock >::now(), OPENDDS_STRING, participant_servant_, publication_id_, publication_match_status_, DDS::PUBLICATION_MATCHED_STATUS, publisher_servant_, qos_, reader_info_, reader_info_lock_, readers_, OpenDDS::DCPS::SEND_CONTROL_ERROR, OpenDDS::DCPS::TransportClient::send_w_control(), OpenDDS::DCPS::serialized_size(), OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::PublicationMatchedStatus::total_count, and DDS::PublicationMatchedStatus::total_count_change.

Referenced by transport_assoc_done().

371 {
372  DBG_ENTRY_LVL("DataWriterImpl", "association_complete_i", 6);
373 
374  bool reader_durable = false;
375 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
376  OPENDDS_STRING filterClassName;
377  RcHandle<FilterEvaluator> eval;
378  DDS::StringSeq expression_params;
379 #endif
380  {
382 
383  if (DCPS_debug_level >= 1) {
384  ACE_DEBUG((LM_DEBUG,
385  ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i - ")
386  ACE_TEXT("bit %d local %C remote %C\n"),
387  is_bit_,
388  LogGuid(this->publication_id_).c_str(),
389  LogGuid(remote_id).c_str()));
390  }
391 
392  if (insert(readers_, remote_id) == -1) {
393  ACE_ERROR((LM_ERROR,
394  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::association_complete_i: ")
395  ACE_TEXT("insert %C from pending failed.\n"),
396  LogGuid(remote_id).c_str()));
397  }
398  }
399  {
400  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
401  RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
402 
403  if (it != reader_info_.end()) {
404  reader_durable = it->second.durable_;
405 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
406  filterClassName = it->second.filter_class_name_;
407  eval = it->second.eval_;
408  expression_params = it->second.expression_params_;
409 #endif
410  }
411  }
412 
413  if (this->monitor_) {
414  this->monitor_->report();
415  }
416 
417  if (!is_bit_) {
418 
419  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
420 
421  if (!participant)
422  return;
423 
424  data_container_->add_reader_acks(remote_id, get_max_sn());
425 
426  const DDS::InstanceHandle_t handle = participant->assign_handle(remote_id);
427 
428  {
429  // protect publication_match_status_ and status changed flags.
431 
432  if (DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
433  ACE_DEBUG((LM_WARNING,
434  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::association_complete_i: ")
435  ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
436  LogGuid(remote_id).c_str(),
437  handle));
438  return;
439 
440  } else if (DCPS_debug_level > 4) {
441  ACE_DEBUG((LM_DEBUG,
442  ACE_TEXT("(%P|%t) DataWriterImpl::association_complete_i: ")
443  ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
444  LogGuid(remote_id).c_str(),
445  handle));
446  }
447 
454  }
455 
456  DDS::DataWriterListener_var listener =
458 
459  if (!CORBA::is_nil(listener.in())) {
460 
461  listener->on_publication_matched(this, publication_match_status_);
462 
463  // TBD - why does the spec say to change this but not
464  // change the ChangeFlagStatus after a listener call?
467  }
468 
470  } else {
471  data_container_->add_reader_acks(remote_id, get_max_sn());
472  }
473 
474  // Support DURABILITY QoS
475  if (reader_durable) {
476  // Tell the WriteDataContainer to resend all sending/sent
477  // samples.
478  this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
479 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
480  , filterClassName, eval.in(), expression_params
481 #endif
482  );
483 
484  // Acquire the data writer container lock to avoid deadlock. The
485  // thread calling association_complete() has to acquire lock in the
486  // same order as the write()/register() operation.
487 
488  // Since the thread calling association_complete() is the ORB
489  // thread, it may have some performance penalty. If the
490  // performance is an issue, we may need a new thread to handle the
491  // data_available() calls.
493  guard,
494  this->get_lock());
495 
496  SendStateDataSampleList list = this->get_resend_data();
497  {
498  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
499  // Update the reader's expected sequence
500  SequenceNumber& seq =
501  reader_info_.find(remote_id)->second.expected_sequence_;
502 
503  for (SendStateDataSampleList::iterator list_el = list.begin();
504  list_el != list.end(); ++list_el) {
505  list_el->get_header().historic_sample_ = true;
506 
507  if (list_el->get_header().sequence_ > seq) {
508  seq = list_el->get_header().sequence_;
509  }
510  }
511  }
512 
513  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
514  if (!publisher || publisher->is_suspended()) {
516 
517  } else {
518  if (DCPS_debug_level >= 4) {
519  ACE_DEBUG((LM_INFO, "(%P|%t) Sending historic samples\n"));
520  }
521 
522  const Encoding encoding(Encoding::KIND_UNALIGNED_CDR);
523  size_t size = 0;
524  serialized_size(encoding, size, remote_id);
525  Message_Block_Ptr data(
527  get_db_lock()));
528  Serializer ser(data.get(), encoding);
529  ser << remote_id;
530 
531  DataSampleHeader header;
532  Message_Block_Ptr end_historic_samples(
534  END_HISTORIC_SAMPLES, header, move(data),
535  SystemTimePoint::now().to_dds_time()));
536 
538  guard.release();
541  SendControlStatus ret = send_w_control(list, header, move(end_historic_samples), remote_id);
542  if (ret == SEND_CONTROL_ERROR) {
543  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: ")
544  ACE_TEXT("DataWriterImpl::association_complete_i: ")
545  ACE_TEXT("send_w_control failed.\n")));
547  }
548  }
549  }
550 }
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
#define ACE_DEBUG(X)
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
SendStateDataSampleListIterator iterator
STL-style bidirectional iterator and const-iterator types.
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind)
SendStateDataSampleList get_resend_data()
SendControlStatus send_w_control(SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
int bind(Container &c, const FirstType &first, const SecondType &second)
Definition: Util.h:20
SendStateDataSampleList available_data_list_
DataBlockLockPool::DataBlockLock * get_db_lock()
SequenceNumber get_max_sn() const
const StatusKind PUBLICATION_MATCHED_STATUS
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
unique_ptr< Monitor > monitor_
Monitor object for this entity.
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
RepoIdToReaderInfoMap reader_info_
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
#define OPENDDS_STRING
void enqueue_tail(const DataSampleElement *element)
WeakRcHandle< DomainParticipantImpl > participant_servant_
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
DDS::PublicationMatchedStatus publication_match_status_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
LifespanQosPolicy lifespan
ACE_Thread_Mutex reader_info_lock_
ACE_Recursive_Thread_Mutex & get_lock() const
ACE_Recursive_Thread_Mutex lock_
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
RepoIdToHandleMap id_to_handle_map_
GUID_t publication_id_
The repository id of this datawriter/publication.
RcHandle< WriteDataContainer > data_container_
The sample data container.
ACE_Message_Block * create_control_message(MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
Boolean is_nil(T x)
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
SendControlStatus
Return code type for send_control() operations.

◆ begin_coherent_changes()

void OpenDDS::DCPS::DataWriterImpl::begin_coherent_changes ( )

Starts a coherent change set; should only be called once.

Definition at line 2324 of file DataWriterImpl.cpp.

References ACE_GUARD, coherent_, and get_lock().

2325 {
2327  guard,
2328  get_lock());
2329 
2330  this->coherent_ = true;
2331 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex & get_lock() const

◆ check_and_set_repo_id()

void OpenDDS::DCPS::DataWriterImpl::check_and_set_repo_id ( const GUID_t id)
inline protected

Definition at line 520 of file DataWriterImpl.h.

References OpenDDS::DCPS::GUID_UNKNOWN, and lock_.

Referenced by add_association().

521  {
523  if (GUID_UNKNOWN == publication_id_) {
524  publication_id_ = id;
525  }
526  }
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_Recursive_Thread_Mutex lock_
GUID_t publication_id_
The repository id of this datawriter/publication.

◆ check_transport_qos()

bool OpenDDS::DCPS::DataWriterImpl::check_transport_qos ( const TransportInst inst)
virtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 2303 of file DataWriterImpl.cpp.

2304 {
2305  // DataWriter does not impose any constraints on which transports
2306  // may be used based on QoS.
2307  return true;
2308 }

◆ cleanup()

void OpenDDS::DCPS::DataWriterImpl::cleanup ( void  )

Clean up the DataWriter.

Definition at line 126 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::NO_STATUS_MASK, set_listener(), topic_servant_, and type_support_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

127 {
128  // As first step set our listener to nill which will prevent us from calling
129  // back onto the listener at the moment the related DDS entity has been
130  // deleted
132  topic_servant_ = 0;
133  type_support_ = 0;
134 }
const DDS::StatusMask NO_STATUS_MASK
virtual DDS::ReturnCode_t set_listener(DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask)
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
TypeSupportImpl * type_support_

◆ coherent_changes_pending()

bool OpenDDS::DCPS::DataWriterImpl::coherent_changes_pending ( )

Are coherent changes pending?

Definition at line 2313 of file DataWriterImpl.cpp.

References ACE_GUARD_RETURN, coherent_, and get_lock().

2314 {
2316  guard,
2317  get_lock(),
2318  false);
2319 
2320  return this->coherent_;
2321 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Recursive_Thread_Mutex & get_lock() const

◆ control_delivered()

void OpenDDS::DCPS::DataWriterImpl::control_delivered ( const Message_Block_Ptr sample)
virtual

This is called by the transport to notify that the control message has been delivered.

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2257 of file DataWriterImpl.cpp.

References controlTracker, DBG_ENTRY_LVL, and OpenDDS::DCPS::MessageTracker::message_delivered().

2258 {
2259  DBG_ENTRY_LVL("DataWriterImpl","control_delivered",6);
2261 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ control_dropped()

void OpenDDS::DCPS::DataWriterImpl::control_dropped ( const Message_Block_Ptr sample,
bool  dropped_by_transport 
)
virtual

This is called by the transport to notify that the control message has been dropped.

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2403 of file DataWriterImpl.cpp.

References controlTracker, DBG_ENTRY_LVL, and OpenDDS::DCPS::MessageTracker::message_dropped().

2405 {
2406  DBG_ENTRY_LVL("DataWriterImpl","control_dropped",6);
2408 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ create_ack_token()

DataWriterImpl::AckToken OpenDDS::DCPS::DataWriterImpl::create_ack_token ( DDS::Duration_t  max_wait) const

Create an AckToken for ack operations.

Definition at line 1025 of file DataWriterImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, get_max_sn(), OpenDDS::DCPS::SequenceNumber::getValue(), and LM_DEBUG.

Referenced by wait_for_acknowledgments().

1026 {
1027  const SequenceNumber sn = get_max_sn();
1028  if (DCPS_debug_level > 0) {
1029  ACE_DEBUG((LM_DEBUG,
1030  ACE_TEXT("(%P|%t) DataWriterImpl::create_ack_token() - ")
1031  ACE_TEXT("for sequence %q\n"),
1032  sn.getValue()));
1033  }
1034  return AckToken(max_wait, sn);
1035 }
#define ACE_DEBUG(X)
SequenceNumber get_max_sn() const
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ create_control_message()

ACE_Message_Block * OpenDDS::DCPS::DataWriterImpl::create_control_message ( MessageId  message_id,
DataSampleHeader header,
Message_Block_Ptr  data,
const DDS::Time_t source_timestamp 
)
private

This method creates a header message block and chains it with the registered sample. The header contains the information needed, e.g. message id, length of the whole message, etc. The fast allocator is not used for the header.

Definition at line 2070 of file DataWriterImpl.cpp.

References ACE_CDR_BYTE_ORDER, ACE_DEBUG, ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_GUARD_RETURN, ACE_NEW_MALLOC_RETURN, ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, get_db_lock(), OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), get_next_sn_i(), OpenDDS::DCPS::INSTANCE_REGISTRATION, OpenDDS::DCPS::DataSampleHeader::key_fields_only_, LM_DEBUG, ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Time_t::nanosec, need_sequence_repair(), OpenDDS::DCPS::DataSampleHeader::publication_id_, publication_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, publisher_servant_, reader_info_, reader_info_lock_, ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::REQUEST_ACK, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), sn_lock_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::TransportClient::swap_bytes(), OpenDDS::DCPS::to_string(), ACE_Message_Block::total_length(), OpenDDS::DCPS::UNREGISTER_INSTANCE, and ACE_Time_Value::zero.

Referenced by association_complete_i(), dispose(), dispose_and_unregister(), end_coherent_changes(), register_instance_i(), replay_durable_data_for(), send_liveliness(), send_request_ack(), and unregister_instance_i().

2074 {
2075  header_data.message_id_ = message_id;
2076  header_data.byte_order_ =
2078  header_data.coherent_change_ = false;
2079 
2080  if (data) {
2081  header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
2082  }
2083 
2084  header_data.sequence_ = SequenceNumber::SEQUENCENUMBER_UNKNOWN();
2085  header_data.sequence_repair_ = false; // set below
2086  header_data.source_timestamp_sec_ = source_timestamp.sec;
2087  header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
2088  header_data.publication_id_ = publication_id_;
2089 
2090  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
2091  if (!publisher) {
2092  return 0;
2093  }
2094 
2095  header_data.publisher_id_ = publisher->publisher_id_;
2096 
2098  SequenceNumber sequence = sequence_number_;
2099  if (message_id == INSTANCE_REGISTRATION
2100  || message_id == DISPOSE_INSTANCE
2101  || message_id == UNREGISTER_INSTANCE
2102  || message_id == DISPOSE_UNREGISTER_INSTANCE
2103  || message_id == REQUEST_ACK) {
2104 
2105  header_data.sequence_repair_ = need_sequence_repair();
2106  header_data.sequence_ = get_next_sn_i();
2107  header_data.key_fields_only_ = true;
2108  sequence = sequence_number_;
2109  }
2110  guard.release();
2111 
2112  ACE_Message_Block* message = 0;
2113  ACE_NEW_MALLOC_RETURN(message,
2114  static_cast<ACE_Message_Block*>(
2115  mb_allocator_->malloc(sizeof(ACE_Message_Block))),
2119  header_data.message_length_ ? data.release() : 0, //cont
2120  0, //data
2121  0, //allocator_strategy
2122  get_db_lock(), //locking_strategy
2126  db_allocator_.get(),
2127  mb_allocator_.get()),
2128  0);
2129 
2130  *message << header_data;
2131 
2132  // If we incremented sequence number for this control message
2133  if (header_data.sequence_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
2134  ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, 0);
2135  // Update the expected sequence number for all readers
2136  RepoIdToReaderInfoMap::iterator reader;
2137 
2138  for (reader = reader_info_.begin(); reader != reader_info_.end(); ++reader) {
2139  reader->second.expected_sequence_ = sequence;
2140  }
2141  }
2142  if (DCPS_debug_level >= 4) {
2143  ACE_DEBUG((LM_DEBUG,
2144  ACE_TEXT("(%P|%t) DataWriterImpl::create_control_message: ")
2145  ACE_TEXT("from publication %C sending control sample: %C .\n"),
2146  LogGuid(publication_id_).c_str(),
2147  to_string(header_data).c_str()));
2148  }
2149  return message;
2150 }
#define ACE_DEBUG(X)
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
ACE_Thread_Mutex sn_lock_
Mutex for sequence_number_.
static const ACE_Time_Value max_time
DataBlockLockPool::DataBlockLock * get_db_lock()
#define ACE_CDR_BYTE_ORDER
RepoIdToReaderInfoMap reader_info_
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
unique_ptr< MessageBlockAllocator > mb_allocator_
The message block allocator.
ACE_TEXT("TCP_Factory")
unsigned long nanosec
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
static const ACE_Time_Value zero
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
ACE_Thread_Mutex reader_info_lock_
const char * to_string(MessageId value)
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
GUID_t publication_id_
The repository id of this datawriter/publication.
unique_ptr< DataBlockAllocator > db_allocator_
The data block allocator.

◆ create_sample_data_message()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::create_sample_data_message ( Message_Block_Ptr  data,
DDS::InstanceHandle_t  instance_handle,
DataSampleHeader header_data,
Message_Block_Ptr message,
const DDS::Time_t source_timestamp,
bool  content_filter 
)

This method creates a header message block and chains it with the sample data. The header contains the information needed, e.g. message id, length of the whole message, etc. The fast allocator is used to allocate the message block, data block and header.

Definition at line 2153 of file DataWriterImpl.cpp.

References ACE_CDR_BYTE_ORDER, ACE_DEBUG, ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_ERROR_RETURN, ACE_NEW_MALLOC_RETURN, ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, OpenDDS::DCPS::TransportClient::cdr_encapsulation(), OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, coherent_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DataSampleHeader::content_filter_, data_container_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, DDS::LifespanQosPolicy::duration, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, get_db_lock(), OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), get_next_sn_i(), OpenDDS::DCPS::DataSampleHeader::group_coherent_, DDS::GROUP_PRESENTATION_QOS, header_allocator_, DDS::DataWriterQos::lifespan, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, LM_DEBUG, LM_ERROR, ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Duration_t::nanosec, DDS::Time_t::nanosec, need_sequence_repair(), OpenDDS::DCPS::DataSampleHeader::publication_id_, publication_id_, OpenDDS::DCPS::DataSampleHeader::publisher_id_, publisher_servant_, qos_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SAMPLE_DATA, DDS::Duration_t::sec, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, sn_lock_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, OpenDDS::DCPS::TransportClient::swap_bytes(), OpenDDS::DCPS::to_string(), ACE_Message_Block::total_length(), and ACE_Time_Value::zero.

Referenced by write().

2159 {
2160  PublicationInstance_rch instance =
2161  data_container_->get_handle_instance(instance_handle);
2162 
2163  if (0 == instance) {
2164  ACE_ERROR_RETURN((LM_ERROR,
2165  ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message ")
2166  ACE_TEXT("failed to find instance for handle %d\n"),
2167  instance_handle),
2169  }
2170 
2171  header_data.message_id_ = SAMPLE_DATA;
2172  header_data.byte_order_ =
2174  header_data.coherent_change_ = this->coherent_;
2175 
2176  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
2177 
2178  if (!publisher) {
2179  return DDS::RETCODE_ERROR;
2180  }
2181 
2182 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
2183  header_data.group_coherent_ =
2184  publisher->qos_.presentation.access_scope
2186 #endif
2187  header_data.content_filter_ = content_filter;
2188  header_data.cdr_encapsulation_ = this->cdr_encapsulation();
2189  header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
2190  {
2192  header_data.sequence_repair_ = need_sequence_repair();
2193  header_data.sequence_ = get_next_sn_i();
2194  }
2195  header_data.source_timestamp_sec_ = source_timestamp.sec;
2196  header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
2197 
2200  header_data.lifespan_duration_ = true;
2201  header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
2202  header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
2203  }
2204 
2205  header_data.publication_id_ = publication_id_;
2206  header_data.publisher_id_ = publisher->publisher_id_;
2207 
2208  ACE_Message_Block* tmp_message;
2209  ACE_NEW_MALLOC_RETURN(tmp_message,
2210  static_cast<ACE_Message_Block*>(
2211  mb_allocator_->malloc(sizeof(ACE_Message_Block))),
2214  data.release(), //cont
2215  0, //data
2216  header_allocator_.get(), //alloc_strategy
2217  get_db_lock(), //locking_strategy
2221  db_allocator_.get(),
2222  mb_allocator_.get()),
2224  message.reset(tmp_message);
2225  *message << header_data;
2226  if (DCPS_debug_level >= 4) {
2227  ACE_DEBUG((LM_DEBUG,
2228  ACE_TEXT("(%P|%t) DataWriterImpl::create_sample_data_message: ")
2229  ACE_TEXT("from publication %C sending data sample: %C .\n"),
2230  LogGuid(publication_id_).c_str(),
2231  to_string(header_data).c_str()));
2232  }
2233  return DDS::RETCODE_OK;
2234 }
RcHandle< PublicationInstance > PublicationInstance_rch
#define ACE_DEBUG(X)
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
ACE_Thread_Mutex sn_lock_
Mutex for sequence_number_.
static const ACE_Time_Value max_time
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
unique_ptr< DataSampleHeaderAllocator > header_allocator_
The header data allocator.
DataBlockLockPool::DataBlockLock * get_db_lock()
#define ACE_CDR_BYTE_ORDER
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
unique_ptr< MessageBlockAllocator > mb_allocator_
The message block allocator.
ACE_TEXT("TCP_Factory")
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
unsigned long nanosec
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
static const ACE_Time_Value zero
const ReturnCode_t RETCODE_ERROR
LifespanQosPolicy lifespan
const ReturnCode_t RETCODE_OK
const char * to_string(MessageId value)
#define ACE_ERROR_RETURN(X, Y)
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
GUID_t publication_id_
The repository id of this datawriter/publication.
RcHandle< WriteDataContainer > data_container_
The sample data container.
unique_ptr< DataBlockAllocator > db_allocator_
The data block allocator.

◆ data_allocator()

DataAllocator* OpenDDS::DCPS::DataWriterImpl::data_allocator ( ) const
inline

Definition at line 134 of file DataWriterImpl.h.

References dispose(), init(), OpenDDS::DCPS::OPENDDS_VECTOR(), and write().

135  {
136  return data_allocator_.get();
137  }
unique_ptr< DataAllocator > data_allocator_

◆ data_delivered()

void OpenDDS::DCPS::DataWriterImpl::data_delivered ( const DataSampleElement sample)
virtual

This is called by the transport to notify that the sample is delivered, and it delegates to WriteDataContainer to adjust the internal data sample threads.

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2237 of file DataWriterImpl.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), data_container_, data_delivered_count_, DBG_ENTRY_LVL, OpenDDS::DCPS::DataSampleElement::get_pub_id(), LM_ERROR, and publication_id_.

2238 {
2239  DBG_ENTRY_LVL("DataWriterImpl","data_delivered",6);
2240 
2241  if (!(sample->get_pub_id() == this->publication_id_)) {
2242  ACE_ERROR((LM_ERROR,
2243  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::data_delivered: ")
2244  ACE_TEXT("The publication id %C from delivered element ")
2245  ACE_TEXT("does not match the datawriter's id %C\n"),
2246  LogGuid(sample->get_pub_id()).c_str(),
2247  LogGuid(publication_id_).c_str()));
2248  return;
2249  }
2250  //provided for statistics tracking in tests
2252 
2253  this->data_container_->data_delivered(sample);
2254 }
#define ACE_ERROR(X)
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
GUID_t publication_id_
The repository id of this datawriter/publication.
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ data_dropped()

void OpenDDS::DCPS::DataWriterImpl::data_dropped ( const DataSampleElement element,
bool  dropped_by_transport 
)
virtual

This method is called by the transport to notify that the instance sample is dropped, and it delegates to WriteDataContainer to update the internal list.

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2391 of file DataWriterImpl.cpp.

References data_container_, data_dropped_count_, and DBG_ENTRY_LVL.

2393 {
2394  DBG_ENTRY_LVL("DataWriterImpl","data_dropped",6);
2395 
2396  //provided for statistics tracking in tests
2398 
2399  this->data_container_->data_dropped(element, dropped_by_transport);
2400 }
Atomic< int > data_dropped_count_
Statistics counter.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ dispose()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::dispose ( DDS::InstanceHandle_t  handle,
const DDS::Time_t source_timestamp 
)

Delegate to the WriteDataContainer to dispose all data samples for a given instance and tell the transport to broadcast the disposed instance.

Definition at line 1983 of file DataWriterImpl.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::DISPOSE_INSTANCE, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), LM_ERROR, OpenDDS::DCPS::move(), DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, send_all_to_flush_control(), and OpenDDS::DCPS::DataSampleElement::set_sample().

Referenced by dispose_w_timestamp().

1985 {
1986  DBG_ENTRY_LVL("DataWriterImpl","dispose",6);
1987 
1988  if (!enabled_) {
1989  ACE_ERROR_RETURN((LM_ERROR,
1990  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::dispose: ")
1991  ACE_TEXT("Entity is not enabled.\n")),
1993  }
1994 
1996 
1998 
1999  Message_Block_Ptr registered_sample_data;
2000  ret = this->data_container_->dispose(handle, registered_sample_data);
2001 
2002  if (ret != DDS::RETCODE_OK) {
2003  ACE_ERROR_RETURN((LM_ERROR,
2004  ACE_TEXT("(%P|%t) ERROR: ")
2005  ACE_TEXT("DataWriterImpl::dispose: ")
2006  ACE_TEXT("dispose failed.\n")),
2007  ret);
2008  }
2009 
2010  DataSampleElement* element = 0;
2011  ret = this->data_container_->obtain_buffer_for_control(element);
2012 
2013  if (ret != DDS::RETCODE_OK) {
2014  ACE_ERROR_RETURN((LM_ERROR,
2015  ACE_TEXT("(%P|%t) ERROR: ")
2016  ACE_TEXT("DataWriterImpl::dispose: ")
2017  ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
2018  ret),
2019  ret);
2020  }
2021 
2023  element->get_header(),
2024  move(registered_sample_data),
2025  source_timestamp));
2026  element->set_sample(move(sample));
2027 
2028  ret = this->data_container_->enqueue_control(element);
2029 
2030  if (ret != DDS::RETCODE_OK) {
2031  data_container_->release_buffer(element);
2032  ACE_ERROR_RETURN((LM_ERROR,
2033  ACE_TEXT("(%P|%t) ERROR: ")
2034  ACE_TEXT("DataWriterImpl::dispose: ")
2035  ACE_TEXT("enqueue_control failed.\n")),
2036  ret);
2037  }
2038 
2040 
2041  return DDS::RETCODE_OK;
2042 }
void send_all_to_flush_control(ACE_Guard< ACE_Recursive_Thread_Mutex > &guard)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_TEXT("TCP_Factory")
const ReturnCode_t RETCODE_NOT_ENABLED
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex & get_lock() const
#define ACE_ERROR_RETURN(X, Y)
RcHandle< WriteDataContainer > data_container_
The sample data container.
ACE_Message_Block * create_control_message(MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)

◆ dispose_and_unregister()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::dispose_and_unregister ( DDS::InstanceHandle_t  handle,
const DDS::Time_t timestamp 
)
private

Definition at line 1750 of file DataWriterImpl.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::DISPOSE_UNREGISTER_INSTANCE, OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), LM_ERROR, OpenDDS::DCPS::move(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, send_all_to_flush_control(), and OpenDDS::DCPS::DataSampleElement::set_sample().

Referenced by unregister_instance_i().

1752 {
1753  DBG_ENTRY_LVL("DataWriterImpl", "dispose_and_unregister", 6);
1754 
1757 
1758  Message_Block_Ptr data_sample;
1759  ret = this->data_container_->dispose(handle, data_sample);
1760 
1761  if (ret != DDS::RETCODE_OK) {
1762  ACE_ERROR_RETURN((LM_ERROR,
1763  ACE_TEXT("(%P|%t) ERROR: ")
1764  ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
1765  ACE_TEXT("dispose on container failed.\n")),
1766  ret);
1767  }
1768 
1769  ret = this->data_container_->unregister(handle, data_sample, false);
1770 
1771  if (ret != DDS::RETCODE_OK) {
1772  ACE_ERROR_RETURN((LM_ERROR,
1773  ACE_TEXT("(%P|%t) ERROR: ")
1774  ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
1775  ACE_TEXT("unregister with container failed.\n")),
1776  ret);
1777  }
1778 
1779  DataSampleElement* element = 0;
1780  ret = this->data_container_->obtain_buffer_for_control(element);
1781 
1782  if (ret != DDS::RETCODE_OK) {
1783  ACE_ERROR_RETURN((LM_ERROR,
1784  ACE_TEXT("(%P|%t) ERROR: ")
1785  ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
1786  ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
1787  ret),
1788  ret);
1789  }
1790 
1792  element->get_header(),
1793  move(data_sample),
1794  source_timestamp));
1795  element->set_sample(move(sample));
1796 
1797  ret = this->data_container_->enqueue_control(element);
1798 
1799  if (ret != DDS::RETCODE_OK) {
1800  data_container_->release_buffer(element);
1801  ACE_ERROR_RETURN((LM_ERROR,
1802  ACE_TEXT("(%P|%t) ERROR: ")
1803  ACE_TEXT("DataWriterImpl::dispose_and_unregister: ")
1804  ACE_TEXT("enqueue_control failed.\n")),
1805  ret);
1806  }
1807 
1809  return DDS::RETCODE_OK;
1810 }
void send_all_to_flush_control(ACE_Guard< ACE_Recursive_Thread_Mutex > &guard)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex & get_lock() const
#define ACE_ERROR_RETURN(X, Y)
RcHandle< WriteDataContainer > data_container_
The sample data container.
ACE_Message_Block * create_control_message(MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)

◆ dispose_w_timestamp()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::dispose_w_timestamp ( const Sample sample,
DDS::InstanceHandle_t  instance_handle,
const DDS::Time_t source_timestamp 
)

Definition at line 2942 of file DataWriterImpl.cpp.

References ACE_ERROR, DDS::Security::SecurityException::code, dispose(), dynamic_type_, OpenDDS::DCPS::Sample::get_dynamic_data(), DDS::HANDLE_NIL, instance_must_exist(), LM_NOTICE, OpenDDS::DCPS::log_level, DDS::Security::SecurityException::message, DDS::Security::SecurityException::minor_code, OpenDDS::DCPS::LogLevel::Notice, participant_permissions_handle_, DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY, DDS::RETCODE_OK, and security_config_.

Referenced by OpenDDS::XTypes::DynamicDataWriterImpl::dispose_w_timestamp(), and OpenDDS::DCPS::DataWriterImpl_T< MessageType >::dispose_w_timestamp().

2946 {
2947 #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
2948  DDS::DynamicData_var dynamic_data = sample.get_dynamic_data(dynamic_type_);
2950  if (dynamic_data && security_config_ &&
2952  !security_config_->get_access_control()->check_local_datawriter_dispose_instance(participant_permissions_handle_, this, dynamic_data, ex)) {
2953  if (log_level >= LogLevel::Notice) {
2954  ACE_ERROR((LM_NOTICE,
2955  "(%P|%t) NOTICE: DataWriterImpl::dispose_w_timestamp: unable to dispose instance SecurityException[%d.%d]: %C\n",
2956  ex.code, ex.minor_code, ex.message.in()));
2957  }
2959  }
2960 #endif
2961 
2963  "dispose_w_timestamp", sample, instance_handle);
2964  if (rc != DDS::RETCODE_OK) {
2965  return rc;
2966  }
2967  return dispose(instance_handle, source_timestamp);
2968 }
#define ACE_ERROR(X)
const InstanceHandle_t HANDLE_NIL
DDS::Security::PermissionsHandle participant_permissions_handle_
DDS::ReturnCode_t dispose(DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
Security::SecurityConfig_rch security_config_
DDS::ReturnCode_t instance_must_exist(const char *method_name, const Sample &sample, DDS::InstanceHandle_t &instance_handle, bool remove=false)
OpenDDS_Dcps_Export LogLevel log_level
DDS::DynamicType_var dynamic_type_
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_NOT_ALLOWED_BY_SECURITY

◆ domain_id()

DDS::DomainId_t OpenDDS::DCPS::DataWriterImpl::domain_id ( ) const
inline private virtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 716 of file DataWriterImpl.h.

Referenced by transport_discovery_change().

717  {
718  return this->domain_id_;
719  }
DDS::DomainId_t domain_id_
The domain id.

◆ enable()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::enable ( )
virtual

Implements DDS::Entity.

Definition at line 1314 of file DataWriterImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::TypeSupportImpl::add_types(), association_chunk_multiplier_, OpenDDS::DCPS::TransportClient::cdr_encapsulation(), OpenDDS::DCPS::TransportClient::connection_info(), data_container_, db_allocator_, OpenDDS::DCPS::DCPS_debug_level, DDS::DataWriterQos::deadline, DDS::HistoryQosPolicy::depth, domain_id_, dp_id_, DDS::DataWriterQos::durability, DDS::DataWriterQos::durability_service, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, dynamic_type_, OpenDDS::DCPS::Observer::e_ENABLED, OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::EntityImpl::get_observer(), OpenDDS::DCPS::TypeSupportImpl::get_type(), get_type_name(), OpenDDS::DCPS::GUID_UNKNOWN, header_allocator_, DDS::DataWriterQos::history, if(), TAO::String_var< charT >::in(), OpenDDS::DCPS::EntityImpl::is_enabled(), DDS::KEEP_ALL_HISTORY_QOS, DDS::DurabilityQosPolicy::kind, DDS::ReliabilityQosPolicy::kind, DDS::HistoryQosPolicy::kind, last_deadline_missed_total_count_, DDS::LivelinessQosPolicy::lease_duration, DDS::LENGTH_UNLIMITED, DDS::DataWriterQos::lifespan, DDS::DataWriterQos::liveliness, liveliness_check_interval_, liveness_timer_, LM_DEBUG, LM_ERROR, LM_WARNING, lock_, DDS::ReliabilityQosPolicy::max_blocking_time, DDS::ResourceLimitsQosPolicy::max_instances, DDS::ResourceLimitsQosPolicy::max_samples, DDS::ResourceLimitsQosPolicy::max_samples_per_instance, mb_allocator_, monitor_, n_chunks_, DDS::Duration_t::nanosec, offered_deadline_missed_status_, OpenDDS::DCPS::Observer::on_enabled(), participant_permissions_handle_, participant_servant_, DDS::DeadlineQosPolicy::period, publication_id_, publisher_servant_, qos_, OpenDDS::DCPS::rchandle_from(), reactor_, ACE_Guard< ACE_LOCK >::release(), DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::DataWriterQos::representation, DDS::DataWriterQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, 
ACE_Reactor_Timer_Interface::schedule_timer(), DDS::Duration_t::sec, security_config_, OpenDDS::DCPS::EntityImpl::set_enabled(), OpenDDS::DCPS::set_writer_effective_data_rep_qos(), setup_serialization(), TheServiceParticipant, OpenDDS::DCPS::TypeSupportImpl::to_type_info(), topic_name_, topic_servant_, type_support_, OpenDDS::DCPS::TimeDuration::value(), DDS::DataRepresentationQosPolicy::value, DDS::VOLATILE_DURABILITY_QOS, and WriteDataContainer.

Referenced by OpenDDS::DCPS::PublisherImpl::create_datawriter().

1315 {
1316  //According spec:
1317  // - Calling enable on an already enabled Entity returns OK and has no
1318  // effect.
1319  // - Calling enable on an Entity whose factory is not enabled will fail
1320  // and return PRECONDITION_NOT_MET.
1321 
1322  if (this->is_enabled()) {
1323  return DDS::RETCODE_OK;
1324  }
1325 
1326  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
1327  if (!publisher || !publisher->is_enabled()) {
1329  }
1330 
1331  if (!topic_servant_->is_enabled()) {
1333  }
1334 
1335  RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
1336  if (participant) {
1337  dp_id_ = participant->get_id();
1338  }
1339 
1340  // Note: do configuration based on QoS in enable() because
1341  // before enable is called the QoS can be changed -- even
1342  // for Changeable=NO
1343 
1344  // Configure WriteDataContainer constructor parameters from qos.
1345 
1346  const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
1347 
1348  CORBA::Long const max_samples_per_instance =
1351 
1352  CORBA::Long max_instances = 0, max_total_samples = 0;
1353 
1356 
1360  (qos_.resource_limits.max_instances * max_samples_per_instance))) {
1361  max_total_samples = reliable ? qos_.resource_limits.max_samples : 0;
1362  }
1363  }
1364 
1366  max_instances = qos_.resource_limits.max_instances;
1367 
1368  const CORBA::Long history_depth =
1371 
1372  const CORBA::Long max_durable_per_instance =
1373  qos_.durability.kind == DDS::VOLATILE_DURABILITY_QOS ? 0 : history_depth;
1374 
1375 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1376  // Get data durability cache if DataWriter QoS requires durable
1377  // samples. Publisher servant retains ownership of the cache.
1378  DataDurabilityCache* const durability_cache =
1379  TheServiceParticipant->get_data_durability_cache(qos_.durability);
1380 #endif
1381 
1382  //Note: the QoS used to set n_chunks_ is Changeable=No so
1383  // it is OK that we cannot change the size of our allocators.
1384  data_container_ = RcHandle<WriteDataContainer>(
1385  new WriteDataContainer(
1386  this,
1387  max_samples_per_instance,
1388  history_depth,
1389  max_durable_per_instance,
1391  n_chunks_,
1392  domain_id_,
1393  topic_name_,
1394  get_type_name(),
1395 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1396  durability_cache,
1398 #endif
1399  max_instances,
1400  max_total_samples,
1401  lock_,
1404  keep_count());
1405 
1406  // +1 because we might allocate one before releasing another
1407  // TBD - see if this +1 can be removed.
1411 
1412  if (DCPS_debug_level >= 2) {
1413  ACE_DEBUG((LM_DEBUG,
1414  "(%P|%t) DataWriterImpl::enable-mb"
1415  " Cached_Allocator_With_Overflow %x with %B chunks\n",
1416  mb_allocator_.get(),
1417  n_chunks_));
1418 
1419  ACE_DEBUG((LM_DEBUG,
1420  "(%P|%t) DataWriterImpl::enable-db"
1421  " Cached_Allocator_With_Overflow %x with %B chunks\n",
1422  db_allocator_.get(),
1423  n_chunks_));
1424 
1425  ACE_DEBUG((LM_DEBUG,
1426  "(%P|%t) DataWriterImpl::enable-header"
1427  " Cached_Allocator_With_Overflow %x with %B chunks\n",
1428  header_allocator_.get(),
1429  n_chunks_));
1430  }
1431 
1434  // Must be at least 1 micro second.
1435  liveliness_check_interval_ = std::max(
1436  TimeDuration(qos_.liveliness.lease_duration) * (TheServiceParticipant->liveliness_factor() / 100.0),
1437  TimeDuration(0, 1));
1438 
1440  0,
1442  liveliness_check_interval_.value()) == -1) {
1443  ACE_ERROR((LM_ERROR,
1444  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: %p.\n"),
1445  ACE_TEXT("schedule_timer")));
1446 
1447  }
1448  }
1449 
1450  if (!participant) {
1451  return DDS::RETCODE_ERROR;
1452  }
1453 
1454  participant->add_adjust_liveliness_timers(this);
1455 
1456  data_container_->set_deadline_period(TimeDuration(qos_.deadline.period));
1457 
1458  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
1459  disco->pre_writer(this);
1460 
1461  this->set_enabled();
1462 
1463  try {
1464  this->enable_transport(reliable,
1466 
1467  } catch (const Transport::Exception&) {
1468  ACE_ERROR((LM_ERROR,
1469  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable, ")
1470  ACE_TEXT("Transport Exception.\n")));
1471  data_container_->shutdown_ = true;
1472  return DDS::RETCODE_ERROR;
1473  }
1474 
1475  // Must be done after transport enabled.
1477  if (!topic_servant_->check_data_representation(qos_.representation.value, true)) {
1478  data_container_->shutdown_ = true;
1479  return DDS::RETCODE_ERROR;
1480  }
1481 
1482  // Done after enable_transport so we know its swap_bytes.
1483  const DDS::ReturnCode_t setup_serialization_result = setup_serialization();
1484  if (setup_serialization_result != DDS::RETCODE_OK) {
1485  data_container_->shutdown_ = true;
1486  return setup_serialization_result;
1487  }
1488 
1489  const TransportLocatorSeq& trans_conf_info = connection_info();
1490  DDS::PublisherQos pub_qos;
1491  publisher->get_qos(pub_qos);
1492 
1493  XTypes::TypeInformation type_info;
1494  type_support_->to_type_info(type_info);
1495 
1496  XTypes::TypeLookupService_rch type_lookup_service = participant->get_type_lookup_service();
1497  type_support_->add_types(type_lookup_service);
1498 
1499  const GUID_t publication_id =
1500  disco->add_publication(this->domain_id_,
1501  this->dp_id_,
1502  this->topic_servant_->get_id(),
1503  rchandle_from(this),
1504  this->qos_,
1505  trans_conf_info,
1506  pub_qos,
1507  type_info);
1508 
1510  publication_id_ = publication_id;
1511 
1512  if (publication_id_ == GUID_UNKNOWN) {
1513  if (DCPS_debug_level >= 1) {
1514  ACE_DEBUG((LM_WARNING, "(%P|%t) WARNING: DataWriterImpl::enable: "
1515  "add_publication failed\n"));
1516  }
1517  data_container_->shutdown_ = true;
1518  return DDS::RETCODE_ERROR;
1519  }
1520 
1521 #if defined(OPENDDS_SECURITY)
1522  security_config_ = participant->get_security_config();
1523  participant_permissions_handle_ = participant->permissions_handle();
1525 #endif
1526 
1527  if (DCPS_debug_level >= 2) {
1528  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataWriterImpl::enable: "
1529  "got GUID %C, publishing to topic name \"%C\" type \"%C\"\n",
1530  LogGuid(publication_id_).c_str(),
1531  topic_servant_->topic_name(), topic_servant_->type_name()));
1532  }
1533 
1534  this->data_container_->publication_id_ = this->publication_id_;
1535 
1536  guard.release();
1537 
1538  const DDS::ReturnCode_t writer_enabled_result =
1539  publisher->writer_enabled(topic_name_.in(), this);
1540 
1541  if (this->monitor_) {
1542  this->monitor_->report();
1543  }
1544 
1545 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
1546 
1547  // Move cached data from the durability cache to the unsent data
1548  // queue.
1549  if (durability_cache != 0) {
1550 
1551  if (!durability_cache->get_data(this->domain_id_,
1552  this->topic_name_,
1553  get_type_name(),
1554  this,
1555  this->mb_allocator_.get(),
1556  this->db_allocator_.get(),
1557  this->qos_.lifespan)) {
1558  ACE_ERROR((LM_ERROR,
1559  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::enable: ")
1560  ACE_TEXT("unable to retrieve durable data\n")));
1561  }
1562  }
1563 
1564 #endif
1565 
1566  if (writer_enabled_result == DDS::RETCODE_OK) {
1567  const Observer_rch observer = get_observer(Observer::e_ENABLED);
1568  if (observer) {
1569  observer->on_enabled(this);
1570  }
1571  }
1572 
1573  return writer_enabled_result;
1574 }
#define ACE_DEBUG(X)
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
HistoryQosPolicy history
ACE_CDR::Long Long
size_t n_chunks_
The number of chunks for the cached allocator.
#define ACE_ERROR(X)
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
CORBA::String_var topic_name_
The name of associated topic.
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
void enable_transport(bool reliable, bool durable)
ReliabilityQosPolicy reliability
if(!(yy_init))
const TransportLocatorSeq & connection_info() const
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
DurabilityQosPolicy durability
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
HistoryQosPolicyKind kind
DDS::Security::PermissionsHandle participant_permissions_handle_
unique_ptr< DataSampleHeaderAllocator > header_allocator_
The header data allocator.
DeadlineQosPolicy deadline
const ACE_Time_Value & value() const
sequence< TransportLocator > TransportLocatorSeq
DataRepresentationQosPolicy representation
ACE_Reactor_Timer_Interface * reactor_
Cached_Allocator_With_Overflow< DataSampleHeader, ACE_Null_Mutex > DataSampleHeaderAllocator
unique_ptr< Monitor > monitor_
Monitor object for this entity.
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_
void set_writer_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos, bool cdr_encapsulated)
Definition: DCPS_Utils.cpp:508
Security::SecurityConfig_rch security_config_
RcHandle< LivenessTimer > liveness_timer_
DurabilityServiceQosPolicy durability_service
ReliabilityQosPolicyKind kind
DurabilityQosPolicyKind kind
WeakRcHandle< DomainParticipantImpl > participant_servant_
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
void add_types(const XTypes::TypeLookupService_rch &tls) const
RcHandle< Observer > Observer_rch
Definition: Observer.h:101
virtual DDS::DynamicType_ptr get_type() const
TypeSupportImpl * type_support_
DDS::DomainId_t domain_id_
The domain id.
ResourceLimitsQosPolicy resource_limits
size_t association_chunk_multiplier_
The multiplier for allocators affected by associations.
DDS::ReturnCode_t setup_serialization()
unique_ptr< MessageBlockAllocator > mb_allocator_
The message block allocator.
char const * get_type_name() const
ACE_TEXT("TCP_Factory")
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
Cached_Allocator_With_Overflow< ACE_Message_Block, ACE_Thread_Mutex > MessageBlockAllocator
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
void to_type_info(XTypes::TypeInformation &type_info) const
TimeDuration liveliness_check_interval_
The time interval for sending liveliness message.
const ReturnCode_t RETCODE_ERROR
DCPS::RcHandle< TypeLookupService > TypeLookupService_rch
LifespanQosPolicy lifespan
DDS::DynamicType_var dynamic_type_
const ReturnCode_t RETCODE_OK
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)=0
const long LENGTH_UNLIMITED
ACE_Recursive_Thread_Mutex lock_
LivelinessQosPolicy liveliness
const character_type * in(void) const
DataRepresentationIdSeq value
#define TheServiceParticipant
CORBA::Long last_deadline_missed_total_count_
GUID_t publication_id_
The repository id of this datawriter/publication.
RcHandle< WriteDataContainer > data_container_
The sample data container.
Cached_Allocator_With_Overflow< ACE_Data_Block, ACE_Thread_Mutex > DataBlockAllocator
unique_ptr< DataBlockAllocator > db_allocator_
The data block allocator.

◆ end_coherent_changes()

void OpenDDS::DCPS::DataWriterImpl::end_coherent_changes ( const GroupCoherentSamples &  group_samples)

Ends a coherent change set; should only be called once.

Definition at line 2334 of file DataWriterImpl.cpp.

References ACE_ERROR, ACE_GUARD, ACE_TEXT(), coherent_, OpenDDS::DCPS::CoherentChangeControl::coherent_samples_, coherent_samples_, create_control_message(), OpenDDS::DCPS::END_COHERENT_CHANGES, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_db_lock(), get_lock(), OpenDDS::DCPS::CoherentChangeControl::get_max_serialized_size(), get_max_sn(), OpenDDS::DCPS::CoherentChangeControl::group_coherent_, OpenDDS::DCPS::CoherentChangeControl::group_coherent_samples_, DDS::GROUP_PRESENTATION_QOS, header, OpenDDS::DCPS::Encoding::KIND_UNALIGNED_CDR, OpenDDS::DCPS::WriterCoherentSample::last_sample_, LM_ERROR, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::move(), OpenDDS::DCPS::TimePoint_T< SystemClock >::now(), OpenDDS::DCPS::WriterCoherentSample::num_samples_, OpenDDS::DCPS::CoherentChangeControl::publisher_id_, publisher_servant_, send_control(), OpenDDS::DCPS::SEND_CONTROL_ERROR, and OpenDDS::DCPS::TransportClient::swap_bytes().

2335 {
2336  // PublisherImpl::pi_lock_ should be held.
2338  guard,
2339  get_lock());
2340 
2341  CoherentChangeControl end_msg;
2342  end_msg.coherent_samples_.num_samples_ = this->coherent_samples_;
2343  end_msg.coherent_samples_.last_sample_ = get_max_sn();
2344 
2345  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
2346 
2347  if (publisher) {
2348  end_msg.group_coherent_
2349  = publisher->qos_.presentation.access_scope == DDS::GROUP_PRESENTATION_QOS;
2350  }
2351 
2352  if (publisher && end_msg.group_coherent_) {
2353  end_msg.publisher_id_ = publisher->publisher_id_;
2354  end_msg.group_coherent_samples_ = group_samples;
2355  }
2356 
2357  Message_Block_Ptr data(
2358  new ACE_Message_Block(
2359  end_msg.get_max_serialized_size(),
2361  0, // cont
2362  0, // data
2363  0, // alloc_strategy
2364  get_db_lock()));
2365 
2366  Serializer serializer(data.get(), Encoding::KIND_UNALIGNED_CDR,
2367  this->swap_bytes());
2368 
2369  serializer << end_msg;
2370 
2371  DataSampleHeader header;
2372  Message_Block_Ptr control(
2374  END_COHERENT_CHANGES, header, move(data),
2375  SystemTimePoint::now().to_dds_time()));
2376 
2377  this->coherent_ = false;
2378  this->coherent_samples_ = 0;
2379 
2380  guard.release();
2381  if (this->send_control(header, move(control)) == SEND_CONTROL_ERROR) {
2382  ACE_ERROR((LM_ERROR,
2383  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::end_coherent_changes:")
2384  ACE_TEXT(" unable to send END_COHERENT_CHANGES control message!\n")));
2385  }
2386 }
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DataBlockLockPool::DataBlockLock * get_db_lock()
SequenceNumber get_max_sn() const
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_TEXT("TCP_Factory")
virtual SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr msg)
ACE_Recursive_Thread_Mutex & get_lock() const
ACE_Message_Block * create_control_message(MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)

◆ filter_out()

bool OpenDDS::DCPS::DataWriterImpl::filter_out ( const DataSampleElement elt,
const OPENDDS_STRING filterClassName,
const FilterEvaluator evaluator,
const DDS::StringSeq expression_params 
) const

Definition at line 2271 of file DataWriterImpl.cpp.

References ACE_ERROR, ACE_Message_Block::cont(), OpenDDS::DCPS::DataWriterImpl::EncodingMode::encoding(), encoding_mode_, OpenDDS::DCPS::LogLevel::Error, OpenDDS::DCPS::FilterEvaluator::eval(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::FilterEvaluator::has_non_key_fields(), LM_ERROR, OpenDDS::DCPS::log_level, type_support_, and OpenDDS::DCPS::DataSampleHeader::valid_data().

Referenced by OpenDDS::DCPS::WriteDataContainer::copy_and_prepend(), and write_w_timestamp().

2275 {
2276  if (!type_support_) {
2277  if (log_level >= LogLevel::Error) {
2278  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::filter_out: Could not cast type support, not filtering\n"));
2279  }
2280  return false;
2281  }
2282 
2283  if (filterClassName == "DDSSQL" ||
2284  filterClassName == "OPENDDSSQL") {
2285  if (!elt.get_header().valid_data() && evaluator.has_non_key_fields(*type_support_)) {
2286  return true;
2287  }
2288  try {
2289  return !evaluator.eval(elt.get_sample()->cont(), encoding_mode_.encoding(),
2290  *type_support_, expression_params);
2291  } catch (const std::runtime_error&) {
2292  // if the eval fails, the throws will do the logging
2293  // return false here so that the sample is not filtered
2294  return false;
2295  }
2296  } else {
2297  return false;
2298  }
2299 }
#define ACE_ERROR(X)
class OpenDDS::DCPS::DataWriterImpl::EncodingMode encoding_mode_
TypeSupportImpl * type_support_
OpenDDS_Dcps_Export LogLevel log_level

◆ find_instance()

DataWriterImpl::InstanceValuesToHandles::iterator OpenDDS::DCPS::DataWriterImpl::find_instance ( const Sample & sample)
private

Definition at line 3068 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::RcHandle< T >::_retn(), and instance_values_to_handles_.

Referenced by instance_must_exist(), and lookup_instance().

3069 {
3070  Sample_rch dummy_rch(const_cast<Sample*>(&sample), keep_count());
3071  InstanceValuesToHandles::iterator pos = instance_values_to_handles_.find(dummy_rch);
3072  dummy_rch._retn();
3073  return pos;
3074 }
RcHandle< Sample > Sample_rch
Definition: Sample.h:33
InstanceValuesToHandles instance_values_to_handles_

◆ get_builtin_subscriber_proxy()

RcHandle< BitSubscriber > OpenDDS::DCPS::DataWriterImpl::get_builtin_subscriber_proxy ( ) const
private virtual

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 201 of file DataWriterImpl.cpp.

References participant_servant_.

202 {
203  RcHandle<DomainParticipantImpl> participant_servant = participant_servant_.lock();
204  if (participant_servant) {
205  return participant_servant->get_builtin_subscriber_proxy();
206  }
207 
208  return RcHandle<BitSubscriber>();
209 }
WeakRcHandle< DomainParticipantImpl > participant_servant_

◆ get_crypto_handle()

DDS::Security::ParticipantCryptoHandle OpenDDS::DCPS::DataWriterImpl::get_crypto_handle ( ) const
private virtual

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 2739 of file DataWriterImpl.cpp.

References DDS::HANDLE_NIL, and participant_servant_.

2740 {
2741  RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
2742  return participant ? participant->crypto_handle() : DDS::HANDLE_NIL;
2743 }
const InstanceHandle_t HANDLE_NIL
WeakRcHandle< DomainParticipantImpl > participant_servant_

◆ get_db_lock()

DataBlockLockPool::DataBlockLock* OpenDDS::DCPS::DataWriterImpl::get_db_lock ( )
inline

◆ get_dp_id()

GUID_t OpenDDS::DCPS::DataWriterImpl::get_dp_id ( )

Accessor of the repository id of the domain participant.

Definition at line 2058 of file DataWriterImpl.cpp.

References dp_id_.

Referenced by OpenDDS::DCPS::DWMonitorImpl::report().

2059 {
2060  return dp_id_;
2061 }

◆ get_ext_listener()

DataWriterListener_ptr OpenDDS::DCPS::DataWriterImpl::get_ext_listener ( )
protected

Definition at line 1003 of file DataWriterImpl.cpp.

References listener_, and listener_mutex_.

Referenced by notify_publication_disconnected(), notify_publication_lost(), and notify_publication_reconnected().

1004 {
1006  return DataWriterListener::_narrow(listener_.in());
1007 }
DDS::DataWriterListener_var listener_
Used to notify the entity for relevant events.
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.

◆ get_guid()

GUID_t OpenDDS::DCPS::DataWriterImpl::get_guid ( ) const
inline virtual

◆ get_handle_instance()

PublicationInstance_rch OpenDDS::DCPS::DataWriterImpl::get_handle_instance ( DDS::InstanceHandle_t  handle)

Attempt to locate an existing instance for the given handle.

Definition at line 2562 of file DataWriterImpl.cpp.

References data_container_.

Referenced by get_or_create_instance_handle().

2563 {
2564 
2565  if (0 != data_container_) {
2566  return data_container_->get_handle_instance(handle);
2567  }
2568 
2569  return PublicationInstance_rch();
2570 }
RcHandle< PublicationInstance > PublicationInstance_rch
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ get_ice_endpoint()

WeakRcHandle< ICE::Endpoint > OpenDDS::DCPS::DataWriterImpl::get_ice_endpoint ( )
virtual

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 2782 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TransportClient::get_ice_endpoint().

2783 {
2785 }
WeakRcHandle< ICE::Endpoint > get_ice_endpoint()

◆ get_instance_handle()

DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl::get_instance_handle ( )
virtual

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 176 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::EntityImpl::get_entity_instance_handle(), participant_servant_, and publication_id_.

177 {
178  const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
179  return get_entity_instance_handle(publication_id_, participant);
180 }
WeakRcHandle< DomainParticipantImpl > participant_servant_
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
Definition: EntityImpl.cpp:142
GUID_t publication_id_
The repository id of this datawriter/publication.

◆ get_instance_handles()

void OpenDDS::DCPS::DataWriterImpl::get_instance_handles ( InstanceHandleVec &  instance_handles)

Definition at line 2715 of file DataWriterImpl.cpp.

References data_container_.

Referenced by OpenDDS::DCPS::DWMonitorImpl::report().

2716 {
2717  this->data_container_->get_instance_handles(instance_handles);
2718 }
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ get_key_value()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_key_value ( Sample_rch & sample,
DDS::InstanceHandle_t  handle 
)

Definition at line 2898 of file DataWriterImpl.cpp.

References ACE_GUARD_RETURN, get_lock(), instance_handles_to_values_, OpenDDS::DCPS::Sample::Mutable, DDS::RETCODE_BAD_PARAMETER, DDS::RETCODE_ERROR, and DDS::RETCODE_OK.

Referenced by OpenDDS::XTypes::DynamicDataWriterImpl::get_key_value(), and OpenDDS::DCPS::DataWriterImpl_T< MessageType >::get_key_value().

2899 {
2901  const InstanceHandlesToValues::iterator it = instance_handles_to_values_.find(handle);
2902  if (it == instance_handles_to_values_.end()) {
2904  }
2905  sample = it->second->copy(Sample::Mutable);
2906  return DDS::RETCODE_OK;
2907 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
InstanceHandlesToValues instance_handles_to_values_
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex & get_lock() const
const ReturnCode_t RETCODE_BAD_PARAMETER

◆ get_listener()

DDS::DataWriterListener_ptr OpenDDS::DCPS::DataWriterImpl::get_listener ( )
virtual

Implements DDS::DataWriter.

Definition at line 996 of file DataWriterImpl.cpp.

References listener_, and listener_mutex_.

997 {
999  return DDS::DataWriterListener::_duplicate(listener_.in());
1000 }
DDS::DataWriterListener_var listener_
Used to notify the entity for relevant events.
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.

◆ get_liveliness_lost_status()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_liveliness_lost_status ( DDS::LivelinessLostStatus & status)
virtual

Definition at line 1121 of file DataWriterImpl.cpp.

References ACE_GUARD_RETURN, DDS::LIVELINESS_LOST_STATUS, liveliness_lost_status_, lock_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::LivelinessLostStatus::total_count_change.

1123 {
1125  guard,
1126  this->lock_,
1129  status = liveliness_lost_status_;
1131  return DDS::RETCODE_OK;
1132 }
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
const StatusKind LIVELINESS_LOST_STATUS
DDS::LivelinessLostStatus liveliness_lost_status_
Status conditions.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex lock_

◆ get_lock()

ACE_Recursive_Thread_Mutex& OpenDDS::DCPS::DataWriterImpl::get_lock ( ) const
inline

◆ get_marshal_skip_serialize()

bool OpenDDS::DCPS::DataWriterImpl::get_marshal_skip_serialize ( ) const
inline

Definition at line 129 of file DataWriterImpl.h.

130  {
131  return skip_serialize_;
132  }

◆ get_matched_subscription_data()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_matched_subscription_data ( DDS::SubscriptionBuiltinTopicData & subscription_data,
DDS::InstanceHandle_t  subscription_handle 
)
virtual

Definition at line 1281 of file DataWriterImpl.cpp.

References ACE_ERROR_RETURN, ACE_TEXT(), OpenDDS::DCPS::BUILT_IN_SUBSCRIPTION_TOPIC, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::RcHandle< T >::in(), LM_ERROR, participant_servant_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

1284 {
1285  if (!enabled_) {
1286  ACE_ERROR_RETURN((LM_ERROR,
1287  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::")
1288  ACE_TEXT("get_matched_subscription_data: ")
1289  ACE_TEXT("Entity is not enabled.\n")),
1291  }
1292  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
1293 
1295  DDS::SubscriptionBuiltinTopicDataSeq data;
1296 
1297  if (participant) {
1298  ret = instance_handle_to_bit_data<DDS::SubscriptionBuiltinTopicDataDataReader_var>(
1299  participant.in(),
1301  subscription_handle,
1302  data);
1303  }
1304 
1305  if (ret == DDS::RETCODE_OK) {
1306  subscription_data = data[0];
1307  }
1308 
1309  return ret;
1310 }
WeakRcHandle< DomainParticipantImpl > participant_servant_
ACE_TEXT("TCP_Factory")
const char *const BUILT_IN_SUBSCRIPTION_TOPIC
const ReturnCode_t RETCODE_NOT_ENABLED
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
#define ACE_ERROR_RETURN(X, Y)

◆ get_matched_subscriptions()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_matched_subscriptions ( DDS::InstanceHandleSeq & subscription_handles)
virtual

Definition at line 1248 of file DataWriterImpl.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::EntityImpl::enabled_, id_to_handle_map_, LM_ERROR, lock_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

1250 {
1251  if (!enabled_) {
1252  ACE_ERROR_RETURN((LM_ERROR,
1253  ACE_TEXT("(%P|%t) ERROR: ")
1254  ACE_TEXT("DataWriterImpl::get_matched_subscriptions: ")
1255  ACE_TEXT(" Entity is not enabled.\n")),
1257  }
1258 
1260  guard,
1261  this->lock_,
1263 
1264  // Copy out the handles for the current set of subscriptions.
1265  int index = 0;
1266  subscription_handles.length(
1267  static_cast<CORBA::ULong>(this->id_to_handle_map_.size()));
1268 
1269  for (RepoIdToHandleMap::iterator
1270  current = this->id_to_handle_map_.begin();
1271  current != this->id_to_handle_map_.end();
1272  ++current, ++index) {
1273  subscription_handles[index] = current->second;
1274  }
1275 
1276  return DDS::RETCODE_OK;
1277 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_TEXT("TCP_Factory")
const ReturnCode_t RETCODE_NOT_ENABLED
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex lock_
#define ACE_ERROR_RETURN(X, Y)
RepoIdToHandleMap id_to_handle_map_

◆ get_max_sn()

SequenceNumber OpenDDS::DCPS::DataWriterImpl::get_max_sn ( ) const
inline virtual

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 494 of file DataWriterImpl.h.

Referenced by association_complete_i(), create_ack_token(), end_coherent_changes(), and track_sequence_number().

495  {
497  return sequence_number_;
498  }
ACE_Thread_Mutex sn_lock_
Mutex for sequence_number_.
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.

◆ get_next_handle()

DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl::get_next_handle ( )

Get an instance handle for a new instance.

Definition at line 183 of file DataWriterImpl.cpp.

References DDS::HANDLE_NIL, and participant_servant_.

Referenced by OpenDDS::DCPS::WriteDataContainer::register_instance().

184 {
185  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
186  if (participant) {
187  return participant->assign_handle();
188  }
189  return DDS::HANDLE_NIL;
190 }
const InstanceHandle_t HANDLE_NIL
WeakRcHandle< DomainParticipantImpl > participant_servant_

◆ get_next_sn()

SequenceNumber OpenDDS::DCPS::DataWriterImpl::get_next_sn ( )
inline protected

Definition at line 528 of file DataWriterImpl.h.

529  {
531  return get_next_sn_i();
532  }
ACE_Thread_Mutex sn_lock_
Mutex for sequence_number_.

◆ get_next_sn_i()

SequenceNumber OpenDDS::DCPS::DataWriterImpl::get_next_sn_i ( )
inline protected

Definition at line 534 of file DataWriterImpl.h.

References OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN().

Referenced by create_control_message(), and create_sample_data_message().

535  {
537  sequence_number_ = SequenceNumber();
538  } else {
540  }
541  return sequence_number_;
542  }
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.

◆ get_offered_deadline_missed_status()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_offered_deadline_missed_status ( DDS::OfferedDeadlineMissedStatus status)
virtual

Definition at line 1135 of file DataWriterImpl.cpp.

References ACE_GUARD_RETURN, last_deadline_missed_total_count_, lock_, DDS::OFFERED_DEADLINE_MISSED_STATUS, offered_deadline_missed_status_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), DDS::OfferedDeadlineMissedStatus::total_count, and DDS::OfferedDeadlineMissedStatus::total_count_change.

1137 {
1139  guard,
1140  this->lock_,
1142 
1144 
1148 
1149  // Update for next status check.
1152 
1154 
1156 
1157  return DDS::RETCODE_OK;
1158 }
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
DDS::OfferedDeadlineMissedStatus offered_deadline_missed_status_
const StatusKind OFFERED_DEADLINE_MISSED_STATUS
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex lock_
CORBA::Long last_deadline_missed_total_count_

◆ get_offered_incompatible_qos_status()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_offered_incompatible_qos_status ( DDS::OfferedIncompatibleQosStatus status)
virtual

Definition at line 1161 of file DataWriterImpl.cpp.

References ACE_GUARD_RETURN, lock_, DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, offered_incompatible_qos_status_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::OfferedIncompatibleQosStatus::total_count_change.

1163 {
1165  guard,
1166  this->lock_,
1171  return DDS::RETCODE_OK;
1172 }
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
const StatusKind OFFERED_INCOMPATIBLE_QOS_STATUS
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex lock_

◆ get_or_create_instance_handle()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_or_create_instance_handle ( DDS::InstanceHandle_t & handle,
const Sample & sample,
const DDS::Time_t & source_timestamp 
)
protected

Definition at line 3076 of file DataWriterImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, DDS::Security::SecurityException::code, OpenDDS::DCPS::Sample::copy(), OpenDDS::XTypes::copy(), dynamic_type_, get_handle_instance(), get_lock(), get_type_support(), DDS::HANDLE_NIL, insert_instance(), OpenDDS::DCPS::Sample::KeyOnly, LM_NOTICE, OpenDDS::DCPS::log_level, lookup_instance(), DDS::Security::SecurityException::message, DDS::Security::SecurityException::minor_code, OpenDDS::DCPS::move(), OpenDDS::DCPS::TypeSupportImpl::name(), OpenDDS::DCPS::LogLevel::Notice, participant_permissions_handle_, OpenDDS::DCPS::Sample::ReadOnly, register_instance_i(), DDS::RETCODE_ERROR, DDS::Security::RETCODE_NOT_ALLOWED_BY_SECURITY, DDS::RETCODE_OK, security_config_, send_all_to_flush_control(), and serialize_sample().

Referenced by register_instance_w_timestamp(), and write_w_timestamp().

3080 {
3082 
3083  handle = lookup_instance(sample);
3084  if (handle == DDS::HANDLE_NIL || !get_handle_instance(handle)) {
3086 #if defined(OPENDDS_SECURITY) && OPENDDS_HAS_DYNAMIC_DATA_ADAPTER
3087  DDS::DynamicData_var dynamic_data = copy->get_dynamic_data(dynamic_type_);
3089  if (dynamic_data && security_config_ &&
3091  !security_config_->get_access_control()->check_local_datawriter_register_instance(participant_permissions_handle_, this, dynamic_data, ex)) {
3092  if (log_level >= LogLevel::Notice) {
3093  ACE_ERROR((LM_NOTICE,
3094  "(%P|%t) NOTICE: DataWriterImpl::get_or_create_instance_handle: unable to register instance SecurityException[%d.%d]: %C\n",
3095  ex.code, ex.minor_code, ex.message.in()));
3096  }
3098  }
3099 #endif
3100 
3101  // don't use fast allocator for registration.
3102  const TypeSupportImpl* const ts = get_type_support();
3103  Message_Block_Ptr serialized(serialize_sample(*copy));
3104  if (!serialized) {
3105  if (log_level >= LogLevel::Notice) {
3106  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: %CDataWriterImpl::get_or_create_instance_handle: "
3107  "failed to serialize sample\n", ts->name()));
3108  }
3109  return DDS::RETCODE_ERROR;
3110  }
3111 
3112  // tell DataWriterLocal and Publisher about the instance.
3113  const DDS::ReturnCode_t ret = register_instance_i(handle, move(serialized), source_timestamp);
3114  // note: the WriteDataContainer/PublicationInstance maintains ownership
3115  // of the marshalled sample.
3116  if (ret != DDS::RETCODE_OK) {
3117  handle = DDS::HANDLE_NIL;
3118  return ret;
3119  }
3120 
3121  if (!insert_instance(handle, copy)) {
3122  handle = DDS::HANDLE_NIL;
3123  if (log_level >= LogLevel::Notice) {
3124  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: %CDataWriterImpl::get_or_create_instance_handle: "
3125  "insert instance failed\n", ts->name()));
3126  }
3127  return DDS::RETCODE_ERROR;
3128  }
3129 
3131  }
3132 
3133  return DDS::RETCODE_OK;
3134 }
#define ACE_ERROR(X)
const InstanceHandle_t HANDLE_NIL
DDS::Security::PermissionsHandle participant_permissions_handle_
DDS::InstanceHandle_t lookup_instance(const Sample &sample)
void send_all_to_flush_control(ACE_Guard< ACE_Recursive_Thread_Mutex > &guard)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
ACE_Message_Block * serialize_sample(const Sample &sample)
bool insert_instance(DDS::InstanceHandle_t handle, Sample_rch &sample)
Security::SecurityConfig_rch security_config_
PublicationInstance_rch get_handle_instance(DDS::InstanceHandle_t handle)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
OpenDDS_Dcps_Export LogLevel log_level
RcHandle< Sample > Sample_rch
Definition: Sample.h:33
DDS::ReturnCode_t copy(DDS::DynamicData_ptr dest, DDS::DynamicData_ptr src)
const ReturnCode_t RETCODE_ERROR
DDS::DynamicType_var dynamic_type_
TypeSupportImpl * get_type_support() const
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_NOT_ALLOWED_BY_SECURITY
ACE_Recursive_Thread_Mutex & get_lock() const
DDS::ReturnCode_t register_instance_i(DDS::InstanceHandle_t &handle, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)

◆ get_priority_value()

CORBA::Long OpenDDS::DCPS::DataWriterImpl::get_priority_value ( const AssociationData ) const
inline private virtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 721 of file DataWriterImpl.h.

References DDS::DataWriterQos::transport_priority, and DDS::TransportPriorityQosPolicy::value.

722  {
723  return this->qos_.transport_priority.value;
724  }
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
TransportPriorityQosPolicy transport_priority

◆ get_publication_matched_status()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_publication_matched_status ( DDS::PublicationMatchedStatus & status)
virtual

Definition at line 1175 of file DataWriterImpl.cpp.

References ACE_GUARD_RETURN, DDS::PublicationMatchedStatus::current_count_change, lock_, publication_match_status_, DDS::PUBLICATION_MATCHED_STATUS, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), and DDS::PublicationMatchedStatus::total_count_change.

1177 {
1179  guard,
1180  this->lock_,
1183  status = publication_match_status_;
1186  return DDS::RETCODE_OK;
1187 }
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
const StatusKind PUBLICATION_MATCHED_STATUS
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
DDS::PublicationMatchedStatus publication_match_status_
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex lock_

◆ get_publisher()

DDS::Publisher_ptr OpenDDS::DCPS::DataWriterImpl::get_publisher ( )
virtual

Implements DDS::DataWriter.

Definition at line 1115 of file DataWriterImpl.cpp.

References publisher_servant_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter(), OpenDDS::DCPS::StaticDiscovery::pre_writer(), and OpenDDS::DCPS::DWMonitorImpl::report().

1116 {
1117  return publisher_servant_.lock()._retn();
1118 }
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.

◆ get_qos()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::get_qos ( DDS::DataWriterQos & qos)
virtual

Definition at line 978 of file DataWriterImpl.cpp.

References passed_qos_, and DDS::RETCODE_OK.

Referenced by OpenDDS::DCPS::StaticDiscovery::pre_writer().

979 {
980  qos = passed_qos_;
981  return DDS::RETCODE_OK;
982 }
DDS::DataWriterQos passed_qos_
const ReturnCode_t RETCODE_OK

◆ get_readers()

void OpenDDS::DCPS::DataWriterImpl::get_readers ( RepoIdSet & readers)

Definition at line 2721 of file DataWriterImpl.cpp.

References ACE_GUARD, lock_, and readers_.

Referenced by OpenDDS::DCPS::DWMonitorImpl::report().

2722 {
2724  readers = this->readers_;
2725 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_Recursive_Thread_Mutex lock_

◆ get_resend_data()

SendStateDataSampleList OpenDDS::DCPS::DataWriterImpl::get_resend_data ( )
inline

Definition at line 306 of file DataWriterImpl.h.

Referenced by association_complete_i(), and replay_durable_data_for().

307  {
308  return data_container_->get_resend_data();
309  }
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ get_topic()

DDS::Topic_ptr OpenDDS::DCPS::DataWriterImpl::get_topic ( )
virtual

Implements DDS::DataWriter.

Definition at line 1010 of file DataWriterImpl.cpp.

References topic_servant_.

Referenced by OpenDDS::DCPS::DWMonitorImpl::report().

1011 {
1012  return DDS::Topic::_duplicate(topic_servant_.get());
1013 }
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.

◆ get_type_name()

char const * OpenDDS::DCPS::DataWriterImpl::get_type_name ( ) const

Get associated topic type name.

Definition at line 2064 of file DataWriterImpl.cpp.

References TAO::String_var< charT >::in(), and type_name_.

Referenced by enable().

2065 {
2066  return type_name_.in();
2067 }
const character_type * in(void) const
CORBA::String_var type_name_
The type name of associated topic.

◆ get_type_support()

TypeSupportImpl* OpenDDS::DCPS::DataWriterImpl::get_type_support ( ) const
inline protected

Definition at line 665 of file DataWriterImpl.h.

References write_w_timestamp().

Referenced by get_or_create_instance_handle(), and write_w_timestamp().

666  {
667  return type_support_;
668  }
TypeSupportImpl * type_support_

◆ get_unsent_data()

ACE_UINT64 OpenDDS::DCPS::DataWriterImpl::get_unsent_data ( SendStateDataSampleList list)
inline

Retrieve the unsent data from the WriteDataContainer.

Definition at line 301 of file DataWriterImpl.h.

Referenced by send_all_to_flush_control(), and write().

302  {
303  return data_container_->get_unsent_data(list);
304  }
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ get_value_dispatcher()

const ValueDispatcher* OpenDDS::DCPS::DataWriterImpl::get_value_dispatcher ( ) const
inline

Definition at line 500 of file DataWriterImpl.h.

References dispose_w_timestamp(), get_key_value(), lookup_instance(), register_instance_w_timestamp(), timestamp(), and unregister_instance_w_timestamp().

Referenced by write().

501  {
502  return dynamic_cast<const ValueDispatcher*>(type_support_);
503  }
TypeSupportImpl * type_support_

◆ handle_timeout()

int OpenDDS::DCPS::DataWriterImpl::handle_timeout ( const ACE_Time_Value & tv,
const void *  arg 
)
virtual

Handle the assert liveliness timeout.

Definition at line 2431 of file DataWriterImpl.cpp.

References ACE_ERROR, ACE_TEXT(), DDS::AUTOMATIC_LIVELINESS_QOS, ACE_Reactor_Timer_Interface::cancel_timer(), CORBA::is_nil(), DDS::LivelinessQosPolicy::kind, last_liveliness_activity_time_, DDS::LivelinessQosPolicy::lease_duration, listener_for(), DDS::DataWriterQos::liveliness, liveliness_asserted_, liveliness_check_interval_, liveliness_lost_, DDS::LIVELINESS_LOST_STATUS, liveliness_lost_status_, liveness_timer_, LM_ERROR, lock_, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, qos_, reactor_, ACE_Reactor_Timer_Interface::schedule_timer(), send_liveliness(), TheServiceParticipant, DDS::LivelinessLostStatus::total_count, DDS::LivelinessLostStatus::total_count_change, value, and OpenDDS::DCPS::TimeDuration::value().

2433 {
2434  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
2435 
2436  const MonotonicTimePoint now(tv);
2437  bool liveliness_lost = false;
2438 
2440 
2441  TimeDuration elapsed = now - last_liveliness_activity_time_;
2442 
2443  // Do we need to send a liveliness message?
2444  if (elapsed >= liveliness_check_interval_) {
2445  switch (this->qos_.liveliness.kind) {
2447  if (!send_liveliness(now)) {
2448  liveliness_lost = true;
2449  }
2450  break;
2451 
2453  if (liveliness_asserted_) {
2454  if (!send_liveliness(now)) {
2455  liveliness_lost = true;
2456  }
2457  }
2458  break;
2459 
2461  // Do nothing.
2462  break;
2463  }
2464  }
2465  else {
2466  // Reschedule.
2467  if (reactor_->cancel_timer(liveness_timer_.in()) == -1) {
2468  ACE_ERROR((LM_ERROR,
2469  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
2470  ACE_TEXT("cancel_timer")));
2471  }
2473  (liveliness_check_interval_ - elapsed).value(),
2475  {
2476  ACE_ERROR((LM_ERROR,
2477  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::handle_timeout: %p.\n"),
2478  ACE_TEXT("schedule_timer")));
2479  }
2480  return 0;
2481  }
2482 
2483  liveliness_asserted_ = false;
2484  elapsed = now - last_liveliness_activity_time_;
2485 
2486  // Have we lost liveliness?
2487  if (elapsed >= TimeDuration(qos_.liveliness.lease_duration)) {
2488  liveliness_lost = true;
2489  }
2490 
2491  if (!this->liveliness_lost_ && liveliness_lost) {
2494 
2495  DDS::DataWriterListener_var listener =
2497 
2498  if (!CORBA::is_nil(listener.in())) {
2499  {
2502  listener->on_liveliness_lost(this, this->liveliness_lost_status_);
2503  }
2505  }
2506  }
2507 
2508  this->liveliness_lost_ = liveliness_lost;
2509  return 0;
2510 }
MonotonicTimePoint last_liveliness_activity_time_
Timestamp of last write/dispose/assert_liveliness.
#define ACE_ERROR(X)
const StatusKind LIVELINESS_LOST_STATUS
const LogLevel::Value value
Definition: debug.cpp:61
DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind)
bool send_liveliness(const MonotonicTimePoint &now)
Send the liveliness message.
DDS::LivelinessLostStatus liveliness_lost_status_
Status conditions.
const ACE_Time_Value & value() const
ACE_Reactor_Timer_Interface * reactor_
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
RcHandle< LivenessTimer > liveness_timer_
TimePoint_T< MonotonicClock > MonotonicTimePoint
Definition: TimeTypes.h:51
virtual int cancel_timer(long timer_id, const void **arg=0, int dont_call_handle_close=1)=0
ACE_TEXT("TCP_Factory")
TimeDuration liveliness_check_interval_
The time interval for sending liveliness message.
LivelinessQosPolicyKind kind
virtual long schedule_timer(ACE_Event_Handler *event_handler, const void *arg, const ACE_Time_Value &delay, const ACE_Time_Value &interval=ACE_Time_Value::zero)=0
ACE_Recursive_Thread_Mutex lock_
LivelinessQosPolicy liveliness
#define TheServiceParticipant
Boolean is_nil(T x)

◆ init()

void OpenDDS::DCPS::DataWriterImpl::init ( TopicImpl * topic_servant,
const DDS::DataWriterQos & qos,
DDS::DataWriterListener_ptr  a_listener,
const DDS::StatusMask & mask,
WeakRcHandle< DomainParticipantImpl > participant_servant,
PublisherImpl * publisher_servant 
)

Initialize the data members.

Definition at line 137 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, domain_id_, OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), TAO::String_var< charT >::in(), is_bit_, OpenDDS::DCPS::WeakRcHandle< T >::lock(), participant_servant_, passed_qos_, publisher_servant_, qos_, reactor_, set_listener(), TheServiceParticipant, topic_id_, topic_name_, topic_servant_, OpenDDS::DCPS::topicIsBIT(), type_name_, and type_support_.

Referenced by OpenDDS::DCPS::PublisherImpl::create_datawriter().

144 {
145  DBG_ENTRY_LVL("DataWriterImpl", "init", 6);
146  topic_servant_ = topic_servant;
147  type_support_ = dynamic_cast<TypeSupportImpl*>(topic_servant->get_type_support());
148  topic_name_ = topic_servant_->get_name();
149  topic_id_ = topic_servant_->get_id();
150  type_name_ = topic_servant_->get_type_name();
151 
152 #if !defined (DDS_HAS_MINIMUM_BIT)
154 #endif // !defined (DDS_HAS_MINIMUM_BIT)
155 
156  qos_ = qos;
157  passed_qos_ = qos;
158 
159  set_listener(a_listener, mask);
160 
161  // Only store the participant pointer, since it is our "grand"
162  // parent, we will exist as long as it does.
163  participant_servant_ = participant_servant;
164 
165  RcHandle<DomainParticipantImpl> participant = participant_servant.lock();
166  domain_id_ = participant->get_domain_id();
167 
168  // Only store the publisher pointer, since it is our parent, we will
169  // exist as long as it does.
170  publisher_servant_ = *publisher_servant;
171 
172  this->reactor_ = TheServiceParticipant->timer();
173 }
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
virtual DDS::ReturnCode_t set_listener(DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask)
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
CORBA::String_var topic_name_
The name of associated topic.
bool topicIsBIT(const char *name, const char *type)
ACE_Reactor_Timer_Interface * reactor_
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
DDS::DataWriterQos passed_qos_
WeakRcHandle< DomainParticipantImpl > participant_servant_
GUID_t topic_id_
The associated topic repository id.
TypeSupportImpl * type_support_
DDS::DomainId_t domain_id_
The domain id.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
const character_type * in(void) const
#define TheServiceParticipant
CORBA::String_var type_name_
The type name of associated topic.

◆ insert_instance()

bool OpenDDS::DCPS::DataWriterImpl::insert_instance ( DDS::InstanceHandle_t  handle,
Sample_rch sample 
)
private

Definition at line 3052 of file DataWriterImpl.cpp.

References instance_handles_to_values_, instance_values_to_handles_, and OPENDDS_ASSERT.

Referenced by get_or_create_instance_handle().

3053 {
3054  OPENDDS_ASSERT(sample->key_only());
3055  if (!instance_handles_to_values_.insert(
3056  InstanceHandlesToValues::value_type(handle, sample)).second) {
3057  return false;
3058  }
3059  if (!instance_values_to_handles_.insert(
3060  InstanceValuesToHandles::value_type(sample, handle)).second) {
3061  instance_handles_to_values_.erase(handle);
3062  return false;
3063  }
3064  return true;
3065 }
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
InstanceHandlesToValues instance_handles_to_values_
InstanceValuesToHandles instance_values_to_handles_

◆ instance_must_exist()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::instance_must_exist ( const char *  method_name,
const Sample & sample,
DDS::InstanceHandle_t & instance_handle,
bool  remove = false 
)
protected

Definition at line 3136 of file DataWriterImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, find_instance(), get_lock(), DDS::HANDLE_NIL, instance_handles_to_values_, instance_values_to_handles_, OpenDDS::DCPS::Sample::key_only(), LM_NOTICE, OpenDDS::DCPS::log_level, OpenDDS::DCPS::LogLevel::Notice, OPENDDS_ASSERT, DDS::RETCODE_ERROR, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

Referenced by dispose_w_timestamp(), and unregister_instance_w_timestamp().

3141 {
3142  OPENDDS_ASSERT(sample.key_only());
3143 
3145 
3146  const InstanceValuesToHandles::iterator pos = find_instance(sample);
3147  if (pos == instance_values_to_handles_.end()) {
3148  if (log_level >= LogLevel::Notice) {
3149  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::%C: "
3150  "The instance sample is not registered\n",
3151  method_name));
3152  }
3153  return DDS::RETCODE_ERROR;
3154  }
3155 
3156  if (instance_handle != DDS::HANDLE_NIL && instance_handle != pos->second) {
3157  return DDS::RETCODE_PRECONDITION_NOT_MET;
3158  }
3159 
3160  instance_handle = pos->second;
3161 
3162  if (remove) {
3163  instance_values_to_handles_.erase(pos);
3164  instance_handles_to_values_.erase(instance_handle);
3165  }
3166 
3167  return DDS::RETCODE_OK;
3168 }
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
const InstanceHandle_t HANDLE_NIL
#define OPENDDS_ASSERT(C)
Definition: Definitions.h:72
InstanceValuesToHandles::iterator find_instance(const Sample &sample)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
InstanceHandlesToValues instance_handles_to_values_
OpenDDS_Dcps_Export LogLevel log_level
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex & get_lock() const
InstanceValuesToHandles instance_values_to_handles_

◆ listener_for()

DDS::DataWriterListener_ptr OpenDDS::DCPS::DataWriterImpl::listener_for ( DDS::StatusKind  kind)

This is used to retrieve the listener for a certain status change.

If this datawriter has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for the listener is propagated up to the factory/publisher.

Definition at line 2411 of file DataWriterImpl.cpp.

References CORBA::is_nil(), listener_, listener_mask_, listener_mutex_, publisher_servant_, and ACE_Guard< ACE_LOCK >::release().

Referenced by association_complete_i(), handle_timeout(), OpenDDS::DCPS::WriteDataContainer::process_deadlines(), remove_associations(), and update_incompatible_qos().

2412 {
2413  // per 2.1.4.3.1 Listener Access to Plain Communication Status
2414  // use this entities factory if listener is mask not enabled
2415  // for this kind.
2416  RcHandle<PublisherImpl> publisher = publisher_servant_.lock();
2417  if (!publisher)
2418  return 0;
2419 
2421  if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
2422  g.release();
2423  return publisher->listener_for(kind);
2424 
2425  } else {
2426  return DDS::DataWriterListener::_duplicate(listener_.in());
2427  }
2428 }
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
DDS::DataWriterListener_var listener_
Used to notify the entity for relevant events.
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
Boolean is_nil(T x)

◆ liveliness_check_interval()

TimeDuration OpenDDS::DCPS::DataWriterImpl::liveliness_check_interval ( DDS::LivelinessQosPolicyKind  kind)

Definition at line 1227 of file DataWriterImpl.cpp.

References DDS::LivelinessQosPolicy::kind, DDS::DataWriterQos::liveliness, liveliness_check_interval_, OpenDDS::DCPS::TimeDuration::max_value, and qos_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::LivelinessTimer::add_adjust().

1228 {
1229  if (this->qos_.liveliness.kind == kind) {
1230  return liveliness_check_interval_;
1231  } else {
1232  return TimeDuration::max_value;
1233  }
1234 }
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
TimeDuration liveliness_check_interval_
The time interval for sending liveliness message.
LivelinessQosPolicyKind kind
LivelinessQosPolicy liveliness
static const TimeDuration max_value
Definition: TimeDuration.h:32

◆ lookup_instance()

DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl::lookup_instance ( const Sample sample)

Definition at line 2909 of file DataWriterImpl.cpp.

References ACE_GUARD_RETURN, find_instance(), get_lock(), DDS::HANDLE_NIL, instance_values_to_handles_, and DDS::RETCODE_ERROR.

Referenced by get_or_create_instance_handle(), OpenDDS::XTypes::DynamicDataWriterImpl::lookup_instance(), and OpenDDS::DCPS::DataWriterImpl_T< MessageType >::lookup_instance().

2910 {
2912  const InstanceValuesToHandles::iterator it = find_instance(sample);
2913  return it == instance_values_to_handles_.end() ? DDS::HANDLE_NIL : it->second;
2914 }
const InstanceHandle_t HANDLE_NIL
InstanceValuesToHandles::iterator find_instance(const Sample &sample)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
ACE_Recursive_Thread_Mutex & get_lock() const
InstanceValuesToHandles instance_values_to_handles_

◆ lookup_instance_handles()

void OpenDDS::DCPS::DataWriterImpl::lookup_instance_handles ( const ReaderIdSeq & ids,
DDS::InstanceHandleSeq & hdls 
)
private

Lookup the instance handles by the subscription repo ids.

Definition at line 2667 of file DataWriterImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::LogGuid::conv_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OPENDDS_STRING, and participant_servant_.

Referenced by notify_publication_disconnected(), notify_publication_lost(), notify_publication_reconnected(), and remove_associations().

2669 {
2670  CORBA::ULong const num_rds = ids.length();
2671  RcHandle<DomainParticipantImpl> participant = this->participant_servant_.lock();
2672 
2673  if (!participant)
2674  return;
2675 
2676  if (DCPS_debug_level > 9) {
2677  OPENDDS_STRING separator;
2678  OPENDDS_STRING buffer;
2679 
2680  for (CORBA::ULong i = 0; i < num_rds; ++i) {
2681  buffer += separator + LogGuid(ids[i]).conv_;
2682  separator = ", ";
2683  }
2684 
2685  ACE_DEBUG((LM_DEBUG,
2686  ACE_TEXT("(%P|%t) DataWriterImpl::lookup_instance_handles: ")
2687  ACE_TEXT("searching for handles for reader Ids: %C.\n"),
2688  buffer.c_str()));
2689  }
2690 
2691  hdls.length(num_rds);
2692 
2693  for (CORBA::ULong i = 0; i < num_rds; ++i) {
2694  hdls[i] = participant->lookup_handle(ids[i]);
2695  }
2696 }
#define ACE_DEBUG(X)
#define OPENDDS_STRING
ACE_CDR::ULong ULong
WeakRcHandle< DomainParticipantImpl > participant_servant_
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ need_sequence_repair()

bool OpenDDS::DCPS::DataWriterImpl::need_sequence_repair ( )
private

Definition at line 2747 of file DataWriterImpl.cpp.

References ACE_GUARD_RETURN, need_sequence_repair_i(), and reader_info_lock_.

Referenced by create_control_message(), and create_sample_data_message().

2748 {
2749  ACE_GUARD_RETURN(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_, false);
2750  return need_sequence_repair_i();
2751 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_Thread_Mutex reader_info_lock_

◆ need_sequence_repair_i()

bool OpenDDS::DCPS::DataWriterImpl::need_sequence_repair_i ( ) const
private

Definition at line 2754 of file DataWriterImpl.cpp.

References reader_info_, and sequence_number_.

Referenced by need_sequence_repair().

2755 {
2756  for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
2757  end = reader_info_.end(); it != end; ++it) {
2758  if (it->second.expected_sequence_ != sequence_number_) {
2759  return true;
2760  }
2761  }
2762 
2763  return false;
2764 }
RepoIdToReaderInfoMap reader_info_
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.

◆ notify_publication_disconnected()

void OpenDDS::DCPS::DataWriterImpl::notify_publication_disconnected ( const ReaderIdSeq & subids)
virtual

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 2573 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, get_ext_listener(), is_bit_, CORBA::is_nil(), lookup_instance_handles(), and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.

2574 {
2575  DBG_ENTRY_LVL("DataWriterImpl","notify_publication_disconnected",6);
2576 
2577  if (!is_bit_) {
2578  // Narrow to DDS::DCPS::DataWriterListener. If a DDS::DataWriterListener
2579  // is given to this DataWriter then narrow() fails.
2580  DataWriterListener_var the_listener = get_ext_listener();
2581 
2582  if (!CORBA::is_nil(the_listener.in())) {
2584  // Since this callback may come after remove_association which
2585  // removes the reader from id_to_handle map, we can ignore this
2586  // error.
2587  this->lookup_instance_handles(subids,
2588  status.subscription_handles);
2589  the_listener->on_publication_disconnected(this, status);
2590  }
2591  }
2592 }
DataWriterListener_ptr get_ext_listener()
PublicationLostStatus PublicationDisconnectedStatus
void lookup_instance_handles(const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the subscription repo ids.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
Boolean is_nil(T x)

◆ notify_publication_lost() [1/2]

void OpenDDS::DCPS::DataWriterImpl::notify_publication_lost ( const ReaderIdSeq & subids)
virtual

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 2617 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, get_ext_listener(), is_bit_, CORBA::is_nil(), lookup_instance_handles(), and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.

Referenced by remove_associations().

2618 {
2619  DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
2620 
2621  if (!is_bit_) {
2622  // Narrow to DDS::DCPS::DataWriterListener. If a
2623  // DDS::DataWriterListener is given to this DataWriter then
2624  // narrow() fails.
2625  DataWriterListener_var the_listener = get_ext_listener();
2626 
2627  if (!CORBA::is_nil(the_listener.in())) {
2628  PublicationLostStatus status;
2629 
2630  // Since this callback may come after remove_association which removes
2631  // the reader from id_to_handle map, we can ignore this error.
2632  this->lookup_instance_handles(subids,
2633  status.subscription_handles);
2634  the_listener->on_publication_lost(this, status);
2635  }
2636  }
2637 }
DataWriterListener_ptr get_ext_listener()
void lookup_instance_handles(const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the subscription repo ids.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
Boolean is_nil(T x)

◆ notify_publication_lost() [2/2]

void OpenDDS::DCPS::DataWriterImpl::notify_publication_lost ( const DDS::InstanceHandleSeq handles)
private

Definition at line 2640 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, get_ext_listener(), is_bit_, CORBA::is_nil(), and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.

2641 {
2642  DBG_ENTRY_LVL("DataWriterImpl","notify_publication_lost",6);
2643 
2644  if (!is_bit_) {
2645  // Narrow to DDS::DCPS::DataWriterListener. If a
2646  // DDS::DataWriterListener is given to this DataWriter then
2647  // narrow() fails.
2648  DataWriterListener_var the_listener = get_ext_listener();
2649 
2650  if (!CORBA::is_nil(the_listener.in())) {
2651  PublicationLostStatus status;
2652 
2653  CORBA::ULong len = handles.length();
2654  status.subscription_handles.length(len);
2655 
2656  for (CORBA::ULong i = 0; i < len; ++ i) {
2657  status.subscription_handles[i] = handles[i];
2658  }
2659 
2660  the_listener->on_publication_lost(this, status);
2661  }
2662  }
2663 }
DataWriterListener_ptr get_ext_listener()
ACE_CDR::ULong ULong
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
Boolean is_nil(T x)

◆ notify_publication_reconnected()

void OpenDDS::DCPS::DataWriterImpl::notify_publication_reconnected ( const ReaderIdSeq & subids)
virtual

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 2595 of file DataWriterImpl.cpp.

References DBG_ENTRY_LVL, get_ext_listener(), is_bit_, CORBA::is_nil(), lookup_instance_handles(), and OpenDDS::DCPS::PublicationLostStatus::subscription_handles.

2596 {
2597  DBG_ENTRY_LVL("DataWriterImpl","notify_publication_reconnected",6);
2598 
2599  if (!is_bit_) {
2600  // Narrow to DDS::DCPS::DataWriterListener. If a
2601  // DDS::DataWriterListener is given to this DataWriter then
2602  // narrow() fails.
2603  DataWriterListener_var the_listener = get_ext_listener();
2604 
2605  if (!CORBA::is_nil(the_listener.in())) {
2607 
2608  // If it's reconnected then the reader should be in id_to_handle
2609  this->lookup_instance_handles(subids, status.subscription_handles);
2610 
2611  the_listener->on_publication_reconnected(this, status);
2612  }
2613  }
2614 }
DataWriterListener_ptr get_ext_listener()
PublicationLostStatus PublicationDisconnectedStatus
void lookup_instance_handles(const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the subscription repo ids.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
Boolean is_nil(T x)

◆ num_samples()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::num_samples ( DDS::InstanceHandle_t  handle,
size_t &  size 
)

Return the number of samples for a given instance.

Definition at line 2045 of file DataWriterImpl.cpp.

References data_container_.

2047 {
2048  return data_container_->num_samples(handle, size);
2049 }
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ OPENDDS_MAP()

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP ( DDS::InstanceHandle_t  ,
Sample_rch   
)
private

◆ OPENDDS_MAP_CMP() [1/4]

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP ( GUID_t  ,
SequenceNumber  ,
GUID_tKeyLessThan   
)

◆ OPENDDS_MAP_CMP() [2/4]

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP ( GUID_t  ,
ReaderInfo  ,
GUID_tKeyLessThan   
)
protected

◆ OPENDDS_MAP_CMP() [3/4]

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP ( GUID_t  ,
DDS::InstanceHandle_t  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [4/4]

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_MAP_CMP ( Sample_rch  ,
DDS::InstanceHandle_t  ,
SampleRchCmp   
)
private

◆ OPENDDS_VECTOR()

typedef OpenDDS::DCPS::DataWriterImpl::OPENDDS_VECTOR ( DDS::InstanceHandle_t  )

◆ parent()

RcHandle< EntityImpl > OpenDDS::DCPS::DataWriterImpl::parent ( void  ) const
virtual

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 2264 of file DataWriterImpl.cpp.

References publisher_servant_.

2265 {
2266  return this->publisher_servant_.lock();
2267 }
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.

◆ participant_liveliness_activity_after()

bool OpenDDS::DCPS::DataWriterImpl::participant_liveliness_activity_after ( const MonotonicTimePoint tv)

Definition at line 1237 of file DataWriterImpl.cpp.

References DDS::LivelinessQosPolicy::kind, last_liveliness_activity_time_, DDS::DataWriterQos::liveliness, lock_, DDS::MANUAL_BY_PARTICIPANT_LIVELINESS_QOS, and qos_.

1238 {
1241  return last_liveliness_activity_time_ > tv;
1242  } else {
1243  return false;
1244  }
1245 }
MonotonicTimePoint last_liveliness_activity_time_
Timestamp of last write/dispose/assert_liveliness.
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
LivelinessQosPolicyKind kind
ACE_Recursive_Thread_Mutex lock_
LivelinessQosPolicy liveliness

◆ persist_data()

bool OpenDDS::DCPS::DataWriterImpl::persist_data ( )

Make sent data available beyond the lifetime of this DataWriter.

Definition at line 2700 of file DataWriterImpl.cpp.

References data_container_.

Referenced by prepare_to_delete().

2701 {
2702  return this->data_container_->persist_data();
2703 }
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ prepare_to_delete()

void OpenDDS::DCPS::DataWriterImpl::prepare_to_delete ( )
protected

Definition at line 2536 of file DataWriterImpl.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::Observer::e_DELETED, OpenDDS::DCPS::EntityImpl::get_observer(), LM_ERROR, OpenDDS::DCPS::TimePoint_T< SystemClock >::now(), OpenDDS::DCPS::Observer::on_deleted(), persist_data(), OpenDDS::DCPS::EntityImpl::set_deleted(), OpenDDS::DCPS::TransportClient::stop_associating(), OpenDDS::DCPS::TransportClient::terminate_send_if_suspended(), and unregister_instances().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter(), and OpenDDS::DCPS::PublisherImpl::prepare_to_delete_datawriters().

2537 {
2538  const Observer_rch observer = get_observer(Observer::e_DELETED);
2539  if (observer) {
2540  observer->on_deleted(this);
2541  }
2542 
2543  this->set_deleted(true);
2544  this->stop_associating();
2546 
2547 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
2548  // Trigger data to be persisted, i.e. made durable, if so
2549  // configured. This needs be called before unregister_instances
2550  // because unregister_instances may cause instance dispose.
2551  if (!persist_data() && DCPS_debug_level >= 2) {
2552  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::prepare_to_delete: ")
2553  ACE_TEXT("failed to make data durable.\n")));
2554  }
2555 #endif
2556 
2557  // Unregister all registered instances prior to deletion.
2558  unregister_instances(SystemTimePoint::now().to_dds_time());
2559 }
#define ACE_ERROR(X)
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
void set_deleted(bool state)
Definition: EntityImpl.cpp:83
void unregister_instances(const DDS::Time_t &source_timestamp)
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
RcHandle< Observer > Observer_rch
Definition: Observer.h:101
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30

◆ register_for_reader()

void OpenDDS::DCPS::DataWriterImpl::register_for_reader ( const GUID_t & participant,
const GUID_t & writerid,
const GUID_t & readerid,
const TransportLocatorSeq & locators,
DiscoveryListener * listener 
)
virtual

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 822 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TransportClient::register_for_reader().

827 {
828  TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
829 }
void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)

◆ register_instance_from_durable_data()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::register_instance_from_durable_data ( DDS::InstanceHandle_t & handle,
Message_Block_Ptr  data,
const DDS::Time_t & source_timestamp 
)

Delegate to the WriteDataContainer to register and tell the transport to broadcast the registered instance.

Definition at line 1658 of file DataWriterImpl.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), DBG_ENTRY_LVL, get_lock(), LM_ERROR, OpenDDS::DCPS::move(), register_instance_i(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::retcode_to_string(), and send_all_to_flush_control().

Referenced by OpenDDS::DCPS::DataDurabilityCache::get_data().

1662 {
1663  DBG_ENTRY_LVL("DataWriterImpl","register_instance_from_durable_data",6);
1664 
1666  guard,
1667  get_lock(),
1669 
1670  const DDS::ReturnCode_t ret = register_instance_i(handle, move(data), source_timestamp);
1671  if (ret != DDS::RETCODE_OK) {
1672  ACE_ERROR_RETURN((LM_ERROR,
1673  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_from_durable_data: ")
1674  ACE_TEXT("register instance with container failed, returned <%C>.\n"),
1675  retcode_to_string(ret)),
1676  ret);
1677  }
1678 
1680 
1681  return ret;
1682 }
void send_all_to_flush_control(ACE_Guard< ACE_Recursive_Thread_Mutex > &guard)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex & get_lock() const
#define ACE_ERROR_RETURN(X, Y)
DDS::ReturnCode_t register_instance_i(DDS::InstanceHandle_t &handle, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)

◆ register_instance_i()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::register_instance_i ( DDS::InstanceHandle_t & handle,
Message_Block_Ptr  data,
const DDS::Time_t & source_timestamp 
)

Delegate to the WriteDataContainer to register. Must tell the transport to broadcast the registered instance upon returning.

Definition at line 1594 of file DataWriterImpl.cpp.

References ACE_ERROR_RETURN, ACE_TEXT(), create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::INSTANCE_REGISTRATION, LM_ERROR, monitor_, OpenDDS::DCPS::move(), DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, OpenDDS::DCPS::retcode_to_string(), and OpenDDS::DCPS::DataSampleElement::set_sample().

Referenced by get_or_create_instance_handle(), and register_instance_from_durable_data().

1597 {
1598  DBG_ENTRY_LVL("DataWriterImpl","register_instance_i",6);
1599 
1600  if (!enabled_) {
1601  ACE_ERROR_RETURN((LM_ERROR,
1602  ACE_TEXT("(%P|%t) ERROR: ")
1603  ACE_TEXT("DataWriterImpl::register_instance_i: ")
1604  ACE_TEXT("Entity is not enabled.\n")),
1606  }
1607 
1608  DDS::ReturnCode_t ret = data_container_->register_instance(handle, data);
1609  if (ret != DDS::RETCODE_OK) {
1610  ACE_ERROR_RETURN((LM_ERROR,
1611  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::register_instance_i: ")
1612  ACE_TEXT("register instance with container failed, returned <%C>.\n"),
1613  retcode_to_string(ret)),
1614  ret);
1615  }
1616 
1617  if (this->monitor_) {
1618  this->monitor_->report();
1619  }
1620 
1621  DataSampleElement* element = 0;
1622  ret = this->data_container_->obtain_buffer_for_control(element);
1623  if (ret != DDS::RETCODE_OK) {
1624  ACE_ERROR_RETURN((LM_ERROR,
1625  ACE_TEXT("(%P|%t) ERROR: ")
1626  ACE_TEXT("DataWriterImpl::register_instance_i: ")
1627  ACE_TEXT("obtain_buffer_for_control failed, returned <%C>.\n"),
1628  retcode_to_string(ret)),
1629  ret);
1630  }
1631 
1632  // Add header with the registration sample data.
1633  Message_Block_Ptr sample(
1636  element->get_header(),
1637  move(data),
1638  source_timestamp));
1639 
1640  element->set_sample(move(sample));
1641 
1642  ret = this->data_container_->enqueue_control(element);
1643 
1644  if (ret != DDS::RETCODE_OK) {
1645  data_container_->release_buffer(element);
1646  ACE_ERROR_RETURN((LM_ERROR,
1647  ACE_TEXT("(%P|%t) ERROR: ")
1648  ACE_TEXT("DataWriterImpl::register_instance_i: ")
1649  ACE_TEXT("enqueue_control failed, returned <%C>\n"),
1650  retcode_to_string(ret)),
1651  ret);
1652  }
1653 
1654  return ret;
1655 }
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
unique_ptr< Monitor > monitor_
Monitor object for this entity.
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_TEXT("TCP_Factory")
const ReturnCode_t RETCODE_NOT_ENABLED
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
const ReturnCode_t RETCODE_OK
#define ACE_ERROR_RETURN(X, Y)
RcHandle< WriteDataContainer > data_container_
The sample data container.
ACE_Message_Block * create_control_message(MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)

◆ register_instance_w_timestamp()

DDS::InstanceHandle_t OpenDDS::DCPS::DataWriterImpl::register_instance_w_timestamp ( const Sample sample,
const DDS::Time_t timestamp 
)

Definition at line 2916 of file DataWriterImpl.cpp.

References ACE_ERROR, ACE_TEXT(), get_or_create_instance_handle(), DDS::HANDLE_NIL, LM_NOTICE, OpenDDS::DCPS::log_level, OpenDDS::DCPS::LogLevel::Notice, DDS::RETCODE_OK, and OpenDDS::DCPS::retcode_to_string().

Referenced by OpenDDS::XTypes::DynamicDataWriterImpl::register_instance_w_timestamp(), and OpenDDS::DCPS::DataWriterImpl_T< MessageType >::register_instance_w_timestamp().

2918 {
2919  DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
2920  const DDS::ReturnCode_t ret = get_or_create_instance_handle(registered_handle, sample, timestamp);
2921  if (ret != DDS::RETCODE_OK && log_level >= LogLevel::Notice) {
2922  ACE_ERROR((LM_NOTICE, ACE_TEXT("(%P|%t) NOTICE: DataWriterImpl::register_instance_w_timestamp: ")
2923  ACE_TEXT("register failed: %C\n"),
2924  retcode_to_string(ret)));
2925  }
2926  return registered_handle;
2927 }
#define ACE_ERROR(X)
const InstanceHandle_t HANDLE_NIL
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export LogLevel log_level
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
DDS::ReturnCode_t get_or_create_instance_handle(DDS::InstanceHandle_t &handle, const Sample &sample, const DDS::Time_t &source_timestamp)
const ReturnCode_t RETCODE_OK

◆ remove_all_associations()

void OpenDDS::DCPS::DataWriterImpl::remove_all_associations ( )

Definition at line 783 of file DataWriterImpl.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), DBG_ENTRY_LVL, LM_WARNING, lock_, readers_, remove_associations(), OpenDDS::DCPS::TransportClient::stop_associating(), and OpenDDS::DCPS::TransportClient::transport_stop().

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

784 {
785  DBG_ENTRY_LVL("DataWriterImpl", "remove_all_associations", 6);
786  // stop pending associations
787  this->stop_associating();
788 
789  ReaderIdSeq readers;
790  CORBA::ULong size;
791  {
793 
794  size = static_cast<CORBA::ULong>(readers_.size());
795  readers.length(size);
796 
797  RepoIdSet::iterator itEnd = readers_.end();
798  int i = 0;
799 
800  for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
801  readers[i ++] = *it;
802  }
803  }
804 
805  try {
806  if (0 < size) {
807  CORBA::Boolean dont_notify_lost = false;
808 
809  this->remove_associations(readers, dont_notify_lost);
810  }
811 
812  } catch (const CORBA::Exception&) {
813  ACE_DEBUG((LM_WARNING,
814  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::remove_all_associations() - ")
815  ACE_TEXT("caught exception from remove_associations.\n")));
816  }
817 
818  transport_stop();
819 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
virtual void remove_associations(const ReaderIdSeq &readers, bool callback)
sequence< GUID_t > ReaderIdSeq
ACE_CDR::ULong ULong
ACE_CDR::Boolean Boolean
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
ACE_Recursive_Thread_Mutex lock_

◆ remove_associations()

void OpenDDS::DCPS::DataWriterImpl::remove_associations ( const ReaderIdSeq & readers,
bool  callback 
)
virtual

Section 7.1.4.1: total_count will not decrement.

Todo: Reconcile this with the verbiage in section 7.1.4.1

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 553 of file DataWriterImpl.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, data_container_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::disassociate(), OpenDDS::DCPS::Observer::e_DISASSOCIATED, OpenDDS::DCPS::EntityImpl::get_observer(), id_to_handle_map_, is_bit_, CORBA::is_nil(), DDS::PublicationMatchedStatus::last_subscription_handle, listener_for(), LM_DEBUG, lock_, lookup_instance_handles(), notify_publication_lost(), OpenDDS::DCPS::EntityImpl::notify_status_condition(), OpenDDS::DCPS::Observer::on_disassociated(), participant_servant_, publication_id_, publication_match_status_, DDS::PUBLICATION_MATCHED_STATUS, reader_info_, reader_info_lock_, readers_, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::TransportClient::stop_associating(), sync_unreg_rem_assocs_lock_, and DDS::PublicationMatchedStatus::total_count_change.

Referenced by remove_all_associations().

555 {
556  if (readers.length() == 0) {
557  return;
558  }
559 
561  if (observer) {
562  for (CORBA::ULong i = 0; i < readers.length(); ++i) {
563  observer->on_disassociated(this, readers[i]);
564  }
565  }
566 
567  if (DCPS_debug_level >= 1) {
568  ACE_DEBUG((LM_DEBUG,
569  ACE_TEXT("(%P|%t) DataWriterImpl::remove_associations: ")
570  ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
571  is_bit_,
572  LogGuid(publication_id_).c_str(),
573  LogGuid(readers[0]).c_str(),
574  readers.length()));
575  }
576 
577  // stop pending associations for these reader ids
578  this->stop_associating(readers.get_buffer(), readers.length());
579 
580  ReaderIdSeq fully_associated_readers;
581  CORBA::ULong fully_associated_len = 0;
582  ReaderIdSeq rds;
583  CORBA::ULong rds_len = 0;
584  DDS::InstanceHandleSeq handles;
585 
587  {
588  // Ensure the same acquisition order as in wait_for_acknowledgments().
590  //Remove the readers from fully associated reader list.
591  //If the supplied reader is not in the cached reader list then it is
592  //already removed. We just need remove the readers in the list that have
593  //not been removed.
594 
595  CORBA::ULong len = readers.length();
596 
597  for (CORBA::ULong i = 0; i < len; ++i) {
598  //Remove the readers from fully associated reader list. If it's not
599  //in there, the association_complete() is not called yet and remove it
600  //from pending list.
601 
602  if (remove(readers_, readers[i]) == 0) {
603  ++ fully_associated_len;
604  fully_associated_readers.length(fully_associated_len);
605  fully_associated_readers [fully_associated_len - 1] = readers[i];
606 
607  ++ rds_len;
608  rds.length(rds_len);
609  rds [rds_len - 1] = readers[i];
610  }
611 
612  data_container_->remove_reader_acks(readers[i]);
613 
614  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
615  reader_info_.erase(readers[i]);
616  //else reader is already removed which indicates remove_association()
617  //is called multiple times.
618  }
619 
620  if (fully_associated_len > 0 && !is_bit_) {
621  // The reader should be in the id_to_handle map at this time
622  this->lookup_instance_handles(fully_associated_readers, handles);
623 
624  for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
625  id_to_handle_map_.erase(fully_associated_readers[i]);
626  }
627  }
628 
629  // Mirror the PUBLICATION_MATCHED_STATUS processing from
630  // association_complete() here.
631  if (!this->is_bit_) {
632 
633  // Derive the change in the number of subscriptions reading this writer.
634  int matchedSubscriptions =
635  static_cast<int>(this->id_to_handle_map_.size());
637  matchedSubscriptions - this->publication_match_status_.current_count;
638 
639  // Only process status if the number of subscriptions has changed.
641  this->publication_match_status_.current_count = matchedSubscriptions;
642 
643  /// Section 7.1.4.1: total_count will not decrement.
644 
645  /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
647  handles[fully_associated_len - 1];
648 
650 
651  DDS::DataWriterListener_var listener =
653 
654  if (!CORBA::is_nil(listener.in())) {
655  listener->on_publication_matched(this, this->publication_match_status_);
656 
657  // Listener consumes the change.
660  }
661 
662  this->notify_status_condition();
663  }
664  }
665  }
666 
667  for (CORBA::ULong i = 0; i < rds.length(); ++i) {
668  this->disassociate(rds[i]);
669  }
670 
671  // If this remove_association is invoked when the InfoRepo
672  // detects a lost reader then make a callback to notify
673  // subscription lost.
674  if (notify_lost && handles.length() > 0) {
675  this->notify_publication_lost(handles);
676  }
677 
678  const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
679  for (unsigned int i = 0; i < handles.length(); ++i) {
680  participant->return_handle(handles[i]);
681  }
682 }
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
ACE_Thread_Mutex sync_unreg_rem_assocs_lock_
#define ACE_DEBUG(X)
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
#define ACE_GUARD(MUTEX, OBJ, LOCK)
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind)
const StatusKind PUBLICATION_MATCHED_STATUS
void disassociate(const GUID_t &peerId)
RepoIdToReaderInfoMap reader_info_
sequence< GUID_t > ReaderIdSeq
ACE_CDR::ULong ULong
WeakRcHandle< DomainParticipantImpl > participant_servant_
RcHandle< Observer > Observer_rch
Definition: Observer.h:101
void lookup_instance_handles(const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the subscription repo ids.
ACE_TEXT("TCP_Factory")
DDS::PublicationMatchedStatus publication_match_status_
void notify_publication_lost(const ReaderIdSeq &subids)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_Thread_Mutex reader_info_lock_
ACE_Recursive_Thread_Mutex lock_
RepoIdToHandleMap id_to_handle_map_
GUID_t publication_id_
The repository id of this datawriter/publication.
RcHandle< WriteDataContainer > data_container_
The sample data container.
Boolean is_nil(T x)

◆ replay_durable_data_for()

void OpenDDS::DCPS::DataWriterImpl::replay_durable_data_for ( const GUID_t remote_sub_id)
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 684 of file DataWriterImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), available_data_list_, OpenDDS::DCPS::SendStateDataSampleList::begin(), controlTracker, create_control_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::STUN::encoding(), OpenDDS::DCPS::SendStateDataSampleList::end(), OpenDDS::DCPS::END_HISTORIC_SAMPLES, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_db_lock(), get_lock(), get_resend_data(), header, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::Encoding::KIND_UNALIGNED_CDR, DDS::DataWriterQos::lifespan, LM_ERROR, LM_INFO, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::MessageTracker::message_sent(), OpenDDS::DCPS::move(), OpenDDS::DCPS::TimePoint_T< SystemClock >::now(), OPENDDS_STRING, publisher_servant_, qos_, reader_info_, reader_info_lock_, OpenDDS::DCPS::SEND_CONTROL_ERROR, OpenDDS::DCPS::TransportClient::send_w_control(), and OpenDDS::DCPS::serialized_size().

685 {
686  DBG_ENTRY_LVL("DataWriterImpl", "replay_durable_data_for", 6);
687 
688  bool reader_durable = false;
689 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
690  OPENDDS_STRING filterClassName;
691  RcHandle<FilterEvaluator> eval;
692  DDS::StringSeq expression_params;
693 #endif
694 
695  {
696  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
697  RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
698 
699  if (it != reader_info_.end()) {
700  reader_durable = it->second.durable_;
701 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
702  filterClassName = it->second.filter_class_name_;
703  eval = it->second.eval_;
704  expression_params = it->second.expression_params_;
705 #endif
706  }
707  }
708 
709  // Support DURABILITY QoS
710  if (reader_durable) {
711  // Tell the WriteDataContainer to resend all sending/sent
712  // samples.
713  this->data_container_->reenqueue_all(remote_id, this->qos_.lifespan
714 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
715  , filterClassName, eval.in(), expression_params
716 #endif
717  );
718 
719  // Acquire the data writer container lock to avoid deadlock. The
720  // thread calling association_complete() has to acquire lock in the
721  // same order as the write()/register() operation.
722 
723  // Since the thread calling association_complete() is the ORB
724  // thread, it may have some performance penalty. If the
725  // performance is an issue, we may need a new thread to handle the
726  // data_available() calls.
728  guard,
729  this->get_lock());
730 
731  SendStateDataSampleList list = this->get_resend_data();
732  {
733  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
734  // Update the reader's expected sequence
735  SequenceNumber& seq =
736  reader_info_.find(remote_id)->second.expected_sequence_;
737 
738  for (SendStateDataSampleList::iterator list_el = list.begin();
739  list_el != list.end(); ++list_el) {
740  list_el->get_header().historic_sample_ = true;
741 
742  if (list_el->get_header().sequence_ > seq) {
743  seq = list_el->get_header().sequence_;
744  }
745  }
746  }
747 
748  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
749  if (!publisher || publisher->is_suspended()) {
751 
752  } else {
753  if (DCPS_debug_level >= 4) {
754  ACE_DEBUG((LM_INFO, ACE_TEXT("(%P|%t) DataWriterImpl::replay_durable_data_for: Sending historic samples\n")));
755  }
756 
757  const Encoding encoding(Encoding::KIND_UNALIGNED_CDR);
758  size_t size = 0;
759  serialized_size(encoding, size, remote_id);
760  Message_Block_Ptr data(
762  get_db_lock()));
763  Serializer ser(data.get(), encoding);
764  ser << remote_id;
765 
766  DataSampleHeader header;
767  Message_Block_Ptr end_historic_samples(create_control_message(END_HISTORIC_SAMPLES, header, move(data),
768  SystemTimePoint::now().to_dds_time()));
769 
771  guard.release();
772  const SendControlStatus ret = send_w_control(list, header, move(end_historic_samples), remote_id);
773  if (ret == SEND_CONTROL_ERROR) {
774  ACE_ERROR((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: ")
775  ACE_TEXT("DataWriterImpl::replay_durable_data_for: ")
776  ACE_TEXT("send_w_control failed.\n")));
778  }
779  }
780  }
781 }
#define ACE_DEBUG(X)
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
SendStateDataSampleListIterator iterator
STL-style bidirectional iterator and const-iterator types.
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
SendStateDataSampleList get_resend_data()
SendControlStatus send_w_control(SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
SendStateDataSampleList available_data_list_
DataBlockLockPool::DataBlockLock * get_db_lock()
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
void serialized_size(const Encoding &encoding, size_t &size, const SequenceNumber &)
RepoIdToReaderInfoMap reader_info_
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
#define OPENDDS_STRING
void enqueue_tail(const DataSampleElement *element)
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
LifespanQosPolicy lifespan
ACE_Thread_Mutex reader_info_lock_
ACE_Recursive_Thread_Mutex & get_lock() const
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
RcHandle< WriteDataContainer > data_container_
The sample data container.
ACE_Message_Block * create_control_message(MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
SendControlStatus
Return code type for send_control() operations.

◆ retrieve_inline_qos_data()

void OpenDDS::DCPS::DataWriterImpl::retrieve_inline_qos_data ( TransportSendListener::InlineQosData & qos_data) const
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2728 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TransportSendListener::InlineQosData::dw_qos, TAO::String_var< charT >::in(), OpenDDS::DCPS::TransportSendListener::InlineQosData::pub_qos, publisher_servant_, qos_, OpenDDS::DCPS::TransportSendListener::InlineQosData::topic_name, and topic_name_.

2729 {
2730  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
2731  if (publisher) {
2732  publisher->get_qos(qos_data.pub_qos);
2733  }
2734  qos_data.dw_qos = this->qos_;
2735  qos_data.topic_name = this->topic_name_.in();
2736 }
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
CORBA::String_var topic_name_
The name of associated topic.
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
const character_type * in(void) const

◆ return_handle()

void OpenDDS::DCPS::DataWriterImpl::return_handle ( DDS::InstanceHandle_t  handle)
private

Definition at line 192 of file DataWriterImpl.cpp.

References participant_servant_.

Referenced by OpenDDS::DCPS::WriteDataContainer::unregister_all().

193 {
194  const RcHandle<DomainParticipantImpl> participant = participant_servant_.lock();
195  if (participant) {
196  participant->return_handle(handle);
197  }
198 }
WeakRcHandle< DomainParticipantImpl > participant_servant_

◆ send_all_to_flush_control()

void OpenDDS::DCPS::DataWriterImpl::send_all_to_flush_control ( ACE_Guard< ACE_Recursive_Thread_Mutex > &  guard)

Definition at line 1577 of file DataWriterImpl.cpp.

References controlTracker, DBG_ENTRY_LVL, get_unsent_data(), OpenDDS::DCPS::MessageTracker::message_sent(), ACE_Guard< ACE_LOCK >::release(), and OpenDDS::DCPS::TransportClient::send().

Referenced by dispose(), dispose_and_unregister(), get_or_create_instance_handle(), register_instance_from_durable_data(), send_request_ack(), and unregister_instance_i().

1578 {
1579  DBG_ENTRY_LVL("DataWriterImpl","send_all_to_flush_control",6);
1580 
1581  SendStateDataSampleList list;
1582 
1583  ACE_UINT64 transaction_id = this->get_unsent_data(list);
1584 
1586 
1587  //need to release guard to call down to transport
1588  guard.release();
1589 
1590  this->send(list, transaction_id);
1591 }
void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
int release(void)
unsigned long long ACE_UINT64
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
ACE_UINT64 get_unsent_data(SendStateDataSampleList &list)

◆ send_control()

SendControlStatus OpenDDS::DCPS::DataWriterImpl::send_control ( const DataSampleHeader & header,
Message_Block_Ptr  msg 
)
protectedvirtual

Definition at line 2767 of file DataWriterImpl.cpp.

References controlTracker, OpenDDS::DCPS::MessageTracker::message_dropped(), OpenDDS::DCPS::MessageTracker::message_sent(), OpenDDS::DCPS::move(), OpenDDS::DCPS::TransportClient::send_control(), and OpenDDS::DCPS::SEND_CONTROL_OK.

Referenced by end_coherent_changes(), and send_liveliness().

2769 {
2771 
2773 
2774  if (status != SEND_CONTROL_OK) {
2776  }
2777 
2778  return status;
2779 }
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr msg)
SendControlStatus
Return code type for send_control() operations.

◆ send_end_historic_samples()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::send_end_historic_samples ( const GUID_t readerId)
private

◆ send_liveliness()

bool OpenDDS::DCPS::DataWriterImpl::send_liveliness ( const MonotonicTimePoint now)
private

Send the liveliness message.

Definition at line 2513 of file DataWriterImpl.cpp.

References ACE_ERROR_RETURN, ACE_TEXT(), create_control_message(), OpenDDS::DCPS::DATAWRITER_LIVELINESS, domain_id_, header, DDS::LivelinessQosPolicy::kind, last_liveliness_activity_time_, DDS::DataWriterQos::liveliness, LM_ERROR, DDS::MANUAL_BY_TOPIC_LIVELINESS_QOS, OpenDDS::DCPS::move(), OpenDDS::DCPS::TimePoint_T< SystemClock >::now(), qos_, send_control(), OpenDDS::DCPS::SEND_CONTROL_ERROR, and TheServiceParticipant.

Referenced by assert_liveliness(), and handle_timeout().

2514 {
2516  !TheServiceParticipant->get_discovery(domain_id_)->supports_liveliness()) {
2517  DataSampleHeader header;
2518  Message_Block_Ptr empty;
2519  Message_Block_Ptr liveliness_msg(
2521  DATAWRITER_LIVELINESS, header, move(empty),
2522  SystemTimePoint::now().to_dds_time()));
2523 
2524  if (this->send_control(header, move(liveliness_msg)) == SEND_CONTROL_ERROR) {
2525  ACE_ERROR_RETURN((LM_ERROR,
2526  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::send_liveliness: ")
2527  ACE_TEXT("send_control failed.\n")),
2528  false);
2529  }
2530  }
2532  return true;
2533 }
MonotonicTimePoint last_liveliness_activity_time_
Timestamp of last write/dispose/assert_liveliness.
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
DDS::DomainId_t domain_id_
The domain id.
ACE_TEXT("TCP_Factory")
virtual SendControlStatus send_control(const DataSampleHeader &header, Message_Block_Ptr msg)
LivelinessQosPolicyKind kind
#define ACE_ERROR_RETURN(X, Y)
LivelinessQosPolicy liveliness
#define TheServiceParticipant
ACE_Message_Block * create_control_message(MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)

◆ send_request_ack()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::send_request_ack ( )
private

Definition at line 1040 of file DataWriterImpl.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), create_control_message(), data_container_, OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), LM_ERROR, OpenDDS::DCPS::move(), OpenDDS::DCPS::TimePoint_T< SystemClock >::now(), OpenDDS::DCPS::REQUEST_ACK, DDS::RETCODE_ERROR, DDS::RETCODE_OK, send_all_to_flush_control(), OpenDDS::DCPS::DataSampleElement::set_sample(), and OpenDDS::DCPS::TimePoint_T< AceClock >::to_dds_time().

Referenced by wait_for_acknowledgments().

1041 {
1043  guard,
1044  get_lock(),
1046 
1047 
1048  DataSampleElement* element = 0;
1049  DDS::ReturnCode_t ret = this->data_container_->obtain_buffer_for_control(element);
1050 
1051  if (ret != DDS::RETCODE_OK) {
1052  ACE_ERROR_RETURN((LM_ERROR,
1053  ACE_TEXT("(%P|%t) ERROR: ")
1054  ACE_TEXT("DataWriterImpl::send_request_ack: ")
1055  ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
1056  ret),
1057  ret);
1058  }
1059 
1060  Message_Block_Ptr blk;
1061  // Add header with the registration sample data.
1062  Message_Block_Ptr sample(
1064  REQUEST_ACK,
1065  element->get_header(),
1066  move(blk),
1068 
1069  element->set_sample(move(sample));
1070 
1071  ret = this->data_container_->enqueue_control(element);
1072 
1073  if (ret != DDS::RETCODE_OK) {
1074  data_container_->release_buffer(element);
1075  ACE_ERROR_RETURN((LM_ERROR,
1076  ACE_TEXT("(%P|%t) ERROR: ")
1077  ACE_TEXT("DataWriterImpl::send_request_ack: ")
1078  ACE_TEXT("enqueue_control failed.\n")),
1079  ret);
1080  }
1081 
1082 
1084 
1085  return DDS::RETCODE_OK;
1086 }
DDS::Time_t to_dds_time() const
Definition: TimePoint_T.inl:89
void send_all_to_flush_control(ACE_Guard< ACE_Recursive_Thread_Mutex > &guard)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
static TimePoint_T< SystemClock > now()
Definition: TimePoint_T.inl:41
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_TEXT("TCP_Factory")
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex & get_lock() const
#define ACE_ERROR_RETURN(X, Y)
RcHandle< WriteDataContainer > data_container_
The sample data container.
ACE_Message_Block * create_control_message(MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)

◆ send_suspended_data()

void OpenDDS::DCPS::DataWriterImpl::send_suspended_data ( )

Called by the PublisherImpl to indicate that the Publisher is now resumed and any data collected while it was suspended should now be sent.

Definition at line 1964 of file DataWriterImpl.cpp.

References available_data_list_, max_suspended_transaction_id_, min_suspended_transaction_id_, OpenDDS::DCPS::SendStateDataSampleList::reset(), and OpenDDS::DCPS::TransportClient::send().

1965 {
1966  //this serves to get TransportClient's max_transaction_id_seen_
1967  //to the correct value for this list of transactions
1968  if (max_suspended_transaction_id_ != 0) {
1971  }
1972 
1973  //this serves to actually have the send proceed in
1974  //sending the samples to the datalinks by passing it
1975  //the min_suspended_transaction_id_ which should be the
1976  //TransportClient's expected_transaction_id_
1979  this->available_data_list_.reset();
1980 }
void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
SendStateDataSampleList available_data_list_
ACE_UINT64 min_suspended_transaction_id_
The cached available data while suspending and associated transaction ids.

◆ serialize_sample()

ACE_Message_Block * OpenDDS::DCPS::DataWriterImpl::serialize_sample ( const Sample & sample)
protected

Definition at line 2970 of file DataWriterImpl.cpp.

References ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_ERROR, ACE_NEW_MALLOC_RETURN, ACE_NEW_RETURN, OpenDDS::DCPS::TypeSupportImpl::base_extensibility(), OpenDDS::DCPS::DataWriterImpl::EncodingMode::buffer_size(), OpenDDS::DCPS::TransportClient::cdr_encapsulation(), data_allocator_, db_allocator_, OpenDDS::STUN::encoding(), OpenDDS::DCPS::DataWriterImpl::EncodingMode::encoding(), encoding_mode_, OpenDDS::DCPS::LogLevel::Error, OpenDDS::DCPS::EncapsulationHeader::from_encoding(), OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), get_db_lock(), OpenDDS::DCPS::Sample::key_only(), LM_ERROR, OpenDDS::DCPS::log_level, ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), OpenDDS::DCPS::Sample::serialize(), OpenDDS::DCPS::EncapsulationHeader::set_encapsulation_options(), skip_serialize_, OpenDDS::DCPS::Sample::to_message_block(), type_support_, and ACE_Time_Value::zero.

Referenced by get_or_create_instance_handle(), and write_sample().

2971 {
2972  const bool encapsulated = cdr_encapsulation();
2973  const Encoding& encoding = encoding_mode_.encoding();
2974  Message_Block_Ptr mb;
2975  ACE_Message_Block* tmp_mb;
2976 
2977  // Don't use the cached allocator for the registered sample message
2978  // block.
2979  if (sample.key_only() && !skip_serialize_) {
2980  ACE_NEW_RETURN(tmp_mb,
2982  encoding_mode_.buffer_size(sample),
2984  0, // cont
2985  0, // data
2986  0, // alloc_strategy
2987  get_db_lock()),
2988  0);
2989  } else {
2990  ACE_NEW_MALLOC_RETURN(tmp_mb,
2991  static_cast<ACE_Message_Block*>(
2992  mb_allocator_->malloc(sizeof(ACE_Message_Block))),
2994  encoding_mode_.buffer_size(sample),
2996  0, // cont
2997  0, // data
2998  data_allocator_.get(), // allocator_strategy
2999  get_db_lock(), // data block locking_strategy
3003  db_allocator_.get(),
3004  mb_allocator_.get()),
3005  0);
3006  }
3007  mb.reset(tmp_mb);
3008 
3009  if (skip_serialize_) {
3010  if (!sample.to_message_block(*mb)) {
3011  if (log_level >= LogLevel::Error) {
3012  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
3013  "to_message_block failed\n"));
3014  }
3015  return 0;
3016  }
3017  } else {
3018  Serializer serializer(mb.get(), encoding);
3019  if (encapsulated) {
3020  EncapsulationHeader encap;
3021  if (!encap.from_encoding(encoding, type_support_->base_extensibility())) {
3022  // from_encoding logged the error
3023  return 0;
3024  }
3025  if (!(serializer << encap)) {
3026  if (log_level >= LogLevel::Error) {
3027  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
3028  "failed to serialize data encapsulation header\n"));
3029  }
3030  return 0;
3031  }
3032  }
3033  if (!sample.serialize(serializer)) {
3034  if (log_level >= LogLevel::Error) {
3035  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
3036  "failed to serialize sample data\n"));
3037  }
3038  return 0;
3039  }
3040  if (encapsulated && !EncapsulationHeader::set_encapsulation_options(mb)) {
3041  if (log_level >= LogLevel::Error) {
3042  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: DataWriterImpl::serialize_sample: "
3043  "set_encapsulation_options failed\n"));
3044  }
3045  return 0;
3046  }
3047  }
3048 
3049  return mb.release();
3050 }
#define ACE_ERROR(X)
static const ACE_Time_Value max_time
DataBlockLockPool::DataBlockLock * get_db_lock()
class OpenDDS::DCPS::DataWriterImpl::EncodingMode encoding_mode_
static bool set_encapsulation_options(Message_Block_Ptr &mb)
Definition: Serializer.cpp:251
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
TypeSupportImpl * type_support_
unique_ptr< DataAllocator > data_allocator_
unique_ptr< MessageBlockAllocator > mb_allocator_
The message block allocator.
OpenDDS_Dcps_Export LogLevel log_level
static const ACE_Time_Value zero
#define ACE_NEW_RETURN(POINTER, CONSTRUCTOR, RET_VAL)
size_t buffer_size(const Sample &sample) const
virtual Extensibility base_extensibility() const =0
Returns the extensibility of just the topic type.
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
const DCPS::Encoding encoding(DCPS::Encoding::KIND_UNALIGNED_CDR, DCPS::ENDIAN_BIG)
unique_ptr< DataBlockAllocator > db_allocator_
The data block allocator.

◆ set_listener()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::set_listener ( DDS::DataWriterListener_ptr  a_listener,
DDS::StatusMask  mask 
)
virtual

Definition at line 985 of file DataWriterImpl.cpp.

References listener_, listener_mask_, listener_mutex_, and DDS::RETCODE_OK.

Referenced by cleanup(), and init().

987 {
989  listener_mask_ = mask;
990  //note: OK to duplicate a nil object ref
991  listener_ = DDS::DataWriterListener::_duplicate(a_listener);
992  return DDS::RETCODE_OK;
993 }
DDS::DataWriterListener_var listener_
Used to notify the entity for relevant events.
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
const ReturnCode_t RETCODE_OK

◆ set_marshal_skip_serialize()

void OpenDDS::DCPS::DataWriterImpl::set_marshal_skip_serialize ( bool  value)
inline

Definition at line 124 of file DataWriterImpl.h.

References value.

125  {
127  }
const LogLevel::Value value
Definition: debug.cpp:61

◆ set_qos()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::set_qos ( const DDS::DataWriterQos & qos)
virtual

Definition at line 916 of file DataWriterImpl.cpp.

References ACE_ERROR_RETURN, ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), data_container_, DDS::DataWriterQos::deadline, domain_id_, dp_id_, OpenDDS::DCPS::Observer::e_QOS_CHANGED, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::EntityImpl::get_observer(), LM_ERROR, OpenDDS::DCPS::Observer::on_qos_changed(), OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, passed_qos_, DDS::DeadlineQosPolicy::period, publication_id_, publisher_servant_, qos_, DDS::DataWriterQos::representation, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, TheServiceParticipant, OpenDDS::DCPS::Qos_Helper::valid(), and DDS::DataRepresentationQosPolicy::value.

917 {
923 
924  DDS::DataWriterQos new_qos = qos;
926  if (Qos_Helper::valid(new_qos) && Qos_Helper::consistent(new_qos)) {
927  if (qos_ == new_qos)
928  return DDS::RETCODE_OK;
929 
930  if (enabled_) {
931  if (!Qos_Helper::changeable(qos_, new_qos)) {
933  }
934 
935  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
936  DDS::PublisherQos publisherQos;
937  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
938 
939  bool status = false;
940  if (publisher) {
941  publisher->get_qos(publisherQos);
942  status
943  = disco->update_publication_qos(domain_id_,
944  dp_id_,
945  this->publication_id_,
946  new_qos,
947  publisherQos);
948  }
949  if (!status) {
950  ACE_ERROR_RETURN((LM_ERROR,
951  ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
952  ACE_TEXT("qos not updated.\n")),
954  }
955 
956  if (!(qos_ == new_qos)) {
957  data_container_->set_deadline_period(TimeDuration(qos.deadline.period));
958  qos_ = new_qos;
959  }
960  }
961 
962  qos_ = new_qos;
963  passed_qos_ = qos;
964 
966  if (observer) {
967  observer->on_qos_changed(this);
968  }
969 
970  return DDS::RETCODE_OK;
971 
972  } else {
974  }
975 }
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
#define OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, error_rtn_value)
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
DeadlineQosPolicy deadline
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
DataRepresentationQosPolicy representation
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
DDS::DataWriterQos passed_qos_
#define OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, error_rtn_value)
RcHandle< Observer > Observer_rch
Definition: Observer.h:101
DDS::DomainId_t domain_id_
The domain id.
ACE_TEXT("TCP_Factory")
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_UNSUPPORTED
#define ACE_ERROR_RETURN(X, Y)
DataRepresentationIdSeq value
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
#define TheServiceParticipant
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
GUID_t publication_id_
The repository id of this datawriter/publication.
RcHandle< WriteDataContainer > data_container_
The sample data container.
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)

◆ set_wait_pending_deadline()

void OpenDDS::DCPS::DataWriterImpl::set_wait_pending_deadline ( const MonotonicTimePoint deadline)

Set the deadline by which wait_pending must complete. If 0, wait_pending will wait indefinitely if needed.

Definition at line 2787 of file DataWriterImpl.cpp.

References wait_pending_deadline_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter(), and OpenDDS::DCPS::PublisherImpl::set_wait_pending_deadline().

2788 {
2789  wait_pending_deadline_ = deadline;
2790 }
MonotonicTimePoint wait_pending_deadline_

◆ setup_serialization()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::setup_serialization ( )
protected

Setup CDR serialization options.

Definition at line 2823 of file DataWriterImpl.cpp.

References ACE_DEBUG, ACE_ERROR, OpenDDS::DCPS::DataWriterImpl::EncodingMode::buffer_size_bound(), OpenDDS::DCPS::TransportClient::cdr_encapsulation(), data_allocator_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::DataWriterImpl::EncodingMode::encoding(), encoding_mode_, OpenDDS::DCPS::SerializedSizeBound::get(), OpenDDS::DCPS::Encoding::kind(), OpenDDS::DCPS::Encoding::kind_to_string(), OpenDDS::DCPS::Encoding::KIND_UNALIGNED_CDR, OpenDDS::DCPS::Encoding::KIND_XCDR1, LM_DEBUG, LM_NOTICE, LM_WARNING, OpenDDS::DCPS::log_level, OpenDDS::DCPS::TypeSupportImpl::max_extensibility(), OpenDDS::DCPS::MUTABLE, n_chunks_, OpenDDS::DCPS::TypeSupportImpl::name(), OpenDDS::DCPS::LogLevel::Notice, qos_, OpenDDS::DCPS::repr_to_encoding_kind(), OpenDDS::DCPS::repr_to_string(), DDS::DataWriterQos::representation, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::TransportClient::swap_bytes(), type_support_, OpenDDS::DCPS::UNALIGNED_CDR_DATA_REPRESENTATION, OpenDDS::DCPS::DataWriterImpl::EncodingMode::valid(), DDS::DataRepresentationQosPolicy::value, and OpenDDS::DCPS::LogLevel::Warning.

Referenced by enable().

2824 {
2825  if (qos_.representation.value.length() > 0 &&
2827  // If the QoS explicitly sets XCDR, XCDR2, or XML, force encapsulation
2828  cdr_encapsulation(true);
2829  }
2830 
2831  if (cdr_encapsulation()) {
2832  Encoding::Kind encoding_kind;
2833  // There should only be one data representation in a DataWriter, so
2834  // simply use qos_.representation.value[0].
2835  if (repr_to_encoding_kind(qos_.representation.value[0], encoding_kind)) {
2836  encoding_mode_ = EncodingMode(type_support_, encoding_kind, swap_bytes());
2837  if (encoding_kind == Encoding::KIND_XCDR1 &&
2839  if (log_level >= LogLevel::Notice) {
2840  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::setup_serialization: "
2841  "Encountered unsupported combination of XCDR1 encoding and mutable extensibility "
2842  "for writer of type %C\n",
2843  type_support_->name()));
2844  }
2845  return DDS::RETCODE_ERROR;
2846  } else if (encoding_kind == Encoding::KIND_UNALIGNED_CDR) {
2847  if (log_level >= LogLevel::Notice) {
2848  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::setup_serialization: "
2849  "Unaligned CDR is not supported by transport types that require encapsulation\n"));
2850  }
2851  return DDS::RETCODE_ERROR;
2852  }
2853  } else if (log_level >= LogLevel::Warning) {
2854  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: DataWriterImpl::setup_serialization: "
2855  "Encountered unsupported or unknown data representation: %C ",
2856  "for writer of type %C\n",
2858  type_support_->name()));
2859  }
2860  } else {
2861  // Pick unaligned CDR as it is the implicit representation for non-encapsulated
2863  }
2864  if (!encoding_mode_.valid()) {
2865  if (log_level >= LogLevel::Notice) {
2866  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::setup_serialization: "
2867  "Could not find a valid data representation\n"));
2868  }
2869  return DDS::RETCODE_ERROR;
2870  }
2871 
2872  if (DCPS_debug_level >= 2) {
2873  ACE_DEBUG((LM_DEBUG, "(%P|%t) WriterImpl::setup_serialization: "
2874  "Setup successfully with %C data representation.\n",
2876  }
2877 
2878  // Set up allocator with reserved space for data if it is bounded
2879  const SerializedSizeBound buffer_size_bound = encoding_mode_.buffer_size_bound();
2880  if (buffer_size_bound) {
2881  const size_t chunk_size = buffer_size_bound.get();
2882  data_allocator_.reset(new DataAllocator(n_chunks_, chunk_size));
2883  if (DCPS_debug_level >= 2) {
2884  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataWriterImpl::setup_serialization: "
2885  "using data allocator at %x with %B %B byte chunks\n",
2886  data_allocator_.get(),
2887  n_chunks_,
2888  chunk_size));
2889  }
2890  } else if (DCPS_debug_level >= 2) {
2891  ACE_DEBUG((LM_DEBUG, "(%P|%t) DataWriterImpl::setup_serialization: "
2892  "sample size is unbounded, not using data allocator, "
2893  "always allocating from heap\n"));
2894  }
2895  return DDS::RETCODE_OK;
2896 }
#define ACE_DEBUG(X)
SerializedSizeBound buffer_size_bound() const
size_t n_chunks_
The number of chunks for the cached allocator.
#define ACE_ERROR(X)
virtual Extensibility max_extensibility() const =0
static String kind_to_string(Kind value)
Definition: Serializer.cpp:326
DCPS::String repr_to_string(const DDS::DataRepresentationId_t &repr)
Definition: DCPS_Utils.cpp:473
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > DataAllocator
DataRepresentationQosPolicy representation
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
class OpenDDS::DCPS::DataWriterImpl::EncodingMode encoding_mode_
virtual const char * name() const =0
TypeSupportImpl * type_support_
unique_ptr< DataAllocator > data_allocator_
const DDS::DataRepresentationId_t UNALIGNED_CDR_DATA_REPRESENTATION
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
OpenDDS_Dcps_Export LogLevel log_level
bool repr_to_encoding_kind(DDS::DataRepresentationId_t repr, Encoding::Kind &kind)
Definition: DCPS_Utils.cpp:455
const ReturnCode_t RETCODE_ERROR
const ReturnCode_t RETCODE_OK
DataRepresentationIdSeq value

◆ should_ack()

bool OpenDDS::DCPS::DataWriterImpl::should_ack ( ) const

Does this writer have samples to be acknowledged?

Definition at line 1016 of file DataWriterImpl.cpp.

References readers_.

1017 {
1018  // N.B. It may be worthwhile to investigate a more efficient
1019  // heuristic for determining if a writer should send SAMPLE_ACK
1020  // control samples. Perhaps based on a sequence number delta?
1021  return this->readers_.size() != 0;
1022 }

◆ track_sequence_number()

void OpenDDS::DCPS::DataWriterImpl::track_sequence_number ( GUIDSeq * filter_out)
private

Definition at line 1930 of file DataWriterImpl.cpp.

References ACE_GUARD, get_max_sn(), reader_info_, and reader_info_lock_.

Referenced by write().

1931 {
1932  const SequenceNumber sn = get_max_sn();
1933  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
1934 
1935 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
1936  // Track individual expected sequence numbers in ReaderInfo
1937  RepoIdSet excluded;
1938 
1939  if (filter_out && !reader_info_.empty()) {
1940  const GUID_t* buf = filter_out->get_buffer();
1941  excluded.insert(buf, buf + filter_out->length());
1942  }
1943 
1944  for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
1945  end = reader_info_.end(); iter != end; ++iter) {
1946  // If not excluding this reader, update expected sequence
1947  if (excluded.count(iter->first) == 0) {
1948  iter->second.expected_sequence_ = sn;
1949  }
1950  }
1951 
1952 #else
1953  ACE_UNUSED_ARG(filter_out);
1954  for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
1955  end = reader_info_.end(); iter != end; ++iter) {
1956  iter->second.expected_sequence_ = sn;
1957  }
1958 
1959 #endif // OPENDDS_NO_CONTENT_FILTERED_TOPIC
1960 
1961 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
GuidSet RepoIdSet
Definition: GuidUtils.h:113
SequenceNumber get_max_sn() const
bool filter_out(const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const
RepoIdToReaderInfoMap reader_info_
ACE_Thread_Mutex reader_info_lock_

◆ transport_assoc_done()

void OpenDDS::DCPS::DataWriterImpl::transport_assoc_done ( int  flags,
const GUID_t remote_id 
)
virtual

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 279 of file DataWriterImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::TransportClient::ASSOC_ACTIVE, OpenDDS::DCPS::TransportClient::ASSOC_OK, association_complete_i(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, LM_ERROR, LM_INFO, lock_, and publication_id_.

280 {
281  DBG_ENTRY_LVL("DataWriterImpl", "transport_assoc_done", 6);
282 
283  if (!(flags & ASSOC_OK)) {
284  if (DCPS_debug_level) {
285  ACE_ERROR((LM_ERROR,
286  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
287  ACE_TEXT("ERROR: transport layer failed to associate %C\n"),
288  LogGuid(remote_id).c_str()));
289  }
290 
291  return;
292  }
293 
295 
296  if (DCPS_debug_level) {
297  ACE_DEBUG((LM_INFO,
298  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
299  ACE_TEXT("writer %C succeeded in associating with reader %C\n"),
300  LogGuid(publication_id_).c_str(),
301  LogGuid(remote_id).c_str()));
302  }
303 
304  if (flags & ASSOC_ACTIVE) {
305 
306  // Have we already received an association_complete() callback?
307  if (DCPS_debug_level) {
308  ACE_DEBUG((LM_DEBUG,
309  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
310  ACE_TEXT("writer %C reader %C calling association_complete_i\n"),
311  LogGuid(publication_id_).c_str(),
312  LogGuid(remote_id).c_str()));
313  }
314  association_complete_i(remote_id);
315 
316  } else {
317  // In the current implementation, DataWriter is always active, so this
318  // code will not be applicable.
319  if (DCPS_debug_level) {
320  ACE_ERROR((LM_ERROR,
321  ACE_TEXT("(%P|%t) DataWriterImpl::transport_assoc_done: ")
322  ACE_TEXT("ERROR: DataWriter (%C) should always be active in current implementation\n"),
323  LogGuid(publication_id_).c_str()));
324  }
325  }
326 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
void association_complete_i(const GUID_t &remote_id)
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
ACE_Recursive_Thread_Mutex lock_
GUID_t publication_id_
The repository id of this datawriter/publication.

◆ transport_discovery_change()

void OpenDDS::DCPS::DataWriterImpl::transport_discovery_change ( )
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 2805 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TransportClient::connection_info(), domain_id(), domain_id_, dp_id_, lock_, OpenDDS::DCPS::TransportClient::populate_connection_info(), publication_id_, ACE_Guard< ACE_LOCK >::release(), and TheServiceParticipant.

2806 {
2808  const TransportLocatorSeq& trans_conf_info = connection_info();
2809 
2811  const GUID_t dp_id_copy = dp_id_;
2812  const GUID_t publication_id_copy = publication_id_;
2813  const int domain_id = domain_id_;
2814  guard.release();
2815 
2816  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id);
2817  disco->update_publication_locators(domain_id,
2818  dp_id_copy,
2819  publication_id_copy,
2820  trans_conf_info);
2821 }
const TransportLocatorSeq & connection_info() const
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
sequence< TransportLocator > TransportLocatorSeq
DDS::DomainId_t domain_id_
The domain id.
DDS::DomainId_t domain_id() const
ACE_Recursive_Thread_Mutex lock_
#define TheServiceParticipant
GUID_t publication_id_
The repository id of this datawriter/publication.

◆ unregister_all()

void OpenDDS::DCPS::DataWriterImpl::unregister_all ( )

Delegate to WriteDataContainer to unregister all instances.

Definition at line 2052 of file DataWriterImpl.cpp.

References data_container_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

2053 {
2054  data_container_->unregister_all();
2055 }
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ unregister_for_reader()

void OpenDDS::DCPS::DataWriterImpl::unregister_for_reader ( const GUID_t participant,
const GUID_t writerid,
const GUID_t readerid 
)
virtual

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 832 of file DataWriterImpl.cpp.

References OpenDDS::DCPS::TransportClient::unregister_for_reader().

835 {
836  TransportClient::unregister_for_reader(participant, writerid, readerid);
837 }
void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)

◆ unregister_instance_i()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::unregister_instance_i ( DDS::InstanceHandle_t  handle,
const DDS::Time_t & source_timestamp 
)

Delegate to the WriteDataContainer to unregister and tell the transport to broadcast the unregistered instance.

Definition at line 1685 of file DataWriterImpl.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), DDS::WriterDataLifecycleQosPolicy::autodispose_unregistered_instances, create_control_message(), data_container_, DBG_ENTRY_LVL, dispose_and_unregister(), OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), LM_ERROR, OpenDDS::DCPS::move(), qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, send_all_to_flush_control(), OpenDDS::DCPS::DataSampleElement::set_sample(), OpenDDS::DCPS::UNREGISTER_INSTANCE, and DDS::DataWriterQos::writer_data_lifecycle.

Referenced by unregister_instance_w_timestamp(), and unregister_instances().

1687 {
1688  DBG_ENTRY_LVL("DataWriterImpl","unregister_instance_i",6);
1689 
1690  if (!enabled_) {
1691  ACE_ERROR_RETURN((LM_ERROR,
1692  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::unregister_instance_i: ")
1693  ACE_TEXT("Entity is not enabled.\n")),
1695  }
1696 
1697  // According to spec 1.2, autodispose_unregistered_instances true causes
1698  // dispose on the instance prior to calling unregister operation.
1700  return this->dispose_and_unregister(handle, source_timestamp);
1701  }
1702 
1705  Message_Block_Ptr unregistered_sample_data;
1706  ret = this->data_container_->unregister(handle, unregistered_sample_data);
1707 
1708  if (ret != DDS::RETCODE_OK) {
1709  ACE_ERROR_RETURN((LM_ERROR,
1710  ACE_TEXT("(%P|%t) ERROR: ")
1711  ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
1712  ACE_TEXT("unregister with container failed.\n")),
1713  ret);
1714  }
1715 
1716  DataSampleElement* element = 0;
1717  ret = this->data_container_->obtain_buffer_for_control(element);
1718 
1719  if (ret != DDS::RETCODE_OK) {
1720  ACE_ERROR_RETURN((LM_ERROR,
1721  ACE_TEXT("(%P|%t) ERROR: ")
1722  ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
1723  ACE_TEXT("obtain_buffer_for_control returned %d.\n"),
1724  ret),
1725  ret);
1726  }
1727 
1729  element->get_header(),
1730  move(unregistered_sample_data),
1731  source_timestamp));
1732  element->set_sample(move(sample));
1733 
1734  ret = this->data_container_->enqueue_control(element);
1735 
1736  if (ret != DDS::RETCODE_OK) {
1737  data_container_->release_buffer(element);
1738  ACE_ERROR_RETURN((LM_ERROR,
1739  ACE_TEXT("(%P|%t) ERROR: ")
1740  ACE_TEXT("DataWriterImpl::unregister_instance_i: ")
1741  ACE_TEXT("enqueue_control failed.\n")),
1742  ret);
1743  }
1744 
1746  return DDS::RETCODE_OK;
1747 }
void send_all_to_flush_control(ACE_Guard< ACE_Recursive_Thread_Mutex > &guard)
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_TEXT("TCP_Factory")
const ReturnCode_t RETCODE_NOT_ENABLED
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
DDS::ReturnCode_t dispose_and_unregister(DDS::InstanceHandle_t handle, const DDS::Time_t &timestamp)
const ReturnCode_t RETCODE_ERROR
WriterDataLifecycleQosPolicy writer_data_lifecycle
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex & get_lock() const
#define ACE_ERROR_RETURN(X, Y)
RcHandle< WriteDataContainer > data_container_
The sample data container.
ACE_Message_Block * create_control_message(MessageId message_id, DataSampleHeader &header, Message_Block_Ptr data, const DDS::Time_t &source_timestamp)

◆ unregister_instance_w_timestamp()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::unregister_instance_w_timestamp ( const Sample sample,
DDS::InstanceHandle_t  instance_handle,
const DDS::Time_t timestamp 
)

Definition at line 2929 of file DataWriterImpl.cpp.

References instance_must_exist(), DDS::RETCODE_OK, and unregister_instance_i().

Referenced by OpenDDS::XTypes::DynamicDataWriterImpl::unregister_instance_w_timestamp(), and OpenDDS::DCPS::DataWriterImpl_T< MessageType >::unregister_instance_w_timestamp().

2933 {
2935  "unregister_instance_w_timestamp", sample, instance_handle, /* remove = */ true);
2936  if (rc != DDS::RETCODE_OK) {
2937  return rc;
2938  }
2939  return unregister_instance_i(instance_handle, timestamp);
2940 }
DDS::ReturnCode_t unregister_instance_i(DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
DDS::ReturnCode_t instance_must_exist(const char *method_name, const Sample &sample, DDS::InstanceHandle_t &instance_handle, bool remove=false)
const ReturnCode_t RETCODE_OK

◆ unregister_instances()

void OpenDDS::DCPS::DataWriterImpl::unregister_instances ( const DDS::Time_t source_timestamp)

Unregister all registered instances and tell the transport to broadcast the unregistered instances.

Definition at line 1813 of file DataWriterImpl.cpp.

References ACE_GUARD, data_container_, sync_unreg_rem_assocs_lock_, and unregister_instance_i().

Referenced by prepare_to_delete().

1814 {
1816 
1817  while (!this->data_container_->instances_.empty()) {
1818  this->unregister_instance_i(this->data_container_->instances_.begin()->first, source_timestamp);
1819  }
1820 }
ACE_Thread_Mutex sync_unreg_rem_assocs_lock_
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DDS::ReturnCode_t unregister_instance_i(DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp)
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ update_incompatible_qos()

void OpenDDS::DCPS::DataWriterImpl::update_incompatible_qos ( const IncompatibleQosStatus status)
virtual

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 854 of file DataWriterImpl.cpp.

References ACE_GUARD, OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, CORBA::is_nil(), OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::OfferedIncompatibleQosStatus::last_policy_id, listener_for(), lock_, OpenDDS::DCPS::EntityImpl::notify_status_condition(), DDS::OFFERED_INCOMPATIBLE_QOS_STATUS, offered_incompatible_qos_status_, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::OfferedIncompatibleQosStatus::policies, OpenDDS::DCPS::EntityImpl::set_status_changed_flag(), OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, and DDS::OfferedIncompatibleQosStatus::total_count_change.

855 {
856  DDS::DataWriterListener_var listener =
858 
860 
861 #if 0
862 
863  if (this->offered_incompatible_qos_status_.total_count == status.total_count) {
864  // This test should make the method idempotent.
865  return;
866  }
867 
868 #endif
869 
871 
872  // copy status and increment change
873  offered_incompatible_qos_status_.total_count = status.total_count;
875  status.count_since_last_send;
876  offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
877  offered_incompatible_qos_status_.policies = status.policies;
878 
879  if (!CORBA::is_nil(listener.in())) {
880  listener->on_offered_incompatible_qos(this, offered_incompatible_qos_status_);
881 
882  // TBD - Why does the spec say to change this but not change the
883  // ChangeFlagStatus after a listener call?
885  }
886 
888 }
void set_status_changed_flag(DDS::StatusKind status, bool status_changed_flag)
Definition: EntityImpl.cpp:68
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DDS::DataWriterListener_ptr listener_for(DDS::StatusKind kind)
const StatusKind OFFERED_INCOMPATIBLE_QOS_STATUS
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
ACE_Recursive_Thread_Mutex lock_
Boolean is_nil(T x)

◆ update_locators()

void OpenDDS::DCPS::DataWriterImpl::update_locators ( const GUID_t remote,
const TransportLocatorSeq locators 
)
virtual

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 840 of file DataWriterImpl.cpp.

References ACE_GUARD, reader_info_, reader_info_lock_, and OpenDDS::DCPS::TransportClient::update_locators().

842 {
843  {
844  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, reader_info_lock_);
845  RepoIdToReaderInfoMap::const_iterator iter = reader_info_.find(readerId);
846  if (iter == reader_info_.end()) {
847  return;
848  }
849  }
850  TransportClient::update_locators(readerId, locators);
851 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RepoIdToReaderInfoMap reader_info_
void update_locators(const GUID_t &remote, const TransportLocatorSeq &locators)
ACE_Thread_Mutex reader_info_lock_

◆ update_subscription_params()

void OpenDDS::DCPS::DataWriterImpl::update_subscription_params ( const GUID_t readerId,
const DDS::StringSeq params 
)
virtual

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 891 of file DataWriterImpl.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, LM_WARNING, lock_, publication_id_, reader_info_, reader_info_lock_, and TheServiceParticipant.

893 {
894 #ifdef OPENDDS_NO_CONTENT_FILTERED_TOPIC
895  ACE_UNUSED_ARG(readerId);
896  ACE_UNUSED_ARG(params);
897 #else
899  ACE_GUARD(ACE_Thread_Mutex, reader_info_guard, this->reader_info_lock_);
900  RepoIdToReaderInfoMap::iterator iter = reader_info_.find(readerId);
901 
902  if (iter != reader_info_.end()) {
903  iter->second.expression_params_ = params;
904 
905  } else if (DCPS_debug_level > 4 &&
906  TheServiceParticipant->publisher_content_filter()) {
907  ACE_DEBUG((LM_WARNING,
908  ACE_TEXT("(%P|%t) WARNING: DataWriterImpl::update_subscription_params()")
909  ACE_TEXT(" - writer: %C has no info about reader: %C\n"),
910  LogGuid(this->publication_id_).c_str(), LogGuid(readerId).c_str()));
911  }
912 
913 #endif
914 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
RepoIdToReaderInfoMap reader_info_
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_Thread_Mutex reader_info_lock_
ACE_Recursive_Thread_Mutex lock_
#define TheServiceParticipant
GUID_t publication_id_
The repository id of this datawriter/publication.

◆ wait_for_acknowledgments()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::wait_for_acknowledgments ( const DDS::Duration_t max_wait)
virtual

Definition at line 1089 of file DataWriterImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), create_ack_token(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::SequenceNumber::getValue(), DDS::ReliabilityQosPolicy::kind, LM_DEBUG, qos_, DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_OK, send_request_ack(), OpenDDS::DCPS::DataWriterImpl::AckToken::sequence_, and wait_for_specific_ack().

1090 {
1092  return DDS::RETCODE_OK;
1093 
1095 
1096  if (ret != DDS::RETCODE_OK)
1097  return ret;
1098 
1099  DataWriterImpl::AckToken token = create_ack_token(max_wait);
1100  if (DCPS_debug_level) {
1101  ACE_DEBUG ((LM_DEBUG, ACE_TEXT("(%P|%t) DataWriterImpl::wait_for_acknowledgments")
1102  ACE_TEXT(" waiting for acknowledgment of sequence %q at %T\n"),
1103  token.sequence_.getValue()));
1104  }
1105  return wait_for_specific_ack(token);
1106 }
#define ACE_DEBUG(X)
ReliabilityQosPolicy reliability
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
AckToken create_ack_token(DDS::Duration_t max_wait) const
Create an AckToken for ack operations.
DDS::ReturnCode_t wait_for_specific_ack(const AckToken &token)
ReliabilityQosPolicyKind kind
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const ReturnCode_t RETCODE_OK
DDS::ReturnCode_t send_request_ack()

◆ wait_for_specific_ack()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::wait_for_specific_ack ( const AckToken & token)
protected

Definition at line 1109 of file DataWriterImpl.cpp.

References data_container_, OpenDDS::DCPS::DataWriterImpl::AckToken::deadline(), OpenDDS::DCPS::DataWriterImpl::AckToken::deadline_is_infinite(), and OpenDDS::DCPS::DataWriterImpl::AckToken::sequence_.

Referenced by wait_for_acknowledgments().

1110 {
1111  return this->data_container_->wait_ack_of_seq(token.deadline(), token.deadline_is_infinite(), token.sequence_);
1112 }
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ wait_pending()

void OpenDDS::DCPS::DataWriterImpl::wait_pending ( )

Wait for pending data and control messages to drain.

Definition at line 2706 of file DataWriterImpl.cpp.

References controlTracker, data_container_, OpenDDS::DCPS::TransportRegistry::instance(), OpenDDS::DCPS::MessageTracker::wait_messages_pending(), and wait_pending_deadline_.

Referenced by OpenDDS::DCPS::PublisherImpl::delete_datawriter().

2707 {
2708  if (!TransportRegistry::instance()->released()) {
2709  data_container_->wait_pending(wait_pending_deadline_);
2710  controlTracker.wait_messages_pending("DataWriterImpl::wait_pending", wait_pending_deadline_);
2711  }
2712 }
MonotonicTimePoint wait_pending_deadline_
static TransportRegistry * instance()
Return a singleton instance of this class.
void wait_messages_pending(const char *caller)
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ write()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::write ( Message_Block_Ptr  sample,
DDS::InstanceHandle_t  handle,
const DDS::Time_t & source_timestamp,
GUIDSeq * filter_out,
const void *  real_data 
)

Delegate to the WriteDataContainer to queue the instance sample and finally tell the transport to send the sample.

Parameters
filter_out: can either be null (if the writer can't or won't evaluate the filters), or a list of associated reader GUID_ts that should NOT get the data sample due to content filtering.

Definition at line 1823 of file DataWriterImpl.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), available_data_list_, coherent_, coherent_samples_, create_sample_data_message(), data_container_, DBG_ENTRY_LVL, OpenDDS::DCPS::Observer::e_SAMPLE_SENT, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), get_lock(), OpenDDS::DCPS::EntityImpl::get_observer(), get_unsent_data(), get_value_dispatcher(), OpenDDS::DCPS::DataSampleHeader::instance_state(), last_liveliness_activity_time_, LM_ERROR, lock_, max_suspended_transaction_id_, min_suspended_transaction_id_, OpenDDS::DCPS::move(), OpenDDS::DCPS::Observer::on_sample_sent(), publisher_servant_, ACE_Guard< ACE_LOCK >::release(), DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_TIMEOUT, OpenDDS::DCPS::TransportClient::send(), OpenDDS::DCPS::DataSampleHeader::sequence_, OpenDDS::DCPS::DataSampleElement::set_filter_out(), OpenDDS::DCPS::DataSampleElement::set_sample(), OpenDDS::DCPS::TimePoint_T< AceClock >::set_to_now(), and track_sequence_number().

Referenced by OpenDDS::DCPS::DataDurabilityCache::get_data(), and write_sample().

1828 {
1829  DBG_ENTRY_LVL("DataWriterImpl","write",6);
1830 
1832 
1833  // take ownership of sequence allocated in FooDWImpl::write_w_timestamp()
1834  GUIDSeq_var filter_out_var(filter_out);
1835 
1836  if (!enabled_) {
1837  ACE_ERROR_RETURN((LM_ERROR,
1838  ACE_TEXT("(%P|%t) ERROR: DataWriterImpl::write: ")
1839  ACE_TEXT("Entity is not enabled.\n")),
1841  }
1842 
1844  dc_guard,
1845  get_lock(),
1847 
1848  DataSampleElement* element = 0;
1849  DDS::ReturnCode_t ret = this->data_container_->obtain_buffer(element, handle);
1850 
1851  if (ret == DDS::RETCODE_TIMEOUT) {
1852  return ret; // silent for timeout
1853 
1854  } else if (ret != DDS::RETCODE_OK) {
1855  ACE_ERROR_RETURN((LM_ERROR,
1856  ACE_TEXT("(%P|%t) ERROR: ")
1857  ACE_TEXT("DataWriterImpl::write: ")
1858  ACE_TEXT("obtain_buffer returned %d.\n"),
1859  ret),
1860  ret);
1861  }
1862 
1863  Message_Block_Ptr temp;
1864  ret = create_sample_data_message(move(data),
1865  handle,
1866  element->get_header(),
1867  temp,
1868  source_timestamp,
1869  (filter_out != 0));
1870  element->set_sample(move(temp));
1871 
1872  if (ret != DDS::RETCODE_OK) {
1873  data_container_->release_buffer(element);
1874  return ret;
1875  }
1876 
1877  element->set_filter_out(filter_out_var._retn()); // ownership passed to element
1878 
1879  ret = this->data_container_->enqueue(element, handle);
1880 
1881  if (ret != DDS::RETCODE_OK) {
1882  data_container_->release_buffer(element);
1883  ACE_ERROR_RETURN((LM_ERROR,
1884  ACE_TEXT("(%P|%t) ERROR: ")
1885  ACE_TEXT("DataWriterImpl::write: ")
1886  ACE_TEXT("enqueue failed.\n")),
1887  ret);
1888  }
1890 
1892 
1893  if (this->coherent_) {
1894  ++this->coherent_samples_;
1895  }
1896  SendStateDataSampleList list;
1897 
1898  ACE_UINT64 transaction_id = this->get_unsent_data(list);
1899 
1900  RcHandle<PublisherImpl> publisher = this->publisher_servant_.lock();
1901  if (!publisher || publisher->is_suspended()) {
1902  if (min_suspended_transaction_id_ == 0) {
1903  //provides transaction id for lower bound of suspended transactions
1904  //or transaction id for single suspended write transaction
1905  min_suspended_transaction_id_ = transaction_id;
1906  } else {
1907  //when multiple write transactions have suspended, provides the upper bound
1908  //for suspended transactions.
1909  max_suspended_transaction_id_ = transaction_id;
1910  }
1911  this->available_data_list_.enqueue_tail(list);
1912 
1913  } else {
1914  dc_guard.release();
1915  guard.release();
1916  this->send(list, transaction_id);
1917  }
1918 
1919  const ValueDispatcher* vd = get_value_dispatcher();
1921  if (observer && real_data && vd) {
1922  Observer::Sample s(handle, element->get_header().instance_state(), source_timestamp, element->get_header().sequence_, real_data, *vd);
1923  observer->on_sample_sent(this, s);
1924  }
1925 
1926  return DDS::RETCODE_OK;
1927 }
void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
MonotonicTimePoint last_liveliness_activity_time_
Timestamp of last write/dispose/assert_liveliness.
WeakRcHandle< PublisherImpl > publisher_servant_
The publisher servant which creates this datawriter.
Observer_rch get_observer(Observer::Event e)
Definition: EntityImpl.cpp:94
SendStateDataSampleList available_data_list_
DDS::ReturnCode_t create_sample_data_message(Message_Block_Ptr data, DDS::InstanceHandle_t instance_handle, DataSampleHeader &header_data, Message_Block_Ptr &message, const DDS::Time_t &source_timestamp, bool content_filter)
bool filter_out(const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
const ValueDispatcher * get_value_dispatcher() const
void enqueue_tail(const DataSampleElement *element)
const ReturnCode_t RETCODE_TIMEOUT
RcHandle< Observer > Observer_rch
Definition: Observer.h:101
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
ACE_UINT64 min_suspended_transaction_id_
The cached available data while suspending and associated transaction ids.
ACE_TEXT("TCP_Factory")
const ReturnCode_t RETCODE_NOT_ENABLED
unsigned long long ACE_UINT64
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
const ReturnCode_t RETCODE_ERROR
ACE_UINT64 get_unsent_data(SendStateDataSampleList &list)
const ReturnCode_t RETCODE_OK
ACE_Recursive_Thread_Mutex & get_lock() const
ACE_Recursive_Thread_Mutex lock_
#define ACE_ERROR_RETURN(X, Y)
void track_sequence_number(GUIDSeq *filter_out)
RcHandle< WriteDataContainer > data_container_
The sample data container.

◆ write_sample()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::write_sample ( const Sample & sample,
DDS::InstanceHandle_t  handle,
const DDS::Time_t & source_timestamp,
GUIDSeq * filter_out 
)

Definition at line 3218 of file DataWriterImpl.cpp.

References ACE_ERROR, LM_NOTICE, OpenDDS::DCPS::log_level, OpenDDS::DCPS::move(), OpenDDS::DCPS::Sample::native_data(), OpenDDS::DCPS::LogLevel::Notice, OPENDDS_END_VERSIONED_NAMESPACE_DECL, DDS::RETCODE_ERROR, serialize_sample(), and write().

Referenced by write_w_timestamp().

3223 {
3224  Message_Block_Ptr serialized(serialize_sample(sample));
3225  if (!serialized) {
3226  if (log_level >= LogLevel::Notice) {
3227  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: DataWriterImpl::write_sample: "
3228  "failed to serialize sample\n"));
3229  }
3230  return DDS::RETCODE_ERROR;
3231  }
3232 
3233  return write(move(serialized), handle, source_timestamp, filter_out, sample.native_data());
3234 }
#define ACE_ERROR(X)
bool filter_out(const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
ACE_Message_Block * serialize_sample(const Sample &sample)
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
DDS::ReturnCode_t write(Message_Block_Ptr sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out, const void *real_data)
OpenDDS_Dcps_Export LogLevel log_level
const ReturnCode_t RETCODE_ERROR

◆ write_w_timestamp()

DDS::ReturnCode_t OpenDDS::DCPS::DataWriterImpl::write_w_timestamp ( const Sample sample,
DDS::InstanceHandle_t  handle,
const DDS::Time_t source_timestamp 
)
protected

Definition at line 3170 of file DataWriterImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, OpenDDS::DCPS::Sample::eval(), OpenDDS::DCPS::DataWriterImpl::ReaderInfo::eval_, OpenDDS::DCPS::DataWriterImpl::ReaderInfo::expression_params_, filter_out(), get_or_create_instance_handle(), get_type_support(), DDS::HANDLE_NIL, LM_NOTICE, OpenDDS::DCPS::log_level, name, OpenDDS::DCPS::LogLevel::Notice, OpenDDS::DCPS::push_back(), reader_info_, reader_info_lock_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::retcode_to_string(), TheServiceParticipant, and write_sample().

Referenced by OpenDDS::XTypes::DynamicDataWriterImpl::write_w_timestamp(), and OpenDDS::DCPS::DataWriterImpl_T< MessageType >::write_w_timestamp().

3174 {
3175  // This operation assumes the provided handle is valid. The handle provided
3176  // will not be verified.
3177 
3178  if (handle == DDS::HANDLE_NIL) {
3179  DDS::InstanceHandle_t registered_handle = DDS::HANDLE_NIL;
3180  const DDS::ReturnCode_t ret =
3181  get_or_create_instance_handle(registered_handle, sample, source_timestamp);
3182  if (ret != DDS::RETCODE_OK) {
3183  if (log_level >= LogLevel::Notice) {
3184  ACE_ERROR((LM_NOTICE, "(%P|%t) NOTICE: %CDataWriterImpl::write_w_timestamp: "
3185  "register failed: %C\n",
3186  get_type_support()->name(),
3187  retcode_to_string(ret)));
3188  }
3189  return ret;
3190  }
3191 
3192  handle = registered_handle;
3193  }
3194 
3195  // list of reader GUID_ts that should not get data
3196  GUIDSeq_var filter_out;
3197 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
3198  if (TheServiceParticipant->publisher_content_filter()) {
3200  for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
3201  end = reader_info_.end(); iter != end; ++iter) {
3202  const ReaderInfo& ri = iter->second;
3203  if (!ri.eval_.is_nil()) {
3204  if (!filter_out.ptr()) {
3205  filter_out = new OpenDDS::DCPS::GUIDSeq;
3206  }
3207  if (!sample.eval(*ri.eval_, ri.expression_params_)) {
3208  push_back(filter_out.inout(), iter->first);
3209  }
3210  }
3211  }
3212  }
3213 #endif
3214 
3215  return write_sample(sample, handle, source_timestamp, filter_out._retn());
3216 }
DDS::ReturnCode_t write_sample(const Sample &sample, DDS::InstanceHandle_t handle, const DDS::Time_t &source_timestamp, GUIDSeq *filter_out)
#define ACE_ERROR(X)
const InstanceHandle_t HANDLE_NIL
bool filter_out(const DataSampleElement &elt, const OPENDDS_STRING &filterClassName, const FilterEvaluator &evaluator, const DDS::StringSeq &expression_params) const
RepoIdToReaderInfoMap reader_info_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const char *const name
Definition: debug.cpp:60
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
sequence< GUID_t > GUIDSeq
Definition: DdsDcpsGuid.idl:62
void push_back(Seq &seq, const typename Seq::value_type &val)
std::vector-style push_back() for CORBA Sequences
Definition: Util.h:138
OpenDDS_Dcps_Export LogLevel log_level
const char * retcode_to_string(DDS::ReturnCode_t value)
Definition: DCPS_Utils.cpp:29
const ReturnCode_t RETCODE_ERROR
DDS::ReturnCode_t get_or_create_instance_handle(DDS::InstanceHandle_t &handle, const Sample &sample, const DDS::Time_t &source_timestamp)
TypeSupportImpl * get_type_support() const
const ReturnCode_t RETCODE_OK
ACE_Thread_Mutex reader_info_lock_
#define TheServiceParticipant

Friends And Related Function Documentation

◆ ::DDS_TEST

friend class ::DDS_TEST
friend

Definition at line 734 of file DataWriterImpl.h.

◆ PublisherImpl

friend class PublisherImpl
friend

Definition at line 89 of file DataWriterImpl.h.

◆ WriteDataContainer

friend class WriteDataContainer
friend

Definition at line 88 of file DataWriterImpl.h.

Referenced by enable().

Member Data Documentation

◆ association_chunk_multiplier_

size_t OpenDDS::DCPS::DataWriterImpl::association_chunk_multiplier_
protected

The multiplier for allocators affected by associations.

Definition at line 562 of file DataWriterImpl.h.

Referenced by enable().

◆ available_data_list_

SendStateDataSampleList OpenDDS::DCPS::DataWriterImpl::available_data_list_
private

◆ coherent_

bool OpenDDS::DCPS::DataWriterImpl::coherent_
private

Flag indicating that the DataWriter currently belongs to a coherent change set.

Definition at line 768 of file DataWriterImpl.h.

Referenced by begin_coherent_changes(), coherent_changes_pending(), create_sample_data_message(), end_coherent_changes(), and write().

◆ coherent_samples_

ACE_UINT32 OpenDDS::DCPS::DataWriterImpl::coherent_samples_
private

The number of samples belonging to the current coherent change set.

Definition at line 771 of file DataWriterImpl.h.

Referenced by end_coherent_changes(), and write().

◆ controlTracker

MessageTracker OpenDDS::DCPS::DataWriterImpl::controlTracker

◆ data_allocator_

unique_ptr<DataAllocator> OpenDDS::DCPS::DataWriterImpl::data_allocator_
private

Definition at line 811 of file DataWriterImpl.h.

Referenced by serialize_sample(), and setup_serialization().

◆ data_container_

RcHandle<WriteDataContainer> OpenDDS::DCPS::DataWriterImpl::data_container_
private

◆ data_delivered_count_

Atomic<int> OpenDDS::DCPS::DataWriterImpl::data_delivered_count_

Definition at line 427 of file DataWriterImpl.h.

Referenced by data_delivered().

◆ data_dropped_count_

Atomic<int> OpenDDS::DCPS::DataWriterImpl::data_dropped_count_

Statistics counter.

Definition at line 426 of file DataWriterImpl.h.

Referenced by data_dropped().

◆ db_allocator_

unique_ptr<DataBlockAllocator> OpenDDS::DCPS::DataWriterImpl::db_allocator_
private

The data block allocator.

Definition at line 808 of file DataWriterImpl.h.

Referenced by create_control_message(), create_sample_data_message(), enable(), and serialize_sample().

◆ db_lock_pool_

unique_ptr<DataBlockLockPool> OpenDDS::DCPS::DataWriterImpl::db_lock_pool_
private

Definition at line 738 of file DataWriterImpl.h.

◆ domain_id_

DDS::DomainId_t OpenDDS::DCPS::DataWriterImpl::domain_id_
private

The domain id.

Definition at line 756 of file DataWriterImpl.h.

Referenced by enable(), init(), send_liveliness(), set_qos(), and transport_discovery_change().

◆ dp_id_

GUID_t OpenDDS::DCPS::DataWriterImpl::dp_id_
private

Definition at line 757 of file DataWriterImpl.h.

Referenced by enable(), get_dp_id(), set_qos(), and transport_discovery_change().

◆ dynamic_type_

DDS::DynamicType_var OpenDDS::DCPS::DataWriterImpl::dynamic_type_
protected

Definition at line 869 of file DataWriterImpl.h.

Referenced by dispose_w_timestamp(), enable(), and get_or_create_instance_handle().

◆ encoding_mode_

class OpenDDS::DCPS::DataWriterImpl::EncodingMode OpenDDS::DCPS::DataWriterImpl::encoding_mode_
protected

◆ header_allocator_

unique_ptr<DataSampleHeaderAllocator> OpenDDS::DCPS::DataWriterImpl::header_allocator_
private

The header data allocator.

Definition at line 810 of file DataWriterImpl.h.

Referenced by create_sample_data_message(), and enable().

◆ id_to_handle_map_

RepoIdToHandleMap OpenDDS::DCPS::DataWriterImpl::id_to_handle_map_
private

◆ instance_handles_to_values_

InstanceHandlesToValues OpenDDS::DCPS::DataWriterImpl::instance_handles_to_values_
private

Definition at line 858 of file DataWriterImpl.h.

Referenced by get_key_value(), insert_instance(), and instance_must_exist().

◆ instance_values_to_handles_

InstanceValuesToHandles OpenDDS::DCPS::DataWriterImpl::instance_values_to_handles_
private

◆ is_bit_

bool OpenDDS::DCPS::DataWriterImpl::is_bit_
private

Flag indicating that this DataWriter is a built-in topic DataWriter.

Definition at line 826 of file DataWriterImpl.h.

Referenced by add_association(), association_complete_i(), init(), notify_publication_disconnected(), notify_publication_lost(), notify_publication_reconnected(), and remove_associations().

◆ last_deadline_missed_total_count_

CORBA::Long OpenDDS::DCPS::DataWriterImpl::last_deadline_missed_total_count_
private

Total number of offered deadlines missed during the last offered deadline status check.

Definition at line 822 of file DataWriterImpl.h.

Referenced by enable(), and get_offered_deadline_missed_status().

◆ last_liveliness_activity_time_

MonotonicTimePoint OpenDDS::DCPS::DataWriterImpl::last_liveliness_activity_time_
private

Timestamp of last write/dispose/assert_liveliness.

Definition at line 819 of file DataWriterImpl.h.

Referenced by handle_timeout(), participant_liveliness_activity_after(), send_liveliness(), and write().

◆ listener_

DDS::DataWriterListener_var OpenDDS::DCPS::DataWriterImpl::listener_
private

Used to notify the entity for relevant events.

Definition at line 754 of file DataWriterImpl.h.

Referenced by get_ext_listener(), get_listener(), listener_for(), and set_listener().

◆ listener_mask_

DDS::StatusMask OpenDDS::DCPS::DataWriterImpl::listener_mask_
private

The StatusKind bit mask indicating which status condition changes are reported to the listener of this entity.

Definition at line 752 of file DataWriterImpl.h.

Referenced by listener_for(), and set_listener().

◆ listener_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::DataWriterImpl::listener_mutex_
private

Mutex to protect listener info.

Definition at line 749 of file DataWriterImpl.h.

Referenced by get_ext_listener(), get_listener(), listener_for(), and set_listener().

◆ liveliness_asserted_

bool OpenDDS::DCPS::DataWriterImpl::liveliness_asserted_
private

Definition at line 848 of file DataWriterImpl.h.

Referenced by assert_liveliness_by_participant(), and handle_timeout().

◆ liveliness_check_interval_

TimeDuration OpenDDS::DCPS::DataWriterImpl::liveliness_check_interval_
private

The time interval for sending liveliness messages.

Definition at line 817 of file DataWriterImpl.h.

Referenced by enable(), handle_timeout(), and liveliness_check_interval().

◆ liveliness_lost_

bool OpenDDS::DCPS::DataWriterImpl::liveliness_lost_
private

True if the writer failed to actively signal its liveliness within its offered liveliness period.

Definition at line 791 of file DataWriterImpl.h.

Referenced by handle_timeout().

◆ liveliness_lost_status_

DDS::LivelinessLostStatus OpenDDS::DCPS::DataWriterImpl::liveliness_lost_status_
private

Status conditions.

Definition at line 784 of file DataWriterImpl.h.

Referenced by DataWriterImpl(), get_liveliness_lost_status(), and handle_timeout().

◆ liveness_timer_

RcHandle<LivenessTimer> OpenDDS::DCPS::DataWriterImpl::liveness_timer_
private

Definition at line 853 of file DataWriterImpl.h.

Referenced by enable(), and handle_timeout().

◆ lock_

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::DataWriterImpl::lock_
mutable private

◆ max_suspended_transaction_id_

ACE_UINT64 OpenDDS::DCPS::DataWriterImpl::max_suspended_transaction_id_
private

Definition at line 830 of file DataWriterImpl.h.

Referenced by send_suspended_data(), and write().

◆ mb_allocator_

unique_ptr<MessageBlockAllocator> OpenDDS::DCPS::DataWriterImpl::mb_allocator_
private

The message block allocator.

Todo:
The publication_lost_status_ and publication_reconnecting_status_ are left here for future use when we add get_publication_lost_status() and get_publication_reconnecting_status() methods.

Definition at line 806 of file DataWriterImpl.h.

Referenced by create_control_message(), create_sample_data_message(), enable(), and serialize_sample().

◆ min_suspended_transaction_id_

ACE_UINT64 OpenDDS::DCPS::DataWriterImpl::min_suspended_transaction_id_
private

The cached available data while suspending and associated transaction ids.

Definition at line 829 of file DataWriterImpl.h.

Referenced by send_suspended_data(), and write().

◆ monitor_

unique_ptr<Monitor> OpenDDS::DCPS::DataWriterImpl::monitor_
private

Monitor object for this entity.

Definition at line 834 of file DataWriterImpl.h.

Referenced by association_complete_i(), DataWriterImpl(), enable(), and register_instance_i().

◆ n_chunks_

size_t OpenDDS::DCPS::DataWriterImpl::n_chunks_
protected

The number of chunks for the cached allocator.

Definition at line 559 of file DataWriterImpl.h.

Referenced by enable(), and setup_serialization().

◆ offered_deadline_missed_status_

DDS::OfferedDeadlineMissedStatus OpenDDS::DCPS::DataWriterImpl::offered_deadline_missed_status_
private

Definition at line 785 of file DataWriterImpl.h.

Referenced by DataWriterImpl(), enable(), and get_offered_deadline_missed_status().

◆ offered_incompatible_qos_status_

DDS::OfferedIncompatibleQosStatus OpenDDS::DCPS::DataWriterImpl::offered_incompatible_qos_status_
private

◆ participant_permissions_handle_

DDS::Security::PermissionsHandle OpenDDS::DCPS::DataWriterImpl::participant_permissions_handle_
protected

Definition at line 868 of file DataWriterImpl.h.

Referenced by dispose_w_timestamp(), enable(), and get_or_create_instance_handle().

◆ participant_servant_

WeakRcHandle<DomainParticipantImpl> OpenDDS::DCPS::DataWriterImpl::participant_servant_
protected

◆ passed_qos_

DDS::DataWriterQos OpenDDS::DCPS::DataWriterImpl::passed_qos_
protected

The qos policy passed in by the user. Differs from qos_ because representation has been interpreted.

Definition at line 572 of file DataWriterImpl.h.

Referenced by get_qos(), init(), and set_qos().

◆ periodic_monitor_

unique_ptr<Monitor> OpenDDS::DCPS::DataWriterImpl::periodic_monitor_
private

Periodic Monitor object for this entity.

Definition at line 837 of file DataWriterImpl.h.

Referenced by DataWriterImpl().

◆ publication_id_

GUID_t OpenDDS::DCPS::DataWriterImpl::publication_id_
private

◆ publication_match_status_

DDS::PublicationMatchedStatus OpenDDS::DCPS::DataWriterImpl::publication_match_status_
private

◆ publisher_servant_

WeakRcHandle<PublisherImpl> OpenDDS::DCPS::DataWriterImpl::publisher_servant_
private

◆ qos_

DDS::DataWriterQos OpenDDS::DCPS::DataWriterImpl::qos_
protected

◆ reactor_

ACE_Reactor_Timer_Interface* OpenDDS::DCPS::DataWriterImpl::reactor_
private

The orb's reactor to be used to register the liveliness timer.

Definition at line 815 of file DataWriterImpl.h.

Referenced by enable(), handle_timeout(), and init().

◆ reader_info_

RepoIdToReaderInfoMap OpenDDS::DCPS::DataWriterImpl::reader_info_
protected

◆ reader_info_lock_

ACE_Thread_Mutex OpenDDS::DCPS::DataWriterImpl::reader_info_lock_
protected

◆ readers_

RepoIdSet OpenDDS::DCPS::DataWriterImpl::readers_
private

◆ security_config_

Security::SecurityConfig_rch OpenDDS::DCPS::DataWriterImpl::security_config_
protected

Definition at line 867 of file DataWriterImpl.h.

Referenced by dispose_w_timestamp(), enable(), and get_or_create_instance_handle().

◆ sequence_number_

SequenceNumber OpenDDS::DCPS::DataWriterImpl::sequence_number_
private

The sequence number unique in DataWriter scope.

Definition at line 763 of file DataWriterImpl.h.

Referenced by create_control_message(), and need_sequence_repair_i().

◆ skip_serialize_

bool OpenDDS::DCPS::DataWriterImpl::skip_serialize_
protected

Definition at line 608 of file DataWriterImpl.h.

Referenced by serialize_sample().

◆ sn_lock_

ACE_Thread_Mutex OpenDDS::DCPS::DataWriterImpl::sn_lock_
mutable private

Mutex for sequence_number_.

Definition at line 765 of file DataWriterImpl.h.

Referenced by create_control_message(), and create_sample_data_message().

◆ sync_unreg_rem_assocs_lock_

ACE_Thread_Mutex OpenDDS::DCPS::DataWriterImpl::sync_unreg_rem_assocs_lock_
private

Definition at line 852 of file DataWriterImpl.h.

Referenced by remove_associations(), and unregister_instances().

◆ topic_id_

GUID_t OpenDDS::DCPS::DataWriterImpl::topic_id_
private

The associated topic repository id.

Definition at line 743 of file DataWriterImpl.h.

Referenced by init().

◆ topic_name_

CORBA::String_var OpenDDS::DCPS::DataWriterImpl::topic_name_
private

The name of associated topic.

Definition at line 741 of file DataWriterImpl.h.

Referenced by enable(), init(), and retrieve_inline_qos_data().

◆ topic_servant_

TopicDescriptionPtr<TopicImpl> OpenDDS::DCPS::DataWriterImpl::topic_servant_
private

The topic servant.

Definition at line 745 of file DataWriterImpl.h.

Referenced by cleanup(), enable(), get_topic(), and init().

◆ type_name_

CORBA::String_var OpenDDS::DCPS::DataWriterImpl::type_name_
protected

The type name of associated topic.

Definition at line 566 of file DataWriterImpl.h.

Referenced by get_type_name(), and init().

◆ type_support_

TypeSupportImpl* OpenDDS::DCPS::DataWriterImpl::type_support_
private

◆ wait_pending_deadline_

MonotonicTimePoint OpenDDS::DCPS::DataWriterImpl::wait_pending_deadline_
private

Definition at line 855 of file DataWriterImpl.h.

Referenced by set_wait_pending_deadline(), and wait_pending().


The documentation for this class was generated from the following files: