OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Public Attributes | Private Member Functions | Private Attributes | Friends | List of all members
OpenDDS::DCPS::ReplayerImpl Class Reference

Implementation of Replayer functionality. More...

#include <ReplayerImpl.h>

Inheritance diagram for OpenDDS::DCPS::ReplayerImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::ReplayerImpl:
Collaboration graph
[legend]

Classes

struct  ReaderInfo
 

Public Member Functions

 ReplayerImpl ()
 
 ~ReplayerImpl ()
 
DDS::ReturnCode_t cleanup ()
 
virtual void init (DDS::Topic_ptr topic, TopicImpl *topic_servant, const DDS::DataWriterQos &qos, ReplayerListener_rch a_listener, const DDS::StatusMask &mask, OpenDDS::DCPS::DomainParticipantImpl *participant_servant, const DDS::PublisherQos &publisher_qos)
 
virtual DDS::ReturnCode_t write (const RawDataSample &sample)
 
virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription, const RawDataSample &sample)
 
virtual DDS::ReturnCode_t write_to_reader (DDS::InstanceHandle_t subscription, const RawDataSampleList &samples)
 
virtual DDS::ReturnCode_t set_qos (const DDS::PublisherQos &publisher_qos, const DDS::DataWriterQos &datawriter_qos)
 
virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos &publisher_qos, DDS::DataWriterQos &datawriter_qos)
 
virtual DDS::ReturnCode_t set_listener (const ReplayerListener_rch &a_listener, DDS::StatusMask mask)
 
virtual ReplayerListener_rch get_listener ()
 
virtual bool check_transport_qos (const TransportInst &inst)
 
virtual GUID_t get_guid () const
 
DDS::DomainId_t domain_id () const
 
virtual CORBA::Long get_priority_value (const AssociationData &data) const
 
SequenceNumber get_max_sn () const
 
virtual void data_delivered (const DataSampleElement *sample)
 
virtual void data_dropped (const DataSampleElement *sample, bool dropped_by_transport)
 
virtual void control_delivered (const Message_Block_Ptr &sample)
 
virtual void control_dropped (const Message_Block_Ptr &sample, bool dropped_by_transport)
 
virtual void notify_publication_disconnected (const ReaderIdSeq &subids)
 
virtual void notify_publication_reconnected (const ReaderIdSeq &subids)
 
virtual void notify_publication_lost (const ReaderIdSeq &subids)
 
virtual void retrieve_inline_qos_data (InlineQosData &qos_data) const
 
virtual void add_association (const GUID_t &yourId, const ReaderAssociation &reader, bool active)
 
virtual void remove_associations (const ReaderIdSeq &readers, CORBA::Boolean callback)
 
virtual void replay_durable_data_for (const GUID_t &)
 
virtual void update_incompatible_qos (const IncompatibleQosStatus &status)
 
virtual void update_subscription_params (const GUID_t &readerId, const DDS::StringSeq &exprParams)
 
void remove_all_associations ()
 
virtual void register_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
 
virtual void unregister_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
 
virtual DCPS::WeakRcHandle< ICE::Endpointget_ice_endpoint ()
 
DDS::ReturnCode_t enable ()
 
DomainParticipantImplparticipant ()
 
virtual DDS::InstanceHandle_t get_instance_handle ()
 
- Public Member Functions inherited from OpenDDS::DCPS::Replayer
virtual ~Replayer ()
 
- Public Member Functions inherited from OpenDDS::DCPS::LocalObjectBase
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
virtual CORBA::ULong _refcount_value () const
 
- Public Member Functions inherited from CORBA::LocalObject
virtual ~LocalObject (void)
 
virtual CORBA::Boolean _non_existent (void)
 
virtual char * _repository_id (void)
 
virtual CORBA::InterfaceDef_ptr _get_interface (void)
 
virtual CORBA::Object_ptr _get_component (void)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::ExceptionList_ptr exclist, CORBA::ContextList_ptr ctxtlist, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual CORBA::Request_ptr _request (const char *operation)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList_get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
virtual CORBA::ULong _hash (CORBA::ULong maximum)
 
virtual CORBA::Boolean _is_equivalent (CORBA::Object_ptr other_obj)
 
virtual CORBA::ORB_ptr _get_orb (void)
 
virtual TAO::ObjectKey_key (void)
 
- Public Member Functions inherited from CORBA::Object
virtual ~Object (void)
 
virtual TAO_Abstract_ServantBase_servant (void) const
 
virtual CORBA::Boolean _is_collocated (void) const
 
virtual CORBA::Boolean _is_local (void) const
 
 Object (TAO_Stub *p, CORBA::Boolean collocated=false, TAO_Abstract_ServantBase *servant=0, TAO_ORB_Core *orb_core=0)
 
 Object (IOP::IOR *ior, TAO_ORB_Core *orb_core)
 
virtual TAO_Stub_stubobj (void) const
 
virtual TAO_Stub_stubobj (void)
 
virtual void _proxy_broker (TAO::Object_Proxy_Broker *proxy_broker)
 
virtual CORBA::Boolean marshal (TAO_OutputCDR &cdr)
 
CORBA::Boolean is_evaluated (void) const
 
TAO_ORB_Coreorb_core (void) const
 
IOP::IORsteal_ior (void)
 
const IOP::IORior (void) const
 
virtual bool can_convert_to_ior (void) const
 
virtual char * convert_to_ior (bool use_omg_ior_format, const char *ior_prefix) const
 
void _decr_refcount (void)
 
virtual CORBA::Boolean _is_a (const char *logical_type_id)
 
virtual const char * _interface_repository_id (void) const
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList_get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportClient
void use_datalink (const GUID_t &remote_id, const DataLink_rch &link)
 
 TransportClient ()
 
virtual ~TransportClient ()
 
void enable_transport (bool reliable, bool durable)
 
void enable_transport_using_config (bool reliable, bool durable, const TransportConfig_rch &tc)
 
bool swap_bytes () const
 
bool cdr_encapsulation () const
 
const TransportLocatorSeqconnection_info () const
 
void populate_connection_info ()
 
bool is_reliable () const
 
bool associate (const AssociationData &peer, bool active)
 
void disassociate (const GUID_t &peerId)
 
void stop_associating ()
 
void stop_associating (const GUID_t *repos, CORBA::ULong length)
 
void send_final_acks ()
 
void transport_stop ()
 
void register_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)
 
void unregister_for_reader (const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)
 
void register_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid, const TransportLocatorSeq &locators, DiscoveryListener *listener)
 
void unregister_for_writer (const GUID_t &participant, const GUID_t &readerid, const GUID_t &writerid)
 
void update_locators (const GUID_t &remote, const TransportLocatorSeq &locators)
 
WeakRcHandle< ICE::Endpointget_ice_endpoint ()
 
bool send_response (const GUID_t &peer, const DataSampleHeader &header, Message_Block_Ptr payload)
 
void send (SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
 
SendControlStatus send_w_control (SendStateDataSampleList send_list, const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
SendControlStatus send_control (const DataSampleHeader &header, Message_Block_Ptr msg)
 
SendControlStatus send_control_to (const DataSampleHeader &header, Message_Block_Ptr msg, const GUID_t &destination)
 
bool remove_sample (const DataSampleElement *sample)
 
bool remove_all_msgs ()
 
virtual void add_link (const DataLink_rch &link, const GUID_t &peer)
 
virtual RcHandle< BitSubscriberget_builtin_subscriber_proxy () const
 
void terminate_send_if_suspended ()
 
bool associated_with (const GUID_t &remote) const
 
bool pending_association_with (const GUID_t &remote) const
 
GUID_t repo_id () const
 
void data_acked (const GUID_t &remote)
 
bool is_leading (const GUID_t &reader_id) const
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportSendListener
virtual ~TransportSendListener ()
 
virtual void data_acked (const GUID_t &)
 
virtual SendControlStatus send_control_customized (const DataLinkSet_rch &links, const DataSampleHeader &header, ACE_Message_Block *msg, void *extra)
 
virtual void transport_discovery_change ()
 
- Public Member Functions inherited from OpenDDS::DCPS::DataWriterCallbacks
 DataWriterCallbacks ()
 
virtual ~DataWriterCallbacks ()
 
virtual void update_locators (const GUID_t &, const TransportLocatorSeq &)
 
- Public Member Functions inherited from OpenDDS::DCPS::EntityImpl
 EntityImpl ()
 
virtual ~EntityImpl ()
 
bool is_enabled () const
 
virtual DDS::StatusCondition_ptr get_statuscondition ()
 
virtual DDS::StatusMask get_status_changes ()
 
virtual DDS::DomainId_t get_domain_id ()
 
virtual GUID_t get_id () const
 
void set_status_changed_flag (DDS::StatusKind status, bool status_changed_flag)
 
void notify_status_condition ()
 
virtual void transport_config (const TransportConfig_rch &cfg)
 
TransportConfig_rch transport_config () const
 
virtual RcHandle< EntityImplparent () const
 
void set_observer (Observer_rch observer, Observer::Event e)
 
Observer_rch get_observer (Observer::Event e)
 

Public Attributes

int data_dropped_count_
 Statistics counter. More...
 
int data_delivered_count_
 

Private Member Functions

void notify_publication_lost (const DDS::InstanceHandleSeq &handles)
 
DDS::ReturnCode_t write (const RawDataSample *sample_array, int array_size, DDS::InstanceHandle_t *reader)
 
DDS::ReturnCode_t create_sample_data_message (Message_Block_Ptr data, DataSampleHeader &header_data, Message_Block_Ptr &message, const DDS::Time_t &source_timestamp, bool content_filter)
 
bool need_sequence_repair () const
 
void lookup_instance_handles (const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
 Lookup the instance handles by the subscription repo ids. More...
 
typedef OPENDDS_MAP_CMP (GUID_t, ReaderInfo, GUID_tKeyLessThan) RepoIdToReaderInfoMap
 
void association_complete_i (const GUID_t &remote_id)
 
typedef OPENDDS_MAP_CMP (GUID_t, DDS::InstanceHandle_t, GUID_tKeyLessThan) RepoIdToHandleMap
 
typedef OPENDDS_MAP_CMP (GUID_t, SequenceNumber, GUID_tKeyLessThan) RepoIdToSequenceMap
 

Private Attributes

size_t n_chunks_
 The number of chunks for the cached allocator. More...
 
size_t association_chunk_multiplier_
 The multiplier for allocators affected by associations. More...
 
CORBA::String_var type_name_
 The type name of associated topic. More...
 
DDS::DataWriterQos qos_
 The qos policy list of this datawriter. More...
 
DDS::DataWriterQos passed_qos_
 
DomainParticipantImplparticipant_servant_
 
RepoIdToReaderInfoMap reader_info_
 
CORBA::String_var topic_name_
 The name of associated topic. More...
 
GUID_t topic_id_
 The associated topic repository id. More...
 
DDS::Topic_var topic_objref_
 The object reference of the associated topic. More...
 
TopicDescriptionPtr< TopicImpltopic_servant_
 The topic servant. More...
 
DDS::StatusMask listener_mask_
 
ReplayerListener_rch listener_
 Used to notify the entity for relevant events. More...
 
DDS::DomainId_t domain_id_
 The domain id. More...
 
PublisherImplpublisher_servant_
 The publisher servant which creates this datawriter. More...
 
DDS::PublisherQos publisher_qos_
 
GUID_t publication_id_
 The repository id of this datawriter/publication. More...
 
SequenceNumber sequence_number_
 The sequence number unique in DataWriter scope. More...
 
ACE_Recursive_Thread_Mutex lock_
 The sample data container. More...
 
RepoIdToHandleMap id_to_handle_map_
 
RepoIdSet readers_
 
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
 Status conditions. More...
 
DDS::PublicationMatchedStatus publication_match_status_
 
unique_ptr< MessageBlockAllocatormb_allocator_
 
unique_ptr< DataBlockAllocatordb_allocator_
 
unique_ptr< DataSampleHeaderAllocatorheader_allocator_
 
unique_ptr< DataSampleElementAllocatorsample_list_element_allocator_
 
bool is_bit_
 The time interval for sending liveliness message. More...
 
RepoIdToSequenceMap idToSequence_
 
ConditionVariable< ACE_Recursive_Thread_Mutexempty_condition_
 
int pending_write_count_
 

Friends

class ::DDS_TEST
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::Replayer
typedef Replayer_ptr _ptr_type
 
typedef Replayer_var _var_type
 
- Public Types inherited from CORBA::LocalObject
typedef LocalObject_ptr _ptr_type
 
typedef LocalObject_var _var_type
 
typedef LocalObject_out _out_type
 
- Public Types inherited from CORBA::Object
typedef Object_ptr _ptr_type
 
typedef Object_var _var_type
 
typedef Object_out _out_type
 
- Public Types inherited from OpenDDS::DCPS::TransportClient
enum  { ASSOC_OK = 1, ASSOC_ACTIVE = 2 }
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
typedef DDS::Entity ::_ptr_type _ptr_type
 
typedef DDS::Entity ::_var_type _var_type
 
- Static Public Member Functions inherited from OpenDDS::DCPS::Replayer
static Replayer_ptr _duplicate (Replayer_ptr obj)
 
- Static Public Member Functions inherited from CORBA::LocalObject
static LocalObject_ptr _duplicate (LocalObject_ptr obj)
 
static LocalObject_ptr _nil (void)
 
static LocalObject_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::Object
static CORBA::Boolean marshal (const Object_ptr x, TAO_OutputCDR &cdr)
 
static void _tao_any_destructor (void *)
 
static CORBA::Boolean is_nil_i (CORBA::Object_ptr obj)
 
static void tao_object_initialize (Object *)
 
static CORBA::Object_ptr _duplicate (CORBA::Object_ptr obj)
 
static CORBA::Object_ptr _nil (void)
 
static CORBA::Object_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Protected Member Functions inherited from CORBA::LocalObject
 LocalObject (void)
 
- Protected Member Functions inherited from CORBA::Object
 Object (int dummy=0)
 
TAO::Object_Proxy_Brokerproxy_broker () const
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportClient
void cdr_encapsulation (bool encap)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportSendListener
 TransportSendListener ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::EntityImpl
DDS::ReturnCode_t set_enabled ()
 
void set_deleted (bool state)
 
bool get_deleted () const
 
DDS::InstanceHandle_t get_entity_instance_handle (const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
 
- Protected Attributes inherited from CORBA::Object
ACE_Atomic_Op< TAO_SYNCH_MUTEX, unsigned long > refcount_
 
- Protected Attributes inherited from OpenDDS::DCPS::EntityImpl
AtomicBool enabled_
 The flag indicates the entity is enabled. More...
 
AtomicBool entity_deleted_
 The flag indicates the entity is being deleted. More...
 

Detailed Description

Implementation of Replayer functionality.

This class is the implementation of the Replayer. Inheritance is used to limit the applications access to underlying system methods.

Definition at line 61 of file ReplayerImpl.h.

Constructor & Destructor Documentation

◆ ReplayerImpl()

OpenDDS::DCPS::ReplayerImpl::ReplayerImpl ( )

Definition at line 52 of file ReplayerImpl.cpp.

References DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DDS::HANDLE_NIL, DDS::OfferedIncompatibleQosStatus::last_policy_id, DDS::PublicationMatchedStatus::last_subscription_handle, offered_incompatible_qos_status_, DDS::OfferedIncompatibleQosStatus::policies, publication_match_status_, DDS::OfferedIncompatibleQosStatus::total_count, DDS::PublicationMatchedStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count_change, and DDS::PublicationMatchedStatus::total_count_change.

55  n_chunks_(TheServiceParticipant->n_chunks()),
56  association_chunk_multiplier_(TheServiceParticipant->association_chunk_multiplier()),
57  qos_(TheServiceParticipant->initial_DataWriterQos()),
60  topic_servant_(0),
62  domain_id_(0),
66  // data_container_(0),
67  // liveliness_lost_(false),
68  // last_deadline_missed_total_count_(0),
69  is_bit_(false),
72 {
73  // liveliness_lost_status_.total_count = 0;
74  // liveliness_lost_status_.total_count_change = 0;
75  //
76  // offered_deadline_missed_status_.total_count = 0;
77  // offered_deadline_missed_status_.total_count_change = 0;
78  // offered_deadline_missed_status_.last_instance_handle = DDS::HANDLE_NIL;
79 
84 
90 
91 }
GUID_t topic_id_
The associated topic repository id.
Definition: ReplayerImpl.h:220
ConditionVariable< ACE_Recursive_Thread_Mutex > empty_condition_
Definition: ReplayerImpl.h:313
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
Definition: ReplayerImpl.h:240
GUID_t publication_id_
The repository id of this datawriter/publication.
Definition: ReplayerImpl.h:238
const InstanceHandle_t HANDLE_NIL
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
const DDS::StatusMask DEFAULT_STATUS_MASK
size_t association_chunk_multiplier_
The multiplier for allocators affected by associations.
Definition: ReplayerImpl.h:187
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
Status conditions.
Definition: ReplayerImpl.h:257
DDS::StatusMask listener_mask_
Definition: ReplayerImpl.h:228
bool is_bit_
The time interval for sending liveliness message.
Definition: ReplayerImpl.h:306
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
Definition: ReplayerImpl.h:193
int data_dropped_count_
Statistics counter.
Definition: ReplayerImpl.h:124
DDS::PublicationMatchedStatus publication_match_status_
Definition: ReplayerImpl.h:258
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
Definition: ReplayerImpl.h:224
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200
size_t n_chunks_
The number of chunks for the cached allocator.
Definition: ReplayerImpl.h:184
ACE_Recursive_Thread_Mutex lock_
The sample data container.
Definition: ReplayerImpl.h:246
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
#define TheServiceParticipant
PublisherImpl * publisher_servant_
The publisher servant which creates this datawriter.
Definition: ReplayerImpl.h:234
DDS::DomainId_t domain_id_
The domain id.
Definition: ReplayerImpl.h:232

◆ ~ReplayerImpl()

OpenDDS::DCPS::ReplayerImpl::~ReplayerImpl ( )

Definition at line 95 of file ReplayerImpl.cpp.

References DBG_ENTRY_LVL.

96 {
97  DBG_ENTRY_LVL("ReplayerImpl","~ReplayerImpl",6);
98 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Function Documentation

◆ add_association()

void OpenDDS::DCPS::ReplayerImpl::add_association ( const GUID_t yourId,
const ReaderAssociation reader,
bool  active 
)
virtual

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 401 of file ReplayerImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::TransportClient::associate(), association_complete_i(), OpenDDS::DCPS::LogGuid::c_str(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::AssociationData::discovery_locator_, DDS::DataReaderQos::durability, OpenDDS::DCPS::ReaderAssociation::exprParams, OpenDDS::DCPS::ReaderAssociation::filterExpression, OpenDDS::DCPS::GUID_UNKNOWN, is_bit_, DDS::DurabilityQosPolicy::kind, DDS::ReliabilityQosPolicy::kind, LM_DEBUG, LM_ERROR, lock_, participant_servant_, publication_id_, qos_, reader_info_, OpenDDS::DCPS::ReaderAssociation::readerDiscInfo, OpenDDS::DCPS::ReaderAssociation::readerId, OpenDDS::DCPS::ReaderAssociation::readerQos, OpenDDS::DCPS::ReaderAssociation::readerTransInfo, DDS::DataReaderQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, OpenDDS::DCPS::AssociationData::remote_data_, OpenDDS::DCPS::AssociationData::remote_durable_, OpenDDS::DCPS::AssociationData::remote_id_, OpenDDS::DCPS::AssociationData::remote_reliable_, OpenDDS::DCPS::AssociationData::remote_transport_context_, TheServiceParticipant, DDS::DataWriterQos::transport_priority, OpenDDS::DCPS::ReaderAssociation::transportContext, DDS::TransportPriorityQosPolicy::value, and DDS::VOLATILE_DURABILITY_QOS.

404 {
405  DBG_ENTRY_LVL("ReplayerImpl", "add_association", 6);
406 
407  if (DCPS_debug_level >= 1) {
408  ACE_DEBUG((LM_DEBUG,
409  ACE_TEXT("(%P|%t) ReplayerImpl::add_association - ")
410  ACE_TEXT("bit %d local %C remote %C\n"),
411  is_bit_,
412  LogGuid(yourId).c_str(),
413  LogGuid(reader.readerId).c_str()));
414  }
415 
416  // if (entity_deleted_) {
417  // if (DCPS_debug_level >= 1)
418  // ACE_DEBUG((LM_DEBUG,
419  // ACE_TEXT("(%P|%t) ReplayerImpl::add_association")
420  // ACE_TEXT(" This is a deleted datawriter, ignoring add.\n")));
421  //
422  // return;
423  // }
424 
425  if (GUID_UNKNOWN == publication_id_) {
426  publication_id_ = yourId;
427  }
428 
429  {
431  reader_info_.insert(std::make_pair(reader.readerId,
432  ReaderInfo(TheServiceParticipant->publisher_content_filter() ? reader.filterExpression : "",
433  reader.exprParams, participant_servant_,
434  reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS)));
435  }
436 
437  if (DCPS_debug_level > 4) {
438  ACE_DEBUG((LM_DEBUG,
439  ACE_TEXT("(%P|%t) ReplayerImpl::add_association(): ")
440  ACE_TEXT("adding subscription to publication %C with priority %d.\n"),
441  LogGuid(publication_id_).c_str(),
443  }
444 
445  AssociationData data;
446  data.remote_id_ = reader.readerId;
447  data.remote_data_ = reader.readerTransInfo;
448  data.discovery_locator_ = reader.readerDiscInfo;
449  data.remote_transport_context_ = reader.transportContext;
450  data.remote_reliable_ =
451  (reader.readerQos.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS);
452  data.remote_durable_ =
453  (reader.readerQos.durability.kind > DDS::VOLATILE_DURABILITY_QOS);
454 
455  if (!this->associate(data, active)) {
456  //FUTURE: inform inforepo and try again as passive peer
457  if (DCPS_debug_level) {
458  ACE_ERROR((LM_ERROR,
459  ACE_TEXT("(%P|%t) ReplayerImpl::add_association: ")
460  ACE_TEXT("ERROR: transport layer failed to associate.\n")));
461  }
462  return;
463  }
464 
465  if (active) {
467 
468  association_complete_i(reader.readerId);
469  }
470 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
GUID_t publication_id_
The repository id of this datawriter/publication.
Definition: ReplayerImpl.h:238
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
bool is_bit_
The time interval for sending liveliness message.
Definition: ReplayerImpl.h:306
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
Definition: ReplayerImpl.h:193
RepoIdToReaderInfoMap reader_info_
Definition: ReplayerImpl.h:211
void association_complete_i(const GUID_t &remote_id)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
ACE_Recursive_Thread_Mutex lock_
The sample data container.
Definition: ReplayerImpl.h:246
#define TheServiceParticipant
TransportPriorityQosPolicy transport_priority
bool associate(const AssociationData &peer, bool active)

◆ association_complete_i()

void OpenDDS::DCPS::ReplayerImpl::association_complete_i ( const GUID_t remote_id)
private

Definition at line 491 of file ReplayerImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::DomainParticipantImpl::assign_handle(), OpenDDS::DCPS::bind(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, id_to_handle_map_, OpenDDS::DCPS::RcHandle< T >::in(), OpenDDS::DCPS::insert(), is_bit_, DDS::PublicationMatchedStatus::last_subscription_handle, listener_, LM_DEBUG, LM_ERROR, LM_WARNING, lock_, OpenDDS::DCPS::ReplayerListener::on_replayer_matched(), participant_servant_, publication_match_status_, readers_, DDS::PublicationMatchedStatus::total_count, and DDS::PublicationMatchedStatus::total_count_change.

Referenced by add_association().

492 {
493  DBG_ENTRY_LVL("ReplayerImpl", "association_complete_i", 6);
494  // bool reader_durable = false;
495  {
497  if (OpenDDS::DCPS::insert(readers_, remote_id) == -1) {
498  ACE_ERROR((LM_ERROR,
499  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
500  ACE_TEXT("insert %C from pending failed.\n"),
501  LogGuid(remote_id).c_str()));
502  }
503  // RepoIdToReaderInfoMap::const_iterator it = reader_info_.find(remote_id);
504  // if (it != reader_info_.end()) {
505  // reader_durable = it->second.durable_;
506  // }
507  }
508 
509  if (!is_bit_) {
510 
511  const DDS::InstanceHandle_t handle = participant_servant_->assign_handle(remote_id);
512 
513  {
514  // protect publication_match_status_ and status changed flags.
516 
517  // update the publication_match_status_
522 
523  if (OpenDDS::DCPS::bind(id_to_handle_map_, remote_id, handle) != 0) {
524  ACE_DEBUG((LM_WARNING,
525  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::association_complete_i: ")
526  ACE_TEXT("id_to_handle_map_%C = 0x%x failed.\n"),
527  LogGuid(remote_id).c_str(),
528  handle));
529  return;
530 
531  } else if (DCPS_debug_level > 4) {
532  ACE_DEBUG((LM_DEBUG,
533  ACE_TEXT("(%P|%t) ReplayerImpl::association_complete_i: ")
534  ACE_TEXT("id_to_handle_map_%C = 0x%x.\n"),
535  LogGuid(remote_id).c_str(),
536  handle));
537  }
538 
540 
541  }
542 
543 
544  if (listener_.in()) {
547 
548  // TBD - why does the spec say to change this but not
549  // change the ChangeFlagStatus after a listener call?
552  }
553 
554  }
555 
556 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DDS::InstanceHandle_t assign_handle(const GUID_t &id=GUID_UNKNOWN)
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
int bind(Container &c, const FirstType &first, const SecondType &second)
Definition: Util.h:20
virtual void on_replayer_matched(Replayer *replayer, const DDS::PublicationMatchedStatus &status)
Definition: Replayer.cpp:18
ReplayerListener_rch listener_
Used to notify the entity for relevant events.
Definition: ReplayerImpl.h:230
bool is_bit_
The time interval for sending liveliness message.
Definition: ReplayerImpl.h:306
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DDS::PublicationMatchedStatus publication_match_status_
Definition: ReplayerImpl.h:258
ACE_TEXT("TCP_Factory")
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
RepoIdToHandleMap id_to_handle_map_
Definition: ReplayerImpl.h:250
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_Recursive_Thread_Mutex lock_
The sample data container.
Definition: ReplayerImpl.h:246

◆ check_transport_qos()

bool OpenDDS::DCPS::ReplayerImpl::check_transport_qos ( const TransportInst inst)
virtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 763 of file ReplayerImpl.cpp.

764 {
765  // DataWriter does not impose any constraints on which transports
766  // may be used based on QoS.
767  return true;
768 }

◆ cleanup()

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::cleanup ( void  )

cleanup the DataWriter.

Definition at line 102 of file ReplayerImpl.cpp.

References ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_TEXT(), domain_id_, empty_condition_, LM_ERROR, lock_, pending_write_count_, publication_id_, remove_all_associations(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, TheServiceParticipant, topic_objref_, topic_servant_, and OpenDDS::DCPS::ConditionVariable< Mutex >::wait().

Referenced by OpenDDS::DCPS::Service_Participant::delete_replayer(), and OpenDDS::DCPS::DomainParticipantImpl::handle_exception().

103 {
104 
105  // // Unregister all registered instances prior to deletion.
106  // // this->unregister_instances(SystemTimePoint::now().to_dds_time());
107  //
108  // // CORBA::String_var topic_name = this->get_Atopic_name();
109  {
111 
112  // Wait for pending samples to drain prior to removing associations
113  // and unregistering the publication.
114  ThreadStatusManager& thread_status_manager = TheServiceParticipant->get_thread_status_manager();
115  while (this->pending_write_count_) {
116  this->empty_condition_.wait(thread_status_manager);
117  }
118 
119  // Call remove association before unregistering the datawriter
120  // with the transport, otherwise some callbacks resulted from
121  // remove_association may lost.
122  this->remove_all_associations();
123 
124  // release our Topic_var
125  topic_objref_ = DDS::Topic::_nil();
126  topic_servant_ = 0;
127 
128  }
129 
130  // not just unregister but remove any pending writes/sends.
131  // this->unregister_all();
132 
133  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
134  if (!disco->remove_publication(
135  this->domain_id_,
136  this->participant_servant_->get_id(),
137  this->publication_id_)) {
138  ACE_ERROR_RETURN((LM_ERROR,
139  ACE_TEXT("(%P|%t) ERROR: ")
140  ACE_TEXT("PublisherImpl::delete_datawriter, ")
141  ACE_TEXT("publication not removed from discovery.\n")),
143  }
144  return DDS::RETCODE_OK;
145 }
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
ConditionVariable< ACE_Recursive_Thread_Mutex > empty_condition_
Definition: ReplayerImpl.h:313
const ReturnCode_t RETCODE_OK
GUID_t publication_id_
The repository id of this datawriter/publication.
Definition: ReplayerImpl.h:238
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
DDS::Topic_var topic_objref_
The object reference of the associated topic.
Definition: ReplayerImpl.h:222
CvStatus wait(ThreadStatusManager &thread_status_manager)
Block until thread is woken up.
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
Definition: ReplayerImpl.h:224
ACE_TEXT("TCP_Factory")
ACE_Recursive_Thread_Mutex lock_
The sample data container.
Definition: ReplayerImpl.h:246
#define ACE_ERROR_RETURN(X, Y)
#define TheServiceParticipant
DDS::DomainId_t domain_id_
The domain id.
Definition: ReplayerImpl.h:232

◆ control_delivered()

void OpenDDS::DCPS::ReplayerImpl::control_delivered ( const Message_Block_Ptr sample)
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 808 of file ReplayerImpl.cpp.

809 {
810  ACE_UNUSED_ARG(sample);
811 }

◆ control_dropped()

void OpenDDS::DCPS::ReplayerImpl::control_dropped ( const Message_Block_Ptr sample,
bool  dropped_by_transport 
)
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 832 of file ReplayerImpl.cpp.

834 {
835  ACE_UNUSED_ARG(sample);
836 }

◆ create_sample_data_message()

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::create_sample_data_message ( Message_Block_Ptr  data,
DataSampleHeader header_data,
Message_Block_Ptr message,
const DDS::Time_t source_timestamp,
bool  content_filter 
)
private

Definition at line 952 of file ReplayerImpl.cpp.

References ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_NEW_MALLOC_RETURN, OpenDDS::DCPS::TransportClient::cdr_encapsulation(), OpenDDS::DCPS::DataSampleHeader::cdr_encapsulation_, OpenDDS::DCPS::DataSampleHeader::coherent_change_, OpenDDS::DCPS::DataSampleHeader::content_filter_, db_allocator_, DDS::LifespanQosPolicy::duration, DDS::DURATION_INFINITE_NSEC, DDS::DURATION_INFINITE_SEC, OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), header_allocator_, DDS::DataWriterQos::lifespan, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_nanosec_, OpenDDS::DCPS::DataSampleHeader::lifespan_duration_sec_, ACE_Time_Value::max_time, mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, DDS::Duration_t::nanosec, DDS::Time_t::nanosec, need_sequence_repair(), qos_, OpenDDS::DCPS::unique_ptr< T, Deleter >::release(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::SAMPLE_DATA, DDS::Duration_t::sec, DDS::Time_t::sec, OpenDDS::DCPS::DataSampleHeader::sequence_, sequence_number_, OpenDDS::DCPS::DataSampleHeader::sequence_repair_, OpenDDS::DCPS::SequenceNumber::SEQUENCENUMBER_UNKNOWN(), OpenDDS::DCPS::DataSampleHeader::source_timestamp_nanosec_, OpenDDS::DCPS::DataSampleHeader::source_timestamp_sec_, ACE_Message_Block::total_length(), and ACE_Time_Value::zero.

Referenced by write().

957 {
958  header_data.message_id_ = SAMPLE_DATA;
959  header_data.coherent_change_ = content_filter;
960 
961  header_data.content_filter_ = false;
962  header_data.cdr_encapsulation_ = this->cdr_encapsulation();
963  header_data.message_length_ = static_cast<ACE_UINT32>(data->total_length());
964  header_data.sequence_repair_ = need_sequence_repair();
966  this->sequence_number_ = SequenceNumber();
967  } else {
968  ++this->sequence_number_;
969  }
970  header_data.sequence_ = this->sequence_number_;
971  header_data.source_timestamp_sec_ = source_timestamp.sec;
972  header_data.source_timestamp_nanosec_ = source_timestamp.nanosec;
973 
976  header_data.lifespan_duration_ = true;
977  header_data.lifespan_duration_sec_ = qos_.lifespan.duration.sec;
978  header_data.lifespan_duration_nanosec_ = qos_.lifespan.duration.nanosec;
979  }
980 
981  // header_data.publication_id_ = publication_id_;
982  // header_data.publisher_id_ = this->publisher_servant_->publisher_id_;
983  ACE_Message_Block* tmp;
985  static_cast<ACE_Message_Block*>(
986  mb_allocator_->malloc(sizeof(ACE_Message_Block))),
989  data.release(), //cont
990  0, //data
991  header_allocator_.get(), //alloc_strategy
992  0, //locking_strategy
996  db_allocator_.get(),
997  mb_allocator_.get()),
999  message.reset(tmp);
1000  *message << header_data;
1001  return DDS::RETCODE_OK;
1002 }
unsigned long nanosec
unique_ptr< DataSampleHeaderAllocator > header_allocator_
Definition: ReplayerImpl.h:281
static const ACE_Time_Value max_time
const long DURATION_INFINITE_SEC
Definition: DdsDcpsCore.idl:72
const ReturnCode_t RETCODE_OK
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
Definition: ReplayerImpl.h:240
unique_ptr< DataBlockAllocator > db_allocator_
Definition: ReplayerImpl.h:279
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
const unsigned long DURATION_INFINITE_NSEC
Definition: DdsDcpsCore.idl:73
unique_ptr< MessageBlockAllocator > mb_allocator_
Definition: ReplayerImpl.h:277
const ReturnCode_t RETCODE_ERROR
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
Definition: ReplayerImpl.h:193
LifespanQosPolicy lifespan
unsigned long nanosec
Definition: DdsDcpsCore.idl:69
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
static const ACE_Time_Value zero
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY

◆ data_delivered()

void OpenDDS::DCPS::ReplayerImpl::data_delivered ( const DataSampleElement sample)
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 782 of file ReplayerImpl.cpp.

References ACE_DES_FREE, ACE_ERROR, ACE_GUARD, ACE_TEXT(), OpenDDS::DCPS::LogGuid::c_str(), data_delivered_count_, DBG_ENTRY_LVL, empty_condition_, OpenDDS::DCPS::DataSampleElement::get_pub_id(), LM_ERROR, lock_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), pending_write_count_, publication_id_, and sample_list_element_allocator_.

783 {
784  DBG_ENTRY_LVL("ReplayerImpl","data_delivered",6);
785  if (!(sample->get_pub_id() == this->publication_id_)) {
786  ACE_ERROR((LM_ERROR,
787  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::data_delivered: ")
788  ACE_TEXT(" The publication id %C from delivered element ")
789  ACE_TEXT("does not match the datawriter's id %C\n"),
790  LogGuid(sample->get_pub_id()).c_str(),
791  LogGuid(publication_id_).c_str()));
792  return;
793  }
794  DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
795  // this->data_container_->data_delivered(sample);
796  ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
798 
799  {
801  if (--pending_write_count_ == 0) {
803  }
804  }
805 }
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ConditionVariable< ACE_Recursive_Thread_Mutex > empty_condition_
Definition: ReplayerImpl.h:313
GUID_t publication_id_
The repository id of this datawriter/publication.
Definition: ReplayerImpl.h:238
bool notify_all()
Unblock all of the threads waiting on this condition.
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
unique_ptr< DataSampleElementAllocator > sample_list_element_allocator_
Definition: ReplayerImpl.h:285
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
ACE_Recursive_Thread_Mutex lock_
The sample data container.
Definition: ReplayerImpl.h:246

◆ data_dropped()

void OpenDDS::DCPS::ReplayerImpl::data_dropped ( const DataSampleElement sample,
bool  dropped_by_transport 
)
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 814 of file ReplayerImpl.cpp.

References ACE_DES_FREE, ACE_GUARD, data_dropped_count_, DBG_ENTRY_LVL, empty_condition_, lock_, OpenDDS::DCPS::ConditionVariable< Mutex >::notify_all(), pending_write_count_, and sample_list_element_allocator_.

816 {
817  DBG_ENTRY_LVL("ReplayerImpl","data_dropped",6);
818  // this->data_container_->data_dropped(element, dropped_by_transport);
819  ACE_UNUSED_ARG(dropped_by_transport);
820  DataSampleElement* elem = const_cast<DataSampleElement*>(sample);
821  ACE_DES_FREE(elem, sample_list_element_allocator_->free, DataSampleElement);
823  {
825  if ((--pending_write_count_) == 0) {
827  }
828  }
829 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ConditionVariable< ACE_Recursive_Thread_Mutex > empty_condition_
Definition: ReplayerImpl.h:313
bool notify_all()
Unblock all of the threads waiting on this condition.
int data_dropped_count_
Statistics counter.
Definition: ReplayerImpl.h:124
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
unique_ptr< DataSampleElementAllocator > sample_list_element_allocator_
Definition: ReplayerImpl.h:285
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
ACE_Recursive_Thread_Mutex lock_
The sample data container.
Definition: ReplayerImpl.h:246

◆ domain_id()

DDS::DomainId_t OpenDDS::DCPS::ReplayerImpl::domain_id ( ) const
inlinevirtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 105 of file ReplayerImpl.h.

105 { return this->domain_id_; }
DDS::DomainId_t domain_id_
The domain id.
Definition: ReplayerImpl.h:232

◆ enable()

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::enable ( )

Implements DDS::Entity.

Definition at line 298 of file ReplayerImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), association_chunk_multiplier_, OpenDDS::DCPS::TransportClient::cdr_encapsulation(), OpenDDS::XTypes::TypeInformation::complete, OpenDDS::DCPS::TransportClient::connection_info(), db_allocator_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::XTypes::TypeIdentifierWithDependencies::dependent_typeid_count, domain_id_, DDS::DataWriterQos::durability, OpenDDS::DCPS::TransportClient::enable_transport(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), OpenDDS::DCPS::GUID_UNKNOWN, header_allocator_, OpenDDS::DCPS::EntityImpl::is_enabled(), DDS::DurabilityQosPolicy::kind, DDS::ReliabilityQosPolicy::kind, DDS::LENGTH_UNLIMITED, LM_DEBUG, LM_ERROR, DDS::ResourceLimitsQosPolicy::max_samples, mb_allocator_, OpenDDS::XTypes::TypeInformation::minimal, n_chunks_, participant_servant_, publication_id_, publisher_qos_, qos_, OpenDDS::DCPS::rchandle_from(), DDS::DataWriterQos::reliability, DDS::RELIABLE_RELIABILITY_QOS, DDS::DataWriterQos::representation, DDS::DataWriterQos::resource_limits, DDS::RETCODE_ERROR, DDS::RETCODE_OK, sample_list_element_allocator_, OpenDDS::DCPS::EntityImpl::set_enabled(), OpenDDS::DCPS::set_writer_effective_data_rep_qos(), TheServiceParticipant, topic_servant_, OpenDDS::XTypes::TypeIdentifierWithDependencies::typeid_with_size, OpenDDS::XTypes::TypeIdentifierWithSize::typeobject_serialized_size, DDS::DataRepresentationQosPolicy::value, and DDS::VOLATILE_DURABILITY_QOS.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_replayer().

299 {
300  //According spec:
301  // - Calling enable on an already enabled Entity returns OK and has no
302  // effect.
303  // - Calling enable on an Entity whose factory is not enabled will fail
304  // and return PRECONDITION_NOT_MET.
305 
306  if (this->is_enabled()) {
307  return DDS::RETCODE_OK;
308  }
309 
310  // if (!this->publisher_servant_->is_enabled()) {
311  // return DDS::RETCODE_PRECONDITION_NOT_MET;
312  // }
313  //
314  const bool reliable = qos_.reliability.kind == DDS::RELIABLE_RELIABILITY_QOS;
315 
318  }
319  // +1 because we might allocate one before releasing another
320  // TBD - see if this +1 can be removed.
324 
326 
327 
328  if (DCPS_debug_level >= 2) {
329  ACE_DEBUG((LM_DEBUG,
330  "(%P|%t) ReplayerImpl::enable-mb"
331  " Cached_Allocator_With_Overflow %x with %d chunks\n",
332  mb_allocator_.get(),
333  n_chunks_));
334 
335  ACE_DEBUG((LM_DEBUG,
336  "(%P|%t) ReplayerImpl::enable-db"
337  " Cached_Allocator_With_Overflow %x with %d chunks\n",
338  db_allocator_.get(),
339  n_chunks_));
340 
341  ACE_DEBUG((LM_DEBUG,
342  "(%P|%t) ReplayerImpl::enable-header"
343  " Cached_Allocator_With_Overflow %x with %d chunks\n",
344  header_allocator_.get(),
345  n_chunks_));
346  }
347 
348  this->set_enabled();
349 
350  try {
351  this->enable_transport(reliable,
353 
354  } catch (const Transport::Exception&) {
355  ACE_ERROR((LM_ERROR,
356  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
357  ACE_TEXT("Transport Exception.\n")));
358  return DDS::RETCODE_ERROR;
359 
360  }
361 
362  const TransportLocatorSeq& trans_conf_info = connection_info();
363 
364 
365  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
366 
368  if (!topic_servant_->check_data_representation(qos_.representation.value, true)) {
369  return DDS::RETCODE_ERROR;
370  }
371 
372  XTypes::TypeInformation type_info;
373  type_info.minimal.typeid_with_size.typeobject_serialized_size = 0;
374  type_info.minimal.dependent_typeid_count = 0;
375  type_info.complete.typeid_with_size.typeobject_serialized_size = 0;
376  type_info.complete.dependent_typeid_count = 0;
377 
378  this->publication_id_ =
379  disco->add_publication(this->domain_id_,
380  this->participant_servant_->get_id(),
381  this->topic_servant_->get_id(),
382  rchandle_from(this),
383  this->qos_,
384  trans_conf_info,
385  this->publisher_qos_,
386  type_info);
387 
388  if (this->publication_id_ == GUID_UNKNOWN) {
389  ACE_ERROR((LM_ERROR,
390  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::enable, ")
391  ACE_TEXT("add_publication returned invalid id.\n")));
392  return DDS::RETCODE_ERROR;
393  }
394 
395  return DDS::RETCODE_OK;
396 }
void enable_transport(bool reliable, bool durable)
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
unique_ptr< DataSampleHeaderAllocator > header_allocator_
Definition: ReplayerImpl.h:281
void set_writer_effective_data_rep_qos(DDS::DataRepresentationIdSeq &qos, bool cdr_encapsulated)
Definition: DCPS_Utils.cpp:508
Cached_Allocator_With_Overflow< DataSampleElement, ACE_Null_Mutex > DataSampleElementAllocator
const ReturnCode_t RETCODE_OK
ReliabilityQosPolicyKind kind
ResourceLimitsQosPolicy resource_limits
sequence< TransportLocator > TransportLocatorSeq
GUID_t publication_id_
The repository id of this datawriter/publication.
Definition: ReplayerImpl.h:238
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
size_t association_chunk_multiplier_
The multiplier for allocators affected by associations.
Definition: ReplayerImpl.h:187
const TransportLocatorSeq & connection_info() const
ReliabilityQosPolicy reliability
unique_ptr< DataBlockAllocator > db_allocator_
Definition: ReplayerImpl.h:279
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
DDS::PublisherQos publisher_qos_
Definition: ReplayerImpl.h:235
const long LENGTH_UNLIMITED
unique_ptr< MessageBlockAllocator > mb_allocator_
Definition: ReplayerImpl.h:277
const ReturnCode_t RETCODE_ERROR
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
Definition: ReplayerImpl.h:193
DataRepresentationIdSeq value
DurabilityQosPolicyKind kind
Cached_Allocator_With_Overflow< DataSampleHeader, ACE_Null_Mutex > DataSampleHeaderAllocator
DurabilityQosPolicy durability
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
Definition: ReplayerImpl.h:224
ACE_TEXT("TCP_Factory")
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DataRepresentationQosPolicy representation
Cached_Allocator_With_Overflow< ACE_Message_Block, ACE_Thread_Mutex > MessageBlockAllocator
unique_ptr< DataSampleElementAllocator > sample_list_element_allocator_
Definition: ReplayerImpl.h:285
size_t n_chunks_
The number of chunks for the cached allocator.
Definition: ReplayerImpl.h:184
Cached_Allocator_With_Overflow< ACE_Data_Block, ACE_Thread_Mutex > DataBlockAllocator
#define TheServiceParticipant
DDS::DomainId_t domain_id_
The domain id.
Definition: ReplayerImpl.h:232

◆ get_guid()

GUID_t OpenDDS::DCPS::ReplayerImpl::get_guid ( ) const
virtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 770 of file ReplayerImpl.cpp.

References publication_id_.

771 {
772  return this->publication_id_;
773 }
GUID_t publication_id_
The repository id of this datawriter/publication.
Definition: ReplayerImpl.h:238

◆ get_ice_endpoint()

virtual DCPS::WeakRcHandle<ICE::Endpoint> OpenDDS::DCPS::ReplayerImpl::get_ice_endpoint ( )
inlinevirtual

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 157 of file ReplayerImpl.h.

157 { return DCPS::WeakRcHandle<ICE::Endpoint>(); }

◆ get_instance_handle()

DDS::InstanceHandle_t OpenDDS::DCPS::ReplayerImpl::get_instance_handle ( )
virtual

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 1045 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::EntityImpl::get_entity_instance_handle(), participant_servant_, publication_id_, and OpenDDS::DCPS::rchandle_from().

1046 {
1048 }
GUID_t publication_id_
The repository id of this datawriter/publication.
Definition: ReplayerImpl.h:238
DDS::InstanceHandle_t get_entity_instance_handle(const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
Definition: EntityImpl.cpp:142
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310

◆ get_listener()

ReplayerListener_rch OpenDDS::DCPS::ReplayerImpl::get_listener ( )
virtual

Get the listener for this Replayer.

Implements OpenDDS::DCPS::Replayer.

Definition at line 292 of file ReplayerImpl.cpp.

References listener_.

293 {
294  return listener_;
295 }
ReplayerListener_rch listener_
Used to notify the entity for relevant events.
Definition: ReplayerImpl.h:230

◆ get_max_sn()

SequenceNumber OpenDDS::DCPS::ReplayerImpl::get_max_sn ( ) const
inlinevirtual

Reimplemented from OpenDDS::DCPS::TransportClient.

Definition at line 107 of file ReplayerImpl.h.

107 { return sequence_number_; }
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
Definition: ReplayerImpl.h:240

◆ get_priority_value()

CORBA::Long OpenDDS::DCPS::ReplayerImpl::get_priority_value ( const AssociationData data) const
virtual

Implements OpenDDS::DCPS::TransportClient.

Definition at line 776 of file ReplayerImpl.cpp.

References qos_, DDS::DataWriterQos::transport_priority, and DDS::TransportPriorityQosPolicy::value.

777 {
778  return this->qos_.transport_priority.value;
779 }
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
Definition: ReplayerImpl.h:193
TransportPriorityQosPolicy transport_priority

◆ get_qos()

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::get_qos ( DDS::PublisherQos publisher_qos,
DDS::DataWriterQos datawriter_qos 
)
virtual

Get the Quality of Service settings for the Replayer.

Implements OpenDDS::DCPS::Replayer.

Definition at line 275 of file ReplayerImpl.cpp.

References passed_qos_, publisher_qos_, and DDS::RETCODE_OK.

277 {
278  qos = passed_qos_;
279  publisher_qos = publisher_qos_;
280  return DDS::RETCODE_OK;
281 }
const ReturnCode_t RETCODE_OK
DDS::DataWriterQos passed_qos_
Definition: ReplayerImpl.h:196
DDS::PublisherQos publisher_qos_
Definition: ReplayerImpl.h:235

◆ init()

void OpenDDS::DCPS::ReplayerImpl::init ( DDS::Topic_ptr  topic,
TopicImpl topic_servant,
const DDS::DataWriterQos qos,
ReplayerListener_rch  a_listener,
const DDS::StatusMask mask,
OpenDDS::DCPS::DomainParticipantImpl participant_servant,
const DDS::PublisherQos publisher_qos 
)
virtual

Initialize the data members.

Definition at line 148 of file ReplayerImpl.cpp.

References DBG_ENTRY_LVL, domain_id_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), TAO::String_var< charT >::in(), is_bit_, listener_, listener_mask_, participant_servant_, passed_qos_, publisher_qos_, qos_, topic_id_, topic_name_, topic_objref_, topic_servant_, OpenDDS::DCPS::topicIsBIT(), and type_name_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_replayer().

156 {
157  DBG_ENTRY_LVL("ReplayerImpl","init",6);
158  topic_objref_ = DDS::Topic::_duplicate(topic);
159  topic_servant_ = topic_servant;
160  topic_name_ = topic_servant_->get_name();
161  topic_id_ = topic_servant_->get_id();
162  type_name_ = topic_servant_->get_type_name();
163 
164 #if !defined (DDS_HAS_MINIMUM_BIT)
166 #endif // !defined (DDS_HAS_MINIMUM_BIT)
167 
168  qos_ = qos;
169  passed_qos_ = qos;
170 
171  //Note: OK to _duplicate(nil).
172  listener_ = a_listener;
173  listener_mask_ = mask;
174 
175  // Only store the participant pointer, since it is our "grand"
176  // parent, we will exist as long as it does.
177  participant_servant_ = participant_servant;
179 
180  publisher_qos_ = publisher_qos;
181 }
GUID_t topic_id_
The associated topic repository id.
Definition: ReplayerImpl.h:220
DDS::DataWriterQos passed_qos_
Definition: ReplayerImpl.h:196
DDS::PublisherQos publisher_qos_
Definition: ReplayerImpl.h:235
ReplayerListener_rch listener_
Used to notify the entity for relevant events.
Definition: ReplayerImpl.h:230
DDS::StatusMask listener_mask_
Definition: ReplayerImpl.h:228
bool is_bit_
The time interval for sending liveliness message.
Definition: ReplayerImpl.h:306
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
Definition: ReplayerImpl.h:193
CORBA::String_var topic_name_
The name of associated topic.
Definition: ReplayerImpl.h:218
DDS::Topic_var topic_objref_
The object reference of the associated topic.
Definition: ReplayerImpl.h:222
bool topicIsBIT(const char *name, const char *type)
TopicDescriptionPtr< TopicImpl > topic_servant_
The topic servant.
Definition: ReplayerImpl.h:224
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200
CORBA::String_var type_name_
The type name of associated topic.
Definition: ReplayerImpl.h:190
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
const character_type * in(void) const
DDS::DomainId_t domain_id_
The domain id.
Definition: ReplayerImpl.h:232

◆ lookup_instance_handles()

void OpenDDS::DCPS::ReplayerImpl::lookup_instance_handles ( const ReaderIdSeq ids,
DDS::InstanceHandleSeq hdls 
)
private

Lookup the instance handles by the subscription repo ids.

Definition at line 1005 of file ReplayerImpl.cpp.

References ACE_DEBUG, ACE_TEXT(), OpenDDS::DCPS::LogGuid::conv_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, OpenDDS::DCPS::DomainParticipantImpl::lookup_handle(), OPENDDS_STRING, and participant_servant_.

Referenced by remove_associations().

1007 {
1008  CORBA::ULong const num_rds = ids.length();
1009 
1010  if (DCPS_debug_level > 9) {
1011  OPENDDS_STRING separator;
1012  OPENDDS_STRING buffer;
1013 
1014  for (CORBA::ULong i = 0; i < num_rds; ++i) {
1015  buffer += separator + LogGuid(ids[i]).conv_;
1016  separator = ", ";
1017  }
1018 
1019  ACE_DEBUG((LM_DEBUG,
1020  ACE_TEXT("(%P|%t) ReplayerImpl::lookup_instance_handles: ")
1021  ACE_TEXT("searching for handles for reader Ids: %C.\n"),
1022  buffer.c_str()));
1023  }
1024 
1025  hdls.length(num_rds);
1026 
1027  for (CORBA::ULong i = 0; i < num_rds; ++i) {
1028  hdls[i] = participant_servant_->lookup_handle(ids[i]);
1029  }
1030 }
#define ACE_DEBUG(X)
DDS::InstanceHandle_t lookup_handle(const GUID_t &id) const
#define OPENDDS_STRING
ACE_CDR::ULong ULong
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200

◆ need_sequence_repair()

bool OpenDDS::DCPS::ReplayerImpl::need_sequence_repair ( ) const
private

Definition at line 1033 of file ReplayerImpl.cpp.

References reader_info_, and sequence_number_.

Referenced by create_sample_data_message().

1034 {
1035  for (RepoIdToReaderInfoMap::const_iterator it = reader_info_.begin(),
1036  end = reader_info_.end(); it != end; ++it) {
1037  if (it->second.expected_sequence_ != sequence_number_) {
1038  return true;
1039  }
1040  }
1041  return false;
1042 }
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
Definition: ReplayerImpl.h:240
RepoIdToReaderInfoMap reader_info_
Definition: ReplayerImpl.h:211

◆ notify_publication_disconnected()

void OpenDDS::DCPS::ReplayerImpl::notify_publication_disconnected ( const ReaderIdSeq subids)
virtual

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 839 of file ReplayerImpl.cpp.

840 {
841  ACE_UNUSED_ARG(subids);
842 }

◆ notify_publication_lost() [1/2]

void OpenDDS::DCPS::ReplayerImpl::notify_publication_lost ( const ReaderIdSeq subids)
virtual

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 851 of file ReplayerImpl.cpp.

Referenced by remove_associations().

852 {
853  ACE_UNUSED_ARG(subids);
854 }

◆ notify_publication_lost() [2/2]

void OpenDDS::DCPS::ReplayerImpl::notify_publication_lost ( const DDS::InstanceHandleSeq handles)
private

Definition at line 857 of file ReplayerImpl.cpp.

858 {
859  ACE_UNUSED_ARG(handles);
860 }

◆ notify_publication_reconnected()

void OpenDDS::DCPS::ReplayerImpl::notify_publication_reconnected ( const ReaderIdSeq subids)
virtual

Implements OpenDDS::DCPS::TransportSendListener.

Definition at line 845 of file ReplayerImpl.cpp.

846 {
847  ACE_UNUSED_ARG(subids);
848 }

◆ OPENDDS_MAP_CMP() [1/3]

typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP ( GUID_t  ,
ReaderInfo  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [2/3]

typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP ( GUID_t  ,
DDS::InstanceHandle_t  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [3/3]

typedef OpenDDS::DCPS::ReplayerImpl::OPENDDS_MAP_CMP ( GUID_t  ,
SequenceNumber  ,
GUID_tKeyLessThan   
)
private

◆ participant()

DomainParticipantImpl* OpenDDS::DCPS::ReplayerImpl::participant ( )
inline

Definition at line 161 of file ReplayerImpl.h.

References write().

Referenced by OpenDDS::DCPS::Service_Participant::delete_replayer().

161  {
162  return participant_servant_;
163  }
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200

◆ register_for_reader()

void OpenDDS::DCPS::ReplayerImpl::register_for_reader ( const GUID_t participant,
const GUID_t writerid,
const GUID_t readerid,
const TransportLocatorSeq locators,
DiscoveryListener listener 
)
virtual

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 721 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::TransportClient::register_for_reader().

726 {
727  TransportClient::register_for_reader(participant, writerid, readerid, locators, listener);
728 }
DomainParticipantImpl * participant()
Definition: ReplayerImpl.h:161
void register_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid, const TransportLocatorSeq &locators, OpenDDS::DCPS::DiscoveryListener *listener)

◆ remove_all_associations()

void OpenDDS::DCPS::ReplayerImpl::remove_all_associations ( )

Definition at line 688 of file ReplayerImpl.cpp.

References ACE_GUARD, lock_, readers_, remove_associations(), OpenDDS::DCPS::TransportClient::stop_associating(), and OpenDDS::DCPS::TransportClient::transport_stop().

Referenced by cleanup().

689 {
690  this->stop_associating();
691 
693  CORBA::ULong size;
694  {
696 
697  size = static_cast<CORBA::ULong>(readers_.size());
698  readers.length(size);
699 
700  RepoIdSet::iterator itEnd = readers_.end();
701  int i = 0;
702 
703  for (RepoIdSet::iterator it = readers_.begin(); it != itEnd; ++it) {
704  readers[i++] = *it;
705  }
706  }
707 
708  try {
709  if (0 < size) {
710  CORBA::Boolean dont_notify_lost = false;
711  this->remove_associations(readers, dont_notify_lost);
712  }
713 
714  } catch (const CORBA::Exception&) {
715  }
716 
717  transport_stop();
718 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
ACE_CDR::ULong ULong
ACE_CDR::Boolean Boolean
virtual void remove_associations(const ReaderIdSeq &readers, CORBA::Boolean callback)
sequence< GUID_t > ReaderIdSeq
ACE_Recursive_Thread_Mutex lock_
The sample data container.
Definition: ReplayerImpl.h:246

◆ remove_associations()

void OpenDDS::DCPS::ReplayerImpl::remove_associations ( const ReaderIdSeq readers,
CORBA::Boolean  callback 
)
virtual

Section 7.1.4.1: total_count will not decrement.

: Reconcile this with the verbiage in section 7.1.4.1 TODO: Should rds_len really be fully_associated_len here??

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 559 of file ReplayerImpl.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), DDS::PublicationMatchedStatus::current_count, DDS::PublicationMatchedStatus::current_count_change, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TransportClient::disassociate(), id_to_handle_map_, idToSequence_, OpenDDS::DCPS::RcHandle< T >::in(), is_bit_, DDS::PublicationMatchedStatus::last_subscription_handle, listener_, LM_DEBUG, lock_, lookup_instance_handles(), notify_publication_lost(), OpenDDS::DCPS::ReplayerListener::on_replayer_matched(), participant_servant_, publication_id_, publication_match_status_, reader_info_, readers_, OpenDDS::DCPS::remove(), OpenDDS::DCPS::DomainParticipantImpl::return_handle(), OpenDDS::DCPS::TransportClient::stop_associating(), and DDS::PublicationMatchedStatus::total_count_change.

Referenced by remove_all_associations().

561 {
562  if (DCPS_debug_level >= 1) {
563  ACE_DEBUG((LM_DEBUG,
564  ACE_TEXT("(%P|%t) ReplayerImpl::remove_associations: ")
565  ACE_TEXT("bit %d local %C remote %C num remotes %d\n"),
566  is_bit_,
567  LogGuid(publication_id_).c_str(),
568  LogGuid(readers[0]).c_str(),
569  readers.length()));
570  }
571 
572  this->stop_associating(readers.get_buffer(), readers.length());
573 
574  ReaderIdSeq fully_associated_readers;
575  CORBA::ULong fully_associated_len = 0;
576  ReaderIdSeq rds;
577  CORBA::ULong rds_len = 0;
578  DDS::InstanceHandleSeq handles;
579 
580  {
581  // Ensure the same acquisition order as in wait_for_acknowledgments().
582  // ACE_GUARD(ACE_SYNCH_MUTEX, wfaGuard, this->wfaLock_);
584 
585  //Remove the readers from fully associated reader list.
586  //If the supplied reader is not in the cached reader list then it is
587  //already removed. We just need remove the readers in the list that have
588  //not been removed.
589 
590  CORBA::ULong len = readers.length();
591 
592  for (CORBA::ULong i = 0; i < len; ++i) {
593  //Remove the readers from fully associated reader list. If it's not
594  //in there, the association_complete() is not called yet and remove it
595  //from pending list.
596 
597  if (OpenDDS::DCPS::remove(readers_, readers[i]) == 0) {
598  ++fully_associated_len;
599  fully_associated_readers.length(fully_associated_len);
600  fully_associated_readers [fully_associated_len - 1] = readers[i];
601 
602  // Remove this reader from the ACK sequence map if its there.
603  // This is where we need to be holding the wfaLock_ obtained
604  // above.
605  RepoIdToSequenceMap::iterator where
606  = this->idToSequence_.find(readers[i]);
607 
608  if (where != this->idToSequence_.end()) {
609  this->idToSequence_.erase(where);
610 
611  // It is possible that this subscription was causing the wait
612  // to continue, so give the opportunity to find out.
613  // this->wfaCondition_.broadcast();
614  }
615 
616  ++rds_len;
617  rds.length(rds_len);
618  rds [rds_len - 1] = readers[i];
619  }
620  reader_info_.erase(readers[i]);
621  //else reader is already removed which indicates remove_association()
622  //is called multiple times.
623  }
624 
625  if (fully_associated_len > 0 && !is_bit_) {
626  // The reader should be in the id_to_handle map at this time
627  this->lookup_instance_handles(fully_associated_readers, handles);
628 
629  for (CORBA::ULong i = 0; i < fully_associated_len; ++i) {
630  id_to_handle_map_.erase(fully_associated_readers[i]);
631  }
632  }
633 
634  // wfaGuard.release();
635 
636  // Mirror the PUBLICATION_MATCHED_STATUS processing from
637  // association_complete() here.
638  if (!this->is_bit_) {
639 
640  // Derive the change in the number of subscriptions reading this writer.
641  int matchedSubscriptions =
642  static_cast<int>(this->id_to_handle_map_.size());
644  matchedSubscriptions - this->publication_match_status_.current_count;
645 
646  // Only process status if the number of subscriptions has changed.
648  this->publication_match_status_.current_count = matchedSubscriptions;
649 
650  /// Section 7.1.4.1: total_count will not decrement.
651 
652  /// @TODO: Reconcile this with the verbiage in section 7.1.4.1
653  /// TODO: Should rds_len really be fully_associated_len here??
655  handles[rds_len - 1];
656 
657 
658  if (listener_.in()) {
660  this,
662 
663  // Listener consumes the change.
666  }
667 
668  }
669  }
670  }
671 
672  for (CORBA::ULong i = 0; i < rds.length(); ++i) {
673  this->disassociate(rds[i]);
674  }
675 
676  // If this remove_association is invoked when the InfoRepo
677  // detects a lost reader then make a callback to notify
678  // subscription lost.
679  if (notify_lost && handles.length() > 0) {
680  this->notify_publication_lost(handles);
681  }
682 
683  for (unsigned int i = 0; i < handles.length(); ++i) {
684  participant_servant_->return_handle(handles[i]);
685  }
686 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
sequence< InstanceHandle_t > InstanceHandleSeq
Definition: DdsDcpsCore.idl:53
int remove(Container &c, const ValueType &v)
Definition: Util.h:121
void return_handle(DDS::InstanceHandle_t handle)
GUID_t publication_id_
The repository id of this datawriter/publication.
Definition: ReplayerImpl.h:238
void disassociate(const GUID_t &peerId)
virtual void on_replayer_matched(Replayer *replayer, const DDS::PublicationMatchedStatus &status)
Definition: Replayer.cpp:18
RepoIdToSequenceMap idToSequence_
Definition: ReplayerImpl.h:311
ReplayerListener_rch listener_
Used to notify the entity for relevant events.
Definition: ReplayerImpl.h:230
ACE_CDR::ULong ULong
bool is_bit_
The time interval for sending liveliness message.
Definition: ReplayerImpl.h:306
RepoIdToReaderInfoMap reader_info_
Definition: ReplayerImpl.h:211
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
DDS::PublicationMatchedStatus publication_match_status_
Definition: ReplayerImpl.h:258
ACE_TEXT("TCP_Factory")
sequence< GUID_t > ReaderIdSeq
virtual void notify_publication_lost(const ReaderIdSeq &subids)
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200
RepoIdToHandleMap id_to_handle_map_
Definition: ReplayerImpl.h:250
ACE_Recursive_Thread_Mutex lock_
The sample data container.
Definition: ReplayerImpl.h:246
void lookup_instance_handles(const ReaderIdSeq &ids, DDS::InstanceHandleSeq &hdls)
Lookup the instance handles by the subscription repo ids.

◆ replay_durable_data_for()

virtual void OpenDDS::DCPS::ReplayerImpl::replay_durable_data_for ( const GUID_t )
inlinevirtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 138 of file ReplayerImpl.h.

138 {}

◆ retrieve_inline_qos_data()

void OpenDDS::DCPS::ReplayerImpl::retrieve_inline_qos_data ( TransportSendListener::InlineQosData qos_data) const
virtual

Reimplemented from OpenDDS::DCPS::TransportSendListener.

Definition at line 864 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::TransportSendListener::InlineQosData::dw_qos, TAO::String_var< charT >::in(), OpenDDS::DCPS::TransportSendListener::InlineQosData::pub_qos, publisher_qos_, qos_, OpenDDS::DCPS::TransportSendListener::InlineQosData::topic_name, and topic_name_.

865 {
866  qos_data.pub_qos = this->publisher_qos_;
867  qos_data.dw_qos = this->qos_;
868  qos_data.topic_name = this->topic_name_.in();
869 }
DDS::PublisherQos publisher_qos_
Definition: ReplayerImpl.h:235
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
Definition: ReplayerImpl.h:193
CORBA::String_var topic_name_
The name of associated topic.
Definition: ReplayerImpl.h:218
const character_type * in(void) const

◆ set_listener()

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::set_listener ( const ReplayerListener_rch a_listener,
DDS::StatusMask  mask 
)
virtual

Change the listener for this Replayer.

Implements OpenDDS::DCPS::Replayer.

Definition at line 284 of file ReplayerImpl.cpp.

References listener_, listener_mask_, and DDS::RETCODE_OK.

286 {
287  listener_ = a_listener;
288  listener_mask_ = mask;
289  return DDS::RETCODE_OK;
290 }
const ReturnCode_t RETCODE_OK
ReplayerListener_rch listener_
Used to notify the entity for relevant events.
Definition: ReplayerImpl.h:230
DDS::StatusMask listener_mask_
Definition: ReplayerImpl.h:228

◆ set_qos()

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::set_qos ( const DDS::PublisherQos publisher_qos,
const DDS::DataWriterQos datawriter_qos 
)
virtual

Set the Quality of Service settings for the Replayer.

Implements OpenDDS::DCPS::Replayer.

Definition at line 184 of file ReplayerImpl.cpp.

References ACE_ERROR_RETURN, ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), domain_id_, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::DomainParticipantImpl::get_domain_id(), OpenDDS::DCPS::DomainParticipantImpl::get_id(), LM_ERROR, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, participant_servant_, publication_id_, publisher_qos_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().

186 {
187 
189 
190  if (Qos_Helper::valid(publisher_qos) && Qos_Helper::consistent(publisher_qos)) {
191  if (publisher_qos_ == publisher_qos)
192  return DDS::RETCODE_OK;
193 
194  // for the not changeable qos, it can be changed before enable
195  if (!Qos_Helper::changeable(publisher_qos_, publisher_qos) && enabled_) {
197 
198  } else {
199  publisher_qos_ = publisher_qos;
200  }
201  } else {
203  }
204 
210 
211  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
212  if (qos_ == qos)
213  return DDS::RETCODE_OK;
214 
215  if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
217 
218  } else {
219  Discovery_rch disco = TheServiceParticipant->get_discovery(domain_id_);
220  // DDS::PublisherQos publisherQos;
221  // this->publisher_servant_->get_qos(publisherQos);
222  DDS::PublisherQos publisherQos = this->publisher_qos_;
223  const bool status
224  = disco->update_publication_qos(this->participant_servant_->get_domain_id(),
225  this->participant_servant_->get_id(),
226  this->publication_id_,
227  qos,
228  publisherQos);
229 
230  if (!status) {
231  ACE_ERROR_RETURN((LM_ERROR,
232  ACE_TEXT("(%P|%t) DataWriterImpl::set_qos, ")
233  ACE_TEXT("qos not updated.\n")),
235  }
236  }
237 
238  if (!(qos_ == qos)) {
239  // Reset the deadline timer if the period has changed.
240  // if (qos_.deadline.period.sec != qos.deadline.period.sec
241  // || qos_.deadline.period.nanosec != qos.deadline.period.nanosec) {
242  // if (qos_.deadline.period.sec == DDS::DURATION_INFINITE_SEC
243  // && qos_.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
244  // ACE_auto_ptr_reset(this->watchdog_,
245  // new OfferedDeadlineWatchdog(
246  // this->reactor_,
247  // this->lock_,
248  // qos.deadline,
249  // this,
250  // this,
251  // this->offered_deadline_missed_status_,
252  // this->last_deadline_missed_total_count_));
253  //
254  // } else if (qos.deadline.period.sec == DDS::DURATION_INFINITE_SEC
255  // && qos.deadline.period.nanosec == DDS::DURATION_INFINITE_NSEC) {
256  // this->watchdog_->cancel_all();
257  // this->watchdog_.reset();
258  //
259  // } else {
260  // this->watchdog_->reset_interval(
261  // duration_to_time_value(qos.deadline.period));
262  // }
263  // }
264 
265  qos_ = qos;
266  }
267 
268  return DDS::RETCODE_OK;
269 
270  } else {
272  }
273 }
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
const ReturnCode_t RETCODE_OK
#define OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, error_rtn_value)
GUID_t publication_id_
The repository id of this datawriter/publication.
Definition: ReplayerImpl.h:238
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
DDS::PublisherQos publisher_qos_
Definition: ReplayerImpl.h:235
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
#define OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, error_rtn_value)
const ReturnCode_t RETCODE_ERROR
DDS::DataWriterQos qos_
The qos policy list of this datawriter.
Definition: ReplayerImpl.h:193
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
ACE_TEXT("TCP_Factory")
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
#define ACE_ERROR_RETURN(X, Y)
const ReturnCode_t RETCODE_UNSUPPORTED
#define TheServiceParticipant
DDS::DomainId_t domain_id_
The domain id.
Definition: ReplayerImpl.h:232
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)

◆ unregister_for_reader()

void OpenDDS::DCPS::ReplayerImpl::unregister_for_reader ( const GUID_t participant,
const GUID_t writerid,
const GUID_t readerid 
)
virtual

Reimplemented from OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 731 of file ReplayerImpl.cpp.

References OpenDDS::DCPS::TransportClient::unregister_for_reader().

734 {
736 }
DomainParticipantImpl * participant()
Definition: ReplayerImpl.h:161
void unregister_for_reader(const GUID_t &participant, const GUID_t &writerid, const GUID_t &readerid)

◆ update_incompatible_qos()

void OpenDDS::DCPS::ReplayerImpl::update_incompatible_qos ( const IncompatibleQosStatus status)
virtual

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 739 of file ReplayerImpl.cpp.

References ACE_GUARD, OpenDDS::DCPS::IncompatibleQosStatus::count_since_last_send, OpenDDS::DCPS::IncompatibleQosStatus::last_policy_id, DDS::OfferedIncompatibleQosStatus::last_policy_id, lock_, offered_incompatible_qos_status_, OpenDDS::DCPS::IncompatibleQosStatus::policies, DDS::OfferedIncompatibleQosStatus::policies, OpenDDS::DCPS::IncompatibleQosStatus::total_count, DDS::OfferedIncompatibleQosStatus::total_count, and DDS::OfferedIncompatibleQosStatus::total_count_change.

740 {
741 
742 
744 
745  // copy status and increment change
746  offered_incompatible_qos_status_.total_count = status.total_count;
748  status.count_since_last_send;
749  offered_incompatible_qos_status_.last_policy_id = status.last_policy_id;
750  offered_incompatible_qos_status_.policies = status.policies;
751 
752 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
DDS::OfferedIncompatibleQosStatus offered_incompatible_qos_status_
Status conditions.
Definition: ReplayerImpl.h:257
ACE_Recursive_Thread_Mutex lock_
The sample data container.
Definition: ReplayerImpl.h:246

◆ update_subscription_params()

void OpenDDS::DCPS::ReplayerImpl::update_subscription_params ( const GUID_t readerId,
const DDS::StringSeq exprParams 
)
virtual

Implements OpenDDS::DCPS::DataWriterCallbacks.

Definition at line 755 of file ReplayerImpl.cpp.

757 {
758  ACE_UNUSED_ARG(readerId);
759  ACE_UNUSED_ARG(params);
760 }

◆ write() [1/2]

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write ( const RawDataSample sample)
virtual

Send the sample to all associated DataReaders.

Note
Only samples of type SAMPLE_DATA should be sent.

Implements OpenDDS::DCPS::Replayer.

Definition at line 946 of file ReplayerImpl.cpp.

Referenced by write_to_reader().

947 {
948  return this->write(&sample, 1, 0);
949 }
virtual DDS::ReturnCode_t write(const RawDataSample &sample)

◆ write() [2/2]

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write ( const RawDataSample sample_array,
int  array_size,
DDS::InstanceHandle_t reader 
)
private

Definition at line 872 of file ReplayerImpl.cpp.

References ACE_DES_FREE, ACE_ERROR_RETURN, ACE_GUARD_RETURN, ACE_NEW_MALLOC_RETURN, ACE_TEXT(), OpenDDS::DCPS::DataSampleHeader::byte_order_, create_sample_data_message(), DBG_ENTRY_LVL, OpenDDS::DCPS::SendStateDataSampleList::dequeue(), OpenDDS::DCPS::SendStateDataSampleList::enqueue_tail(), OpenDDS::DCPS::DataSampleElement::get_header(), OpenDDS::DCPS::DomainParticipantImpl::get_repoid(), OpenDDS::DCPS::GUID_UNKNOWN, LM_ERROR, lock_, OpenDDS::DCPS::move(), participant_servant_, pending_write_count_, OpenDDS::DCPS::DataSampleHeader::publication_id_, publication_id_, reader_info_, OpenDDS::DCPS::TransportClient::repo_id(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::RawDataSample::sample_byte_order_, sample_list_element_allocator_, OpenDDS::DCPS::TransportClient::send(), sequence_number_, OpenDDS::DCPS::DataSampleElement::set_num_subs(), OpenDDS::DCPS::DataSampleElement::set_sample(), OpenDDS::DCPS::DataSampleElement::set_sub_id(), and OpenDDS::DCPS::RawDataSample::source_timestamp_.

875 {
876  DBG_ENTRY_LVL("ReplayerImpl","write",6);
877 
879  if (reader_ih_ptr) {
880  repo_id = this->participant_servant_->get_repoid(*reader_ih_ptr);
881  if (repo_id == GUID_UNKNOWN) {
882  ACE_ERROR_RETURN((LM_ERROR,
883  ACE_TEXT("(%P|%t) ERROR: ReplayerImpl::write: ")
884  ACE_TEXT("Invalid reader instance handle (%d)\n"), *reader_ih_ptr),
886  }
887  }
888 
889  SendStateDataSampleList list;
890 
891  for (int i = 0; i < num_samples; ++i) {
892  DataSampleElement* element = 0;
893 
895  element,
896  static_cast<DataSampleElement*>(
898  sizeof(DataSampleElement))),
899  DataSampleElement(publication_id_,
900  this,
903 
904  element->get_header().byte_order_ = samples[i].sample_byte_order_;
905  element->get_header().publication_id_ = this->publication_id_;
906  list.enqueue_tail(element);
907  Message_Block_Ptr temp;
908  Message_Block_Ptr sample(samples[i].sample_->duplicate());
910  element->get_header(),
911  temp,
912  samples[i].source_timestamp_,
913  false);
914  element->set_sample(move(temp));
915  if (reader_ih_ptr) {
916  element->set_num_subs(1);
917  element->set_sub_id(0, repo_id);
918  }
919 
920  if (ret != DDS::RETCODE_OK) {
921  // we need to free the list
922  while (list.dequeue(element)) {
923  ACE_DES_FREE(element, sample_list_element_allocator_->free, DataSampleElement);
924  }
925 
926  return ret;
927  }
928  }
929 
930  {
933  }
934 
935  this->send(list);
936 
937  for (RepoIdToReaderInfoMap::iterator iter = reader_info_.begin(),
938  end = reader_info_.end(); iter != end; ++iter) {
939  iter->second.expected_sequence_ = sequence_number_;
940  }
941 
942  return DDS::RETCODE_OK;
943 }
GUID_t get_repoid(DDS::InstanceHandle_t id) const
const ReturnCode_t RETCODE_OK
SequenceNumber sequence_number_
The sequence number unique in DataWriter scope.
Definition: ReplayerImpl.h:240
GUID_t publication_id_
The repository id of this datawriter/publication.
Definition: ReplayerImpl.h:238
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
T::rv_reference move(T &p)
Definition: unique_ptr.h:141
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
RepoIdToReaderInfoMap reader_info_
Definition: ReplayerImpl.h:211
ACE_TEXT("TCP_Factory")
DomainParticipantImpl * participant_servant_
Definition: ReplayerImpl.h:200
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
DDS::ReturnCode_t create_sample_data_message(Message_Block_Ptr data, DataSampleHeader &header_data, Message_Block_Ptr &message, const DDS::Time_t &source_timestamp, bool content_filter)
void send(SendStateDataSampleList send_list, ACE_UINT64 transaction_id=0)
unique_ptr< DataSampleElementAllocator > sample_list_element_allocator_
Definition: ReplayerImpl.h:285
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
ACE_Recursive_Thread_Mutex lock_
The sample data container.
Definition: ReplayerImpl.h:246
#define ACE_ERROR_RETURN(X, Y)
RcHandle< PublicationInstance > PublicationInstance_rch

◆ write_to_reader() [1/2]

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write_to_reader ( DDS::InstanceHandle_t  subscription,
const RawDataSample sample 
)
virtual

Send the sample to the specified DataReader.

Note
Only samples of type SAMPLE_DATA should be sent.

Implements OpenDDS::DCPS::Replayer.

Definition at line 1051 of file ReplayerImpl.cpp.

References write().

1053 {
1054  return write(&sample, 1, &subscription);
1055 }
virtual DDS::ReturnCode_t write(const RawDataSample &sample)

◆ write_to_reader() [2/2]

DDS::ReturnCode_t OpenDDS::DCPS::ReplayerImpl::write_to_reader ( DDS::InstanceHandle_t  subscription,
const RawDataSampleList &  samples 
)
virtual

Send the samples to the specified DataReader.

Note
Only samples of type SAMPLE_DATA should be sent.

Implements OpenDDS::DCPS::Replayer.

Definition at line 1058 of file ReplayerImpl.cpp.

References OPENDDS_END_VERSIONED_NAMESPACE_DECL, DDS::RETCODE_ERROR, and write().

1060 {
1061  if (!samples.empty())
1062  return write(&samples[0], static_cast<int>(samples.size()), &subscription);
1063  return DDS::RETCODE_ERROR;
1064 }
const ReturnCode_t RETCODE_ERROR
virtual DDS::ReturnCode_t write(const RawDataSample &sample)

Friends And Related Function Documentation

◆ ::DDS_TEST

friend class ::DDS_TEST
friend

Definition at line 215 of file ReplayerImpl.h.

Member Data Documentation

◆ association_chunk_multiplier_

size_t OpenDDS::DCPS::ReplayerImpl::association_chunk_multiplier_
private

The multiplier for allocators affected by associations.

Definition at line 187 of file ReplayerImpl.h.

Referenced by enable().

◆ data_delivered_count_

int OpenDDS::DCPS::ReplayerImpl::data_delivered_count_

Definition at line 125 of file ReplayerImpl.h.

Referenced by data_delivered().

◆ data_dropped_count_

int OpenDDS::DCPS::ReplayerImpl::data_dropped_count_

Statistics counter.

Definition at line 124 of file ReplayerImpl.h.

Referenced by data_dropped().

◆ db_allocator_

unique_ptr<DataBlockAllocator> OpenDDS::DCPS::ReplayerImpl::db_allocator_
private

Definition at line 279 of file ReplayerImpl.h.

Referenced by create_sample_data_message(), and enable().

◆ domain_id_

DDS::DomainId_t OpenDDS::DCPS::ReplayerImpl::domain_id_
private

The domain id.

Definition at line 232 of file ReplayerImpl.h.

Referenced by cleanup(), enable(), init(), and set_qos().

◆ empty_condition_

ConditionVariable<ACE_Recursive_Thread_Mutex> OpenDDS::DCPS::ReplayerImpl::empty_condition_
private

Definition at line 313 of file ReplayerImpl.h.

Referenced by cleanup(), data_delivered(), and data_dropped().

◆ header_allocator_

unique_ptr<DataSampleHeaderAllocator> OpenDDS::DCPS::ReplayerImpl::header_allocator_
private

Definition at line 281 of file ReplayerImpl.h.

Referenced by create_sample_data_message(), and enable().

◆ id_to_handle_map_

RepoIdToHandleMap OpenDDS::DCPS::ReplayerImpl::id_to_handle_map_
private

Definition at line 250 of file ReplayerImpl.h.

Referenced by association_complete_i(), and remove_associations().

◆ idToSequence_

RepoIdToSequenceMap OpenDDS::DCPS::ReplayerImpl::idToSequence_
private

Definition at line 311 of file ReplayerImpl.h.

Referenced by remove_associations().

◆ is_bit_

bool OpenDDS::DCPS::ReplayerImpl::is_bit_
private

The time interval for sending liveliness message.

The orb's reactor to be used to register the liveliness timer.Timestamp of last write/dispose/assert_liveliness. Total number of offered deadlines missed during last offered deadline status check. Watchdog responsible for reporting missed offered deadlines. The flag indicates whether the liveliness timer is scheduled and needs be cancelled. Flag indicates that this datawriter is a builtin topic datawriter.

Definition at line 306 of file ReplayerImpl.h.

Referenced by add_association(), association_complete_i(), init(), and remove_associations().

◆ listener_

ReplayerListener_rch OpenDDS::DCPS::ReplayerImpl::listener_
private

Used to notify the entity for relevant events.

Definition at line 230 of file ReplayerImpl.h.

Referenced by association_complete_i(), get_listener(), init(), remove_associations(), and set_listener().

◆ listener_mask_

DDS::StatusMask OpenDDS::DCPS::ReplayerImpl::listener_mask_
private

The StatusKind bit mask indicates which status condition change can be notified by the listener of this entity.

Definition at line 228 of file ReplayerImpl.h.

Referenced by init(), and set_listener().

◆ lock_

ACE_Recursive_Thread_Mutex OpenDDS::DCPS::ReplayerImpl::lock_
private

The sample data container.

The lock to protect the activate subscriptions and status changes.

Definition at line 246 of file ReplayerImpl.h.

Referenced by add_association(), association_complete_i(), cleanup(), data_delivered(), data_dropped(), remove_all_associations(), remove_associations(), update_incompatible_qos(), and write().

◆ mb_allocator_

unique_ptr<MessageBlockAllocator> OpenDDS::DCPS::ReplayerImpl::mb_allocator_
private

True if the writer failed to actively signal its liveliness within its offered liveliness period.

Todo:
The publication_lost_status_ and publication_reconnecting_status_ are left here for future use when we add get_publication_lost_status() and get_publication_reconnecting_status() methods.

Definition at line 277 of file ReplayerImpl.h.

Referenced by create_sample_data_message(), and enable().

◆ n_chunks_

size_t OpenDDS::DCPS::ReplayerImpl::n_chunks_
private

The number of chunks for the cached allocator.

Definition at line 184 of file ReplayerImpl.h.

Referenced by enable().

◆ offered_incompatible_qos_status_

DDS::OfferedIncompatibleQosStatus OpenDDS::DCPS::ReplayerImpl::offered_incompatible_qos_status_
private

Status conditions.

Definition at line 257 of file ReplayerImpl.h.

Referenced by ReplayerImpl(), and update_incompatible_qos().

◆ participant_servant_

DomainParticipantImpl* OpenDDS::DCPS::ReplayerImpl::participant_servant_
private

The participant servant which creats the publisher that creates this datawriter.

Definition at line 200 of file ReplayerImpl.h.

Referenced by add_association(), association_complete_i(), enable(), get_instance_handle(), init(), lookup_instance_handles(), remove_associations(), set_qos(), and write().

◆ passed_qos_

DDS::DataWriterQos OpenDDS::DCPS::ReplayerImpl::passed_qos_
private

The qos policy passed in by the user. Differs from qos_ because representation has been interpreted.

Definition at line 196 of file ReplayerImpl.h.

Referenced by get_qos(), and init().

◆ pending_write_count_

int OpenDDS::DCPS::ReplayerImpl::pending_write_count_
private

Definition at line 314 of file ReplayerImpl.h.

Referenced by cleanup(), data_delivered(), data_dropped(), and write().

◆ publication_id_

GUID_t OpenDDS::DCPS::ReplayerImpl::publication_id_
private

The repository id of this datawriter/publication.

Definition at line 238 of file ReplayerImpl.h.

Referenced by add_association(), cleanup(), data_delivered(), enable(), get_guid(), get_instance_handle(), remove_associations(), set_qos(), and write().

◆ publication_match_status_

DDS::PublicationMatchedStatus OpenDDS::DCPS::ReplayerImpl::publication_match_status_
private

Definition at line 258 of file ReplayerImpl.h.

Referenced by association_complete_i(), remove_associations(), and ReplayerImpl().

◆ publisher_qos_

DDS::PublisherQos OpenDDS::DCPS::ReplayerImpl::publisher_qos_
private

Definition at line 235 of file ReplayerImpl.h.

Referenced by enable(), get_qos(), init(), retrieve_inline_qos_data(), and set_qos().

◆ publisher_servant_

PublisherImpl* OpenDDS::DCPS::ReplayerImpl::publisher_servant_
private

The publisher servant which creates this datawriter.

Definition at line 234 of file ReplayerImpl.h.

◆ qos_

DDS::DataWriterQos OpenDDS::DCPS::ReplayerImpl::qos_
private

The qos policy list of this datawriter.

Definition at line 193 of file ReplayerImpl.h.

Referenced by add_association(), create_sample_data_message(), enable(), get_priority_value(), init(), retrieve_inline_qos_data(), and set_qos().

◆ reader_info_

RepoIdToReaderInfoMap OpenDDS::DCPS::ReplayerImpl::reader_info_
private

Definition at line 211 of file ReplayerImpl.h.

Referenced by add_association(), need_sequence_repair(), remove_associations(), and write().

◆ readers_

RepoIdSet OpenDDS::DCPS::ReplayerImpl::readers_
private

◆ sample_list_element_allocator_

unique_ptr<DataSampleElementAllocator> OpenDDS::DCPS::ReplayerImpl::sample_list_element_allocator_
private

The cached allocator to allocate DataSampleElement objects.

Definition at line 285 of file ReplayerImpl.h.

Referenced by data_delivered(), data_dropped(), enable(), and write().

◆ sequence_number_

SequenceNumber OpenDDS::DCPS::ReplayerImpl::sequence_number_
private

The sequence number unique in DataWriter scope.

Definition at line 240 of file ReplayerImpl.h.

Referenced by create_sample_data_message(), need_sequence_repair(), and write().

◆ topic_id_

GUID_t OpenDDS::DCPS::ReplayerImpl::topic_id_
private

The associated topic repository id.

Definition at line 220 of file ReplayerImpl.h.

Referenced by init().

◆ topic_name_

CORBA::String_var OpenDDS::DCPS::ReplayerImpl::topic_name_
private

The name of associated topic.

Definition at line 218 of file ReplayerImpl.h.

Referenced by init(), and retrieve_inline_qos_data().

◆ topic_objref_

DDS::Topic_var OpenDDS::DCPS::ReplayerImpl::topic_objref_
private

The object reference of the associated topic.

Definition at line 222 of file ReplayerImpl.h.

Referenced by cleanup(), and init().

◆ topic_servant_

TopicDescriptionPtr<TopicImpl> OpenDDS::DCPS::ReplayerImpl::topic_servant_
private

The topic servant.

Definition at line 224 of file ReplayerImpl.h.

Referenced by cleanup(), enable(), and init().

◆ type_name_

CORBA::String_var OpenDDS::DCPS::ReplayerImpl::type_name_
private

The type name of associated topic.

Definition at line 190 of file ReplayerImpl.h.

Referenced by init().


The documentation for this class was generated from the following files: