OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Static Public Member Functions | Private Types | Private Member Functions | Private Attributes | Friends | List of all members
OpenDDS::DCPS::PublisherImpl Class Reference

Implements the OpenDDS::DCPS::Publisher interfaces. More...

#include <PublisherImpl.h>

Inheritance diagram for OpenDDS::DCPS::PublisherImpl:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::PublisherImpl:
Collaboration graph
[legend]

Public Member Functions

 PublisherImpl (DDS::InstanceHandle_t handle, GUID_t id, const DDS::PublisherQos &qos, DDS::PublisherListener_ptr a_listener, const DDS::StatusMask &mask, DomainParticipantImpl *participant)
 
virtual ~PublisherImpl ()
 
virtual DDS::InstanceHandle_t get_instance_handle ()
 
bool contains_writer (DDS::InstanceHandle_t a_handle)
 
virtual DDS::DataWriter_ptr create_datawriter (DDS::Topic_ptr a_topic, const DDS::DataWriterQos &qos, DDS::DataWriterListener_ptr a_listener, DDS::StatusMask mask)
 
virtual DDS::ReturnCode_t delete_datawriter (DDS::DataWriter_ptr a_datawriter)
 
virtual DDS::DataWriter_ptr lookup_datawriter (const char *topic_name)
 
virtual DDS::ReturnCode_t delete_contained_entities ()
 
virtual DDS::ReturnCode_t set_qos (const DDS::PublisherQos &qos)
 
virtual DDS::ReturnCode_t get_qos (DDS::PublisherQos &qos)
 
virtual DDS::ReturnCode_t set_listener (DDS::PublisherListener_ptr a_listener, DDS::StatusMask mask)
 
virtual DDS::PublisherListener_ptr get_listener ()
 
virtual DDS::ReturnCode_t suspend_publications ()
 
virtual DDS::ReturnCode_t resume_publications ()
 
virtual DDS::ReturnCode_t begin_coherent_changes ()
 
virtual DDS::ReturnCode_t end_coherent_changes ()
 
virtual DDS::ReturnCode_t wait_for_acknowledgments (const DDS::Duration_t &max_wait)
 
virtual DDS::DomainParticipant_ptr get_participant ()
 
virtual DDS::ReturnCode_t set_default_datawriter_qos (const DDS::DataWriterQos &qos)
 
virtual DDS::ReturnCode_t get_default_datawriter_qos (DDS::DataWriterQos &qos)
 
virtual DDS::ReturnCode_t copy_from_topic_qos (DDS::DataWriterQos &a_datawriter_qos, const DDS::TopicQos &a_topic_qos)
 
virtual DDS::ReturnCode_t enable ()
 
ACE_Recursive_Thread_Mutex & get_pi_lock ()
 
bool is_clean (String *leftover_entities=0) const
 
DDS::ReturnCode_t writer_enabled (const char *topic_name, DataWriterImpl *impl)
 
DDS::PublisherListener_ptr listener_for (::DDS::StatusKind kind)
 
DDS::ReturnCode_t assert_liveliness_by_participant ()
 
TimeDuration liveliness_check_interval (DDS::LivelinessQosPolicyKind kind)
 
bool participant_liveliness_activity_after (const MonotonicTimePoint &tv)
 
typedef OPENDDS_VECTOR (GUID_t) PublicationIdVec
 
void get_publication_ids (PublicationIdVec &pubs)
 
bool is_suspended () const
 
virtual RcHandle< EntityImpl > parent () const
 
bool prepare_to_delete_datawriters ()
 
bool set_wait_pending_deadline (const MonotonicTimePoint &deadline)
 
- Public Member Functions inherited from DDS::Publisher
DataWriter create_datawriter (in Topic a_topic, in DataWriterQos qos, in DataWriterListener a_listener, in StatusMask mask)
 
ReturnCode_t delete_datawriter (in DataWriter a_datawriter)
 
DataWriter lookup_datawriter (in string topic_name)
 
ReturnCode_t set_qos (in PublisherQos qos)
 
ReturnCode_t get_qos (inout PublisherQos qos)
 
ReturnCode_t set_listener (in PublisherListener a_listener, in StatusMask mask)
 
ReturnCode_t wait_for_acknowledgments (in Duration_t max_wait)
 
ReturnCode_t set_default_datawriter_qos (in DataWriterQos qos)
 
ReturnCode_t get_default_datawriter_qos (inout DataWriterQos qos)
 
ReturnCode_t copy_from_topic_qos (inout DataWriterQos a_datawriter_qos, in TopicQos a_topic_qos)
 
- Public Member Functions inherited from OpenDDS::DCPS::LocalObjectBase
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
virtual CORBA::ULong _refcount_value () const
 
- Public Member Functions inherited from CORBA::LocalObject
virtual ~LocalObject (void)
 
virtual CORBA::Boolean _non_existent (void)
 
virtual char * _repository_id (void)
 
virtual CORBA::InterfaceDef_ptr _get_interface (void)
 
virtual CORBA::Object_ptr _get_component (void)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual void _create_request (CORBA::Context_ptr ctx, const char *operation, CORBA::NVList_ptr arg_list, CORBA::NamedValue_ptr result, CORBA::ExceptionList_ptr exclist, CORBA::ContextList_ptr ctxtlist, CORBA::Request_ptr &request, CORBA::Flags req_flags)
 
virtual CORBA::Request_ptr _request (const char *operation)
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
virtual CORBA::ULong _hash (CORBA::ULong maximum)
 
virtual CORBA::Boolean _is_equivalent (CORBA::Object_ptr other_obj)
 
virtual CORBA::ORB_ptr _get_orb (void)
 
virtual TAO::ObjectKey * _key (void)
 
- Public Member Functions inherited from CORBA::Object
virtual ~Object (void)
 
virtual TAO_Abstract_ServantBase * _servant (void) const
 
virtual CORBA::Boolean _is_collocated (void) const
 
virtual CORBA::Boolean _is_local (void) const
 
 Object (TAO_Stub *p, CORBA::Boolean collocated=false, TAO_Abstract_ServantBase *servant=0, TAO_ORB_Core *orb_core=0)
 
 Object (IOP::IOR *ior, TAO_ORB_Core *orb_core)
 
virtual TAO_Stub * _stubobj (void) const
 
virtual TAO_Stub * _stubobj (void)
 
virtual void _proxy_broker (TAO::Object_Proxy_Broker *proxy_broker)
 
virtual CORBA::Boolean marshal (TAO_OutputCDR &cdr)
 
CORBA::Boolean is_evaluated (void) const
 
TAO_ORB_Core * orb_core (void) const
 
IOP::IOR * steal_ior (void)
 
const IOP::IOR & ior (void) const
 
virtual bool can_convert_to_ior (void) const
 
virtual char * convert_to_ior (bool use_omg_ior_format, const char *ior_prefix) const
 
void _decr_refcount (void)
 
virtual CORBA::Boolean _is_a (const char *logical_type_id)
 
virtual const char * _interface_repository_id (void) const
 
CORBA::Policy_ptr _get_policy (CORBA::PolicyType type)
 
CORBA::Policy_ptr _get_cached_policy (TAO_Cached_Policy_Type type)
 
CORBA::Object_ptr _set_policy_overrides (const CORBA::PolicyList &policies, CORBA::SetOverrideType set_add)
 
CORBA::PolicyList * _get_policy_overrides (const CORBA::PolicyTypeSeq &types)
 
CORBA::Boolean _validate_connection (CORBA::PolicyList_out inconsistent_policies)
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 
- Public Member Functions inherited from OpenDDS::DCPS::EntityImpl
 EntityImpl ()
 
virtual ~EntityImpl ()
 
bool is_enabled () const
 
virtual DDS::StatusCondition_ptr get_statuscondition ()
 
virtual DDS::StatusMask get_status_changes ()
 
virtual DDS::DomainId_t get_domain_id ()
 
virtual GUID_t get_id () const
 
void set_status_changed_flag (DDS::StatusKind status, bool status_changed_flag)
 
void notify_status_condition ()
 
virtual void transport_config (const TransportConfig_rch &cfg)
 
TransportConfig_rch transport_config () const
 
void set_observer (Observer_rch observer, Observer::Event e)
 
Observer_rch get_observer (Observer::Event e)
 

Static Public Member Functions

static bool validate_datawriter_qos (const DDS::DataWriterQos &qos, const DDS::DataWriterQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataWriterQos &dw_qos)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::Publisher >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::LocalObject
static LocalObject_ptr _duplicate (LocalObject_ptr obj)
 
static LocalObject_ptr _nil (void)
 
static LocalObject_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from CORBA::Object
static CORBA::Boolean marshal (const Object_ptr x, TAO_OutputCDR &cdr)
 
static void _tao_any_destructor (void *)
 
static CORBA::Boolean is_nil_i (CORBA::Object_ptr obj)
 
static void tao_object_initialize (Object *)
 
static CORBA::Object_ptr _duplicate (CORBA::Object_ptr obj)
 
static CORBA::Object_ptr _nil (void)
 
static CORBA::Object_ptr _narrow (CORBA::Object_ptr obj)
 
- Static Public Member Functions inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
static _ptr_type _narrow (CORBA::Object_ptr obj)
 

Private Types

typedef ACE_Recursive_Thread_Mutex lock_type
 
typedef ACE_Reverse_Lock< lock_type > reverse_lock_type
 

Private Member Functions

typedef OPENDDS_MULTIMAP (OPENDDS_STRING, DataWriterImpl_rch) DataWriterMap
 
typedef OPENDDS_MAP_CMP (GUID_t, DataWriterImpl_rch, GUID_tKeyLessThan) PublicationMap
 
typedef OPENDDS_MAP_CMP (GUID_t, DDS::DataWriterQos, GUID_tKeyLessThan) DwIdToQosMap
 
typedef OPENDDS_SET (DataWriterImpl_rch) DataWriterSet
 

Private Attributes

DDS::InstanceHandle_t handle_
 
DDS::PublisherQos qos_
 Publisher QoS policy list. More...
 
DDS::DataWriterQos default_datawriter_qos_
 Default datawriter Qos policy list. More...
 
ACE_Thread_Mutex listener_mutex_
 Mutex to protect listener info. More...
 
DDS::StatusMask listener_mask_
 
DDS::PublisherListener_var listener_
 Used to notify the entity for relevant events. More...
 
DataWriterSet writers_not_enabled_
 
DataWriterMap datawriter_map_
 This map is used to support datawriter lookup by topic name. More...
 
PublicationMap publication_map_
 
std::size_t change_depth_
 The number of times begin_coherent_changes has been called. More...
 
DDS::DomainId_t domain_id_
 Domain in which we are contained. More...
 
WeakRcHandle< DomainParticipantImpl > participant_
 The DomainParticipant servant that owns this Publisher. More...
 
CORBA::Short suspend_depth_count_
 The suspend depth count. More...
 
SequenceNumber sequence_number_
 
MonotonicTimePoint aggregation_period_start_
 Start of current aggregation period. - NOT USED IN FIRST IMPL. More...
 
lock_type pi_lock_
 The recursive lock to protect datawriter map and suspend count. More...
 
reverse_lock_type reverse_pi_lock_
 
lock_type pi_suspended_lock_
 
unique_ptr< Monitor > monitor_
 Monitor object for this entity. More...
 
GUID_t publisher_id_
 

Friends

class DataWriterImpl
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::Publisher >
typedef DDS::Publisher ::_ptr_type _ptr_type
 
typedef DDS::Publisher ::_var_type _var_type
 
- Public Types inherited from CORBA::LocalObject
typedef LocalObject_ptr _ptr_type
 
typedef LocalObject_var _var_type
 
typedef LocalObject_out _out_type
 
- Public Types inherited from CORBA::Object
typedef Object_ptr _ptr_type
 
typedef Object_var _var_type
 
typedef Object_out _out_type
 
- Public Types inherited from OpenDDS::DCPS::LocalObject< DDS::Entity >
typedef DDS::Entity ::_ptr_type _ptr_type
 
typedef DDS::Entity ::_var_type _var_type
 
- Protected Member Functions inherited from CORBA::LocalObject
 LocalObject (void)
 
- Protected Member Functions inherited from CORBA::Object
 Object (int dummy=0)
 
TAO::Object_Proxy_Broker * proxy_broker () const
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Member Functions inherited from OpenDDS::DCPS::EntityImpl
DDS::ReturnCode_t set_enabled ()
 
void set_deleted (bool state)
 
bool get_deleted () const
 
DDS::InstanceHandle_t get_entity_instance_handle (const GUID_t &id, const RcHandle< DomainParticipantImpl > &participant)
 
- Protected Attributes inherited from CORBA::Object
ACE_Atomic_Op< TAO_SYNCH_MUTEX, unsigned long > refcount_
 
- Protected Attributes inherited from OpenDDS::DCPS::EntityImpl
AtomicBool enabled_
 The flag indicates the entity is enabled. More...
 
AtomicBool entity_deleted_
 The flag indicates the entity is being deleted. More...
 

Detailed Description

Implements the OpenDDS::DCPS::Publisher interfaces.

This class acts as a factory and container of the datawriter.

See the DDS specification, OMG formal/2015-04-10, for a description of the interface this class is implementing.

Definition at line 38 of file PublisherImpl.h.

Member Typedef Documentation

◆ lock_type

Definition at line 205 of file PublisherImpl.h.

◆ reverse_lock_type

Definition at line 206 of file PublisherImpl.h.

Constructor & Destructor Documentation

◆ PublisherImpl()

OpenDDS::DCPS::PublisherImpl::PublisherImpl ( DDS::InstanceHandle_t  handle,
GUID_t  id,
const DDS::PublisherQos & qos,
DDS::PublisherListener_ptr  a_listener,
const DDS::StatusMask & mask,
DomainParticipantImpl * participant
)

Definition at line 29 of file PublisherImpl.cpp.

References _duplicate(), monitor_, and TheServiceParticipant.

35 : handle_(handle),
36  qos_(qos),
37  default_datawriter_qos_(TheServiceParticipant->initial_DataWriterQos()),
38  listener_mask_(mask),
39  listener_(DDS::PublisherListener::_duplicate(a_listener)),
40 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
41  change_depth_(0),
42 #endif
43  domain_id_(participant->get_domain_id()),
44  participant_(*participant),
48  publisher_id_(id)
49 {
50  monitor_.reset(TheServiceParticipant->monitor_factory_->create_publisher_monitor(this));
51 }
CORBA::Short suspend_depth_count_
The suspend depth count.
WeakRcHandle< DomainParticipantImpl > participant_
The DomainParticipant servant that owns this Publisher.
DDS::DomainId_t domain_id_
Domain in which we are contained.
DDS::PublisherQos qos_
Publisher QoS policy list.
DDS::InstanceHandle_t handle_
unique_ptr< Monitor > monitor_
Monitor object for this entity.
reverse_lock_type reverse_pi_lock_
DDS::StatusMask listener_mask_
std::size_t change_depth_
The number of times begin_coherent_changes has been called.
DDS::DataWriterQos default_datawriter_qos_
Default datawriter Qos policy list.
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.
#define TheServiceParticipant
DDS::PublisherListener_var listener_
Used to notify the entity for relevant events.

◆ ~PublisherImpl()

OpenDDS::DCPS::PublisherImpl::~PublisherImpl ( )
virtual

Definition at line 53 of file PublisherImpl.cpp.

References ACE_ERROR, handle_, is_clean(), LM_WARNING, OpenDDS::DCPS::log_level, participant_, and OpenDDS::DCPS::LogLevel::Warning.

54 {
55  const RcHandle<DomainParticipantImpl> participant = participant_.lock();
56  if (participant) {
57  participant->return_handle(handle_);
58  }
59 
60  // The datawriters should be deleted already before calling delete
61  // publisher.
62  String leftover_entities;
63  if (!is_clean(&leftover_entities)) {
64  if (log_level >= LogLevel::Warning) {
65  ACE_ERROR((LM_WARNING, "(%P|%t) WARNING: PublisherImpl::~PublisherImpl: "
66  "%C still exist\n", leftover_entities.c_str()));
67  }
68  }
69 }
OpenDDS_Dcps_Export LogLevel log_level
#define ACE_ERROR(X)
bool is_clean(String *leftover_entities=0) const
WeakRcHandle< DomainParticipantImpl > participant_
The DomainParticipant servant that owns this Publisher.
DDS::InstanceHandle_t handle_
std::string String

Member Function Documentation

◆ assert_liveliness_by_participant()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::assert_liveliness_by_participant ( )

Definition at line 960 of file PublisherImpl.cpp.

References datawriter_map_, and DDS::RETCODE_OK.

961 {
962  DDS::ReturnCode_t ret = DDS::RETCODE_OK;
963 
964  for (DataWriterMap::iterator it(datawriter_map_.begin());
965  it != datawriter_map_.end(); ++it) {
966  const DDS::ReturnCode_t dw_ret = it->second->assert_liveliness_by_participant();
967 
968  if (dw_ret != DDS::RETCODE_OK) {
969  ret = dw_ret;
970  }
971  }
972 
973  return ret;
974 }
const ReturnCode_t RETCODE_OK
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.

◆ begin_coherent_changes()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::begin_coherent_changes ( )
virtual

Implements DDS::Publisher.

Definition at line 609 of file PublisherImpl.cpp.

References DDS::PresentationQosPolicy::access_scope, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), change_depth_, DDS::PresentationQosPolicy::coherent_access, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EntityImpl::enabled_, DDS::INSTANCE_PRESENTATION_QOS, LM_ERROR, pi_lock_, DDS::PublisherQos::presentation, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

610 {
611  if (!enabled_) {
612  if (DCPS_debug_level > 0) {
613  ACE_ERROR((LM_ERROR,
614  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
615  ACE_TEXT(" Publisher is not enabled!\n")));
616  }
617  return DDS::RETCODE_NOT_ENABLED;
618  }
619 
620  if (!qos_.presentation.coherent_access) {
621  if (DCPS_debug_level > 0) {
622  ACE_ERROR((LM_ERROR,
623  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::begin_coherent_changes:")
624  ACE_TEXT(" QoS policy does not support coherent access!\n")));
625  }
626  return DDS::RETCODE_ERROR;
627  }
628 
629  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
630  guard,
631  this->pi_lock_,
632  DDS::RETCODE_ERROR);
633 
634  ++this->change_depth_;
635 
636  if (qos_.presentation.access_scope == DDS::INSTANCE_PRESENTATION_QOS) {
637  // INSTANCE access scope essentially behaves
638  // as a no-op. (see: 7.1.3.6)
639  return DDS::RETCODE_OK;
640  }
641 
642  // We should only notify publications on the first
643  // and last change to the current change set:
644  if (this->change_depth_ == 1) {
645  for (PublicationMap::iterator it = this->publication_map_.begin();
646  it != this->publication_map_.end(); ++it) {
647  it->second->begin_coherent_changes();
648  }
649  }
650 
651  return DDS::RETCODE_OK;
652 }
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
DDS::PublisherQos qos_
Publisher QoS policy list.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
std::size_t change_depth_
The number of times begin_coherent_changes has been called.
PresentationQosPolicy presentation
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.
PresentationQosPolicyAccessScopeKind access_scope
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
const ReturnCode_t RETCODE_NOT_ENABLED

◆ contains_writer()

bool OpenDDS::DCPS::PublisherImpl::contains_writer ( DDS::InstanceHandle_t  a_handle)

Definition at line 78 of file PublisherImpl.cpp.

References ACE_GUARD_RETURN, datawriter_map_, pi_lock_, and DDS::RETCODE_ERROR.

79 {
80  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
81  guard,
82  this->pi_lock_,
83  DDS::RETCODE_ERROR);
84 
85  for (DataWriterMap::iterator it(datawriter_map_.begin());
86  it != datawriter_map_.end(); ++it) {
87  if (a_handle == it->second->get_instance_handle()) {
88  return true;
89  }
90  }
91 
92  return false;
93 }
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.

◆ copy_from_topic_qos()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::copy_from_topic_qos ( DDS::DataWriterQos & a_datawriter_qos,
const DDS::TopicQos & a_topic_qos
)
virtual

Definition at line 831 of file PublisherImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DDS::RETCODE_INCONSISTENT_POLICY, and DDS::RETCODE_OK.

833 {
834  if (Qos_Helper::copy_from_topic_qos(a_datawriter_qos, a_topic_qos)) {
835  return DDS::RETCODE_OK;
836  } else {
837  return DDS::RETCODE_INCONSISTENT_POLICY;
838  }
839 }
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
static bool copy_from_topic_qos(DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos)

◆ create_datawriter()

DDS::DataWriter_ptr OpenDDS::DCPS::PublisherImpl::create_datawriter ( DDS::Topic_ptr  a_topic,
const DDS::DataWriterQos & qos,
DDS::DataWriterListener_ptr  a_listener,
DDS::StatusMask  mask 
)
virtual

Definition at line 96 of file PublisherImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), DDS::EntityFactoryQosPolicy::autoenable_created_entities, OpenDDS::DCPS::DCPS_debug_level, default_datawriter_qos_, OpenDDS::DCPS::DataWriterImpl::enable(), OpenDDS::DCPS::EntityImpl::enabled_, DDS::PublisherQos::entity_factory, OpenDDS::DCPS::TopicDescriptionImpl::get_name(), OpenDDS::DCPS::TopicDescriptionImpl::get_type_support(), TAO::String_var< charT >::in(), OpenDDS::DCPS::DataWriterImpl::init(), LM_ERROR, LM_WARNING, name, participant_, pi_lock_, qos_, OpenDDS::DCPS::rchandle_from(), DDS::RETCODE_OK, validate_datawriter_qos(), and writers_not_enabled_.

101 {
102  DDS::DataWriterQos dw_qos;
103 
104  if (!validate_datawriter_qos(qos, default_datawriter_qos_, a_topic, dw_qos)) {
105  return DDS::DataWriter::_nil();
106  }
107 
108  TopicImpl* topic_servant = dynamic_cast<TopicImpl*>(a_topic);
109 
110  if (!topic_servant) {
111  if (DCPS_debug_level > 0) {
112  CORBA::String_var name = a_topic->get_name();
113  ACE_ERROR((LM_ERROR,
114  ACE_TEXT("(%P|%t) ERROR: ")
115  ACE_TEXT("PublisherImpl::create_datawriter, ")
116  ACE_TEXT("topic_servant(topic_name=%C) is nil.\n"),
117  name.in()));
118  }
119  return 0;
120  }
121 
122  OpenDDS::DCPS::TypeSupport_ptr typesupport =
123  topic_servant->get_type_support();
124 
125  if (typesupport == 0) {
126  if (DCPS_debug_level > 0) {
127  CORBA::String_var name = topic_servant->get_name();
128  ACE_ERROR((LM_ERROR,
129  ACE_TEXT("(%P|%t) ERROR: ")
130  ACE_TEXT("PublisherImpl::create_datawriter, ")
131  ACE_TEXT("typesupport(topic_name=%C) is nil.\n"),
132  name.in()));
133  }
134  return DDS::DataWriter::_nil();
135  }
136 
137  DDS::DataWriter_var dw_obj = typesupport->create_datawriter();
138 
139  DataWriterImpl* dw_servant =
140  dynamic_cast <DataWriterImpl*>(dw_obj.in());
141 
142  if (dw_servant == 0) {
143  if (DCPS_debug_level > 0) {
144  ACE_ERROR((LM_ERROR,
145  ACE_TEXT("(%P|%t) ERROR: ")
146  ACE_TEXT("PublisherImpl::create_datawriter, ")
147  ACE_TEXT("servant is nil.\n")));
148  }
149  return DDS::DataWriter::_nil();
150  }
151 
152  dw_servant->init(
153  topic_servant,
154  dw_qos,
155  a_listener,
156  mask,
157  participant_,
158  this);
159 
161  const DDS::ReturnCode_t ret = dw_servant->enable();
162 
163  if (ret != DDS::RETCODE_OK) {
164  if (DCPS_debug_level > 0) {
165  ACE_ERROR((LM_WARNING,
166  ACE_TEXT("(%P|%t) WARNING: ")
167  ACE_TEXT("PublisherImpl::create_datawriter, ")
168  ACE_TEXT("enable failed.\n")));
169  }
170  return DDS::DataWriter::_nil();
171  }
172  } else {
174  writers_not_enabled_.insert(rchandle_from(dw_servant));
175  }
176 
177  return DDS::DataWriter::_duplicate(dw_obj.in());
178 }
#define ACE_ERROR(X)
static bool validate_datawriter_qos(const DDS::DataWriterQos &qos, const DDS::DataWriterQos &default_qos, DDS::Topic_ptr a_topic, DDS::DataWriterQos &dw_qos)
const ReturnCode_t RETCODE_OK
WeakRcHandle< DomainParticipantImpl > participant_
The DomainParticipant servant that owns this Publisher.
DDS::PublisherQos qos_
Publisher QoS policy list.
EntityFactoryQosPolicy entity_factory
DataWriterSet writers_not_enabled_
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
const char *const name
Definition: debug.cpp:60
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
DDS::DataWriterQos default_datawriter_qos_
Default datawriter Qos policy list.
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.
const character_type * in(void) const
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82

◆ delete_contained_entities()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::delete_contained_entities ( )
virtual

Implements DDS::Publisher.

Definition at line 369 of file PublisherImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, delete_datawriter(), OpenDDS::DCPS::EntityImpl::get_deleted(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::RcHandle< T >::in(), LM_ERROR, pi_lock_, prepare_to_delete_datawriters(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, OpenDDS::DCPS::EntityImpl::set_deleted(), set_wait_pending_deadline(), and TheServiceParticipant.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_publisher().

370 {
371  // If the call isn't part of another delete, prepare the datawriters to be
372  // deleted and set the pending deadline on all the writers.
373  if (!get_deleted()) {
374  // mark that the entity is being deleted
375  set_deleted(true);
376 
377  if (!prepare_to_delete_datawriters()) {
378  return DDS::RETCODE_ERROR;
379  }
380  if (!set_wait_pending_deadline(TheServiceParticipant->new_pending_timeout_deadline())) {
381  return DDS::RETCODE_ERROR;
382  }
383  }
384 
385  while (true) {
386  GUID_t pub_id = GUID_UNKNOWN;
387  DataWriterImpl_rch a_datawriter;
388 
389  {
390  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
391  guard,
392  this->pi_lock_,
393  DDS::RETCODE_ERROR);
394 
395  if (datawriter_map_.empty()) {
396  break;
397  } else {
398  a_datawriter = datawriter_map_.begin()->second;
399  pub_id = a_datawriter->get_guid();
400  }
401  }
402 
403  const DDS::ReturnCode_t ret = delete_datawriter(a_datawriter.in());
404 
405  if (ret != DDS::RETCODE_OK) {
406  if (DCPS_debug_level > 0) {
407  ACE_ERROR((LM_ERROR,
408  ACE_TEXT("(%P|%t) ERROR: ")
409  ACE_TEXT("PublisherImpl::")
410  ACE_TEXT("delete_contained_entities: ")
411  ACE_TEXT("failed to delete ")
412  ACE_TEXT("datawriter %C.\n"),
413  LogGuid(pub_id).c_str()));
414  }
415  return ret;
416  }
417  }
418 
419  // the publisher can now start creating new publications
420  set_deleted(false);
421 
422  return DDS::RETCODE_OK;
423 }
bool set_wait_pending_deadline(const MonotonicTimePoint &deadline)
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
RcHandle< DataWriterImpl > DataWriterImpl_rch
void set_deleted(bool state)
Definition: EntityImpl.cpp:83
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
virtual DDS::ReturnCode_t delete_datawriter(DDS::DataWriter_ptr a_datawriter)
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.
#define TheServiceParticipant

◆ delete_datawriter()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::delete_datawriter ( DDS::DataWriter_ptr  a_datawriter)
virtual

Definition at line 181 of file PublisherImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::DataWriterImpl::cleanup(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, domain_id_, OpenDDS::DCPS::EntityImpl::get_deleted(), OpenDDS::DCPS::DataWriterImpl::get_guid(), OpenDDS::DCPS::DataWriterImpl::get_publisher(), OpenDDS::DCPS::GUID_UNKNOWN, LM_ERROR, monitor_, participant_, pi_lock_, OpenDDS::DCPS::DataWriterImpl::prepare_to_delete(), publication_map_, OpenDDS::DCPS::DataWriterImpl::remove_all_associations(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, reverse_pi_lock_, OpenDDS::DCPS::DataWriterImpl::set_wait_pending_deadline(), TheServiceParticipant, OpenDDS::DCPS::DataWriterImpl::unregister_all(), and OpenDDS::DCPS::DataWriterImpl::wait_pending().

Referenced by delete_contained_entities().

182 {
183  DataWriterImpl* dw_servant = dynamic_cast<DataWriterImpl*>(a_datawriter);
184  if (!dw_servant) {
185  if (DCPS_debug_level > 0) {
186  ACE_ERROR((LM_ERROR,
187  "(%P|%t) PublisherImpl::delete_datawriter - dynamic cast to DataWriterImpl failed\n"));
188  }
189  return DDS::RETCODE_ERROR;
190  }
191 
192  {
193  DDS::Publisher_var dw_publisher(dw_servant->get_publisher());
194 
195  if (dw_publisher.in() != this) {
196  if (DCPS_debug_level > 0) {
197  ACE_ERROR((LM_ERROR,
198  ACE_TEXT("(%P|%t) PublisherImpl::delete_datawriter: ")
199  ACE_TEXT("the data writer %C doesn't ")
200  ACE_TEXT("belong to this subscriber\n"),
201  LogGuid(dw_servant->get_guid()).c_str()));
202  }
203  return DDS::RETCODE_PRECONDITION_NOT_MET;
204  }
205  }
206 
207  if (!dw_servant->get_deleted()) {
208  dw_servant->prepare_to_delete();
209  dw_servant->set_wait_pending_deadline(TheServiceParticipant->new_pending_timeout_deadline());
210  }
211 
212  // Wait for any data and control messages to be transported during
213  // unregistering of instances.
214  dw_servant->wait_pending();
215 
216  GUID_t publication_id = GUID_UNKNOWN;
217  {
218  ACE_GUARD_RETURN(ACE_Recursive_Thread_Mutex,
219  guard,
220  this->pi_lock_,
221  DDS::RETCODE_ERROR);
222 
223  publication_id = dw_servant->get_guid();
224 
225  PublicationMap::iterator it = publication_map_.find(publication_id);
226 
227  if (it == publication_map_.end()) {
228  if (DCPS_debug_level > 0) {
229  ACE_ERROR((LM_ERROR,
230  ACE_TEXT("(%P|%t) ERROR: ")
231  ACE_TEXT("PublisherImpl::delete_datawriter, ")
232  ACE_TEXT("datawriter %C not found.\n"),
233  LogGuid(publication_id).c_str()));
234  }
235  return DDS::RETCODE_ERROR;
236  }
237 
238  // We can not erase the datawriter from datawriter map by the topic name
239  // because the map might have multiple datawriters with the same topic
240  // name.
241  // Find the iterator to the datawriter in the datawriter map and erase
242  // by the iterator.
243  DataWriterMap::iterator writ;
244  DataWriterMap::iterator the_writ = datawriter_map_.end();
245 
246  for (writ = datawriter_map_.begin();
247  writ != datawriter_map_.end();
248  ++writ) {
249  if (writ->second == it->second) {
250  the_writ = writ;
251  break;
252  }
253  }
254 
255  if (the_writ != datawriter_map_.end()) {
256  datawriter_map_.erase(the_writ);
257  }
258 
259  publication_map_.erase(it);
260 
261  // not just unregister but remove any pending writes/sends.
262  dw_servant->unregister_all();
263 
264  // Release pi_lock_ before making call to transport layer to avoid
265  // some deadlock situations that threads acquire locks(PublisherImpl
266  // pi_lock_, TransportClient reservation_lock and TransportImpl
267  // lock_) in reverse order.
268  ACE_GUARD_RETURN(reverse_lock_type, reverse_monitor, this->reverse_pi_lock_,
269  DDS::RETCODE_ERROR);
270  // Wait for pending samples to drain prior to removing associations
271  // and unregistering the publication.
272  dw_servant->wait_pending();
273 
274  // Call remove association before unregistering the datawriter
275  // with the transport, otherwise some callbacks resulted from
276  // remove_association may lost.
277  dw_servant->remove_all_associations();
278  dw_servant->cleanup();
279  }
280 
281  if (this->monitor_) {
282  this->monitor_->report();
283  }
284 
285  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
286 
287  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
288  if (!disco->remove_publication(
289  this->domain_id_,
290  participant->get_id(),
291  publication_id)) {
292  if (DCPS_debug_level > 0) {
293  ACE_ERROR((LM_ERROR,
294  ACE_TEXT("(%P|%t) ERROR: ")
295  ACE_TEXT("PublisherImpl::delete_datawriter, ")
296  ACE_TEXT("publication not removed from discovery.\n")));
297  }
298  return DDS::RETCODE_ERROR;
299  }
300 
301  participant->remove_adjust_liveliness_timers();
302 
303  return DDS::RETCODE_OK;
304 }
ACE_Reverse_Lock< lock_type > reverse_lock_type
#define ACE_ERROR(X)
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
const ReturnCode_t RETCODE_OK
WeakRcHandle< DomainParticipantImpl > participant_
The DomainParticipant servant that owns this Publisher.
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
DDS::DomainId_t domain_id_
Domain in which we are contained.
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.
unique_ptr< Monitor > monitor_
Monitor object for this entity.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
reverse_lock_type reverse_pi_lock_
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.
#define TheServiceParticipant

◆ enable()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::enable ( )
virtual

Implements DDS::Entity.

Definition at line 842 of file PublisherImpl.cpp.

References ACE_GUARD_RETURN, DDS::EntityFactoryQosPolicy::autoenable_created_entities, DDS::PublisherQos::entity_factory, OpenDDS::DCPS::EntityImpl::is_enabled(), monitor_, participant_, pi_lock_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, OpenDDS::DCPS::EntityImpl::set_enabled(), and writers_not_enabled_.

Referenced by OpenDDS::DCPS::DomainParticipantImpl::create_publisher().

843 {
844  // According to the spec:
845  // - Calling enable on an already enabled Entity returns OK and has no
846  // effect.
847  // - Calling enable on an Entity whose factory is not enabled will fail
848  // and return PRECONDITION_NOT_MET.
849 
850  if (this->is_enabled()) {
851  return DDS::RETCODE_OK;
852  }
853 
854  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
855  if (!participant || !participant->is_enabled()) {
857  }
858 
859  if (this->monitor_) {
860  this->monitor_->report();
861  }
862 
863  this->set_enabled();
864 
867  DataWriterSet writers;
868  writers_not_enabled_.swap(writers);
869  for (DataWriterSet::iterator it = writers.begin(); it != writers.end(); ++it) {
870  (*it)->enable();
871  }
872  }
873 
874  return DDS::RETCODE_OK;
875 }
const ReturnCode_t RETCODE_OK
WeakRcHandle< DomainParticipantImpl > participant_
The DomainParticipant servant that owns this Publisher.
DDS::PublisherQos qos_
Publisher QoS policy list.
EntityFactoryQosPolicy entity_factory
DataWriterSet writers_not_enabled_
DDS::ReturnCode_t set_enabled()
Definition: EntityImpl.cpp:40
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
unique_ptr< Monitor > monitor_
Monitor object for this entity.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.

◆ end_coherent_changes()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::end_coherent_changes ( )
virtual

Implements DDS::Publisher.

Definition at line 655 of file PublisherImpl.cpp.

References DDS::PresentationQosPolicy::access_scope, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), change_depth_, DDS::PresentationQosPolicy::coherent_access, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EntityImpl::enabled_, DDS::INSTANCE_PRESENTATION_QOS, LM_ERROR, pi_lock_, DDS::PublisherQos::presentation, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and DDS::RETCODE_PRECONDITION_NOT_MET.

656 {
657  if (!enabled_) {
658  if (DCPS_debug_level > 0) {
659  ACE_ERROR((LM_ERROR,
660  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
661  ACE_TEXT(" Publisher is not enabled!\n")));
662  }
664  }
665 
667  if (DCPS_debug_level > 0) {
668  ACE_ERROR((LM_ERROR,
669  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
670  ACE_TEXT(" QoS policy does not support coherent access!\n")));
671  }
672  return DDS::RETCODE_ERROR;
673  }
674 
676  guard,
677  this->pi_lock_,
679 
680  if (this->change_depth_ == 0) {
681  if (DCPS_debug_level > 0) {
682  ACE_ERROR((LM_ERROR,
683  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes:")
684  ACE_TEXT(" No matching call to begin_coherent_changes!\n")));
685  }
687  }
688 
689  --this->change_depth_;
690 
692  // INSTANCE access scope essentially behaves
693  // as a no-op. (see: 7.1.3.6)
694  return DDS::RETCODE_OK;
695  }
696 
697  // We should only notify publications on the first
698  // and last change to the current change set:
699  if (this->change_depth_ == 0) {
700  GroupCoherentSamples group_samples;
701  for (PublicationMap::iterator it = this->publication_map_.begin();
702  it != this->publication_map_.end(); ++it) {
703 
704  if (it->second->coherent_samples_ == 0) {
705  continue;
706  }
707 
708  std::pair<GroupCoherentSamples::iterator, bool> pair =
709  group_samples.insert(GroupCoherentSamples::value_type(
710  it->second->get_guid(),
711  WriterCoherentSample(it->second->coherent_samples_,
712  it->second->sequence_number_)));
713 
714  if (!pair.second) {
715  if (DCPS_debug_level > 0) {
716  ACE_ERROR((LM_ERROR,
717  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::end_coherent_changes: ")
718  ACE_TEXT("failed to insert to GroupCoherentSamples.\n")));
719  }
720  return DDS::RETCODE_ERROR;
721  }
722  }
723 
724  for (PublicationMap::iterator it = this->publication_map_.begin();
725  it != this->publication_map_.end(); ++it) {
726  if (it->second->coherent_samples_ == 0) {
727  continue;
728  }
729 
730  it->second->end_coherent_changes(group_samples);
731  }
732  }
733 
734  return DDS::RETCODE_OK;
735 }
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
DDS::PublisherQos qos_
Publisher QoS policy list.
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
std::size_t change_depth_
The number of times begin_coherent_changes has been called.
PresentationQosPolicy presentation
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.
PresentationQosPolicyAccessScopeKind access_scope
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
const ReturnCode_t RETCODE_NOT_ENABLED

◆ get_default_datawriter_qos()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::get_default_datawriter_qos ( DDS::DataWriterQos & qos)
virtual

Definition at line 824 of file PublisherImpl.cpp.

References default_datawriter_qos_, and DDS::RETCODE_OK.

825 {
827  return DDS::RETCODE_OK;
828 }
const ReturnCode_t RETCODE_OK
DDS::DataWriterQos default_datawriter_qos_
Default datawriter Qos policy list.

◆ get_instance_handle()

DDS::InstanceHandle_t OpenDDS::DCPS::PublisherImpl::get_instance_handle ( )
virtual

Implements OpenDDS::DCPS::EntityImpl.

Definition at line 72 of file PublisherImpl.cpp.

References handle_.

Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().

73 {
74  return handle_;
75 }
DDS::InstanceHandle_t handle_

◆ get_listener()

DDS::PublisherListener_ptr OpenDDS::DCPS::PublisherImpl::get_listener ( )
virtual

Implements DDS::Publisher.

Definition at line 525 of file PublisherImpl.cpp.

References listener_, and listener_mutex_.

526 {
528  return DDS::PublisherListener::_duplicate(listener_.in());
529 }
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
DDS::PublisherListener_var listener_
Used to notify the entity for relevant events.

◆ get_participant()

DDS::DomainParticipant_ptr OpenDDS::DCPS::PublisherImpl::get_participant ( )
virtual

Implements DDS::Publisher.

Definition at line 806 of file PublisherImpl.cpp.

References participant_.

Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().

807 {
808  return participant_.lock()._retn();
809 }
WeakRcHandle< DomainParticipantImpl > participant_
The DomainParticipant servant that owns this Publisher.

◆ get_pi_lock()

ACE_Recursive_Thread_Mutex& OpenDDS::DCPS::PublisherImpl::get_pi_lock ( )
inline

Definition at line 113 of file PublisherImpl.h.

References OpenDDS::DCPS::OPENDDS_MAP_CMP(), OPENDDS_MULTIMAP, OPENDDS_STRING, and OpenDDS::DCPS::OPENDDS_VECTOR().

113 { return pi_lock_; }
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.

◆ get_publication_ids()

void OpenDDS::DCPS::PublisherImpl::get_publication_ids ( PublicationIdVec &  pubs)

Populates a std::vector (PublicationIdVec) with the GUID_ts of this Publisher's DataWriters. Entries are appended to pubs; the vector is not cleared first.

Definition at line 1000 of file PublisherImpl.cpp.

References ACE_GUARD_RETURN, pi_lock_, and publication_map_.

Referenced by OpenDDS::DCPS::PublisherMonitorImpl::report().

1001 {
1003  guard,
1004  this->pi_lock_,
1005  );
1006 
1007  pubs.reserve(publication_map_.size());
1008  for (PublicationMap::iterator iter = publication_map_.begin();
1009  iter != publication_map_.end();
1010  ++iter) {
1011  pubs.push_back(iter->first);
1012  }
1013 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.

◆ get_qos()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::get_qos ( DDS::PublisherQos & qos)
virtual

Definition at line 507 of file PublisherImpl.cpp.

References qos_, and DDS::RETCODE_OK.

508 {
509  qos = qos_;
510  return DDS::RETCODE_OK;
511 }
const ReturnCode_t RETCODE_OK
DDS::PublisherQos qos_
Publisher QoS policy list.

◆ is_clean()

bool OpenDDS::DCPS::PublisherImpl::is_clean ( String * leftover_entities = 0) const

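This method is not defined in the IDL and is defined for internal use. It checks whether any datawriter or publication is still associated with this publisher, returning true when both the datawriter map and the publication map are empty. If leftover_entities is non-null, it is cleared and then filled with a summary of the remaining writer and publication counts.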

Definition at line 877 of file PublisherImpl.cpp.

References ACE_GUARD_RETURN, datawriter_map_, pi_lock_, publication_map_, and OpenDDS::DCPS::to_dds_string().

Referenced by OpenDDS::DCPS::DomainParticipantImpl::delete_publisher(), and ~PublisherImpl().

878 {
879  if (leftover_entities) {
880  leftover_entities->clear();
881  }
882 
884 
885  const size_t writer_count = datawriter_map_.size();
886  if (leftover_entities && writer_count) {
887  *leftover_entities += to_dds_string(writer_count) + " writer(s)";
888  }
889 
890  const size_t publication_count = publication_map_.size();
891  if (leftover_entities && publication_count) {
892  if (leftover_entities->size()) {
893  *leftover_entities += ", ";
894  }
895  *leftover_entities += to_dds_string(publication_count) + " publication(s)";
896  }
897 
898  return writer_count == 0 && publication_count == 0;
899 }
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.
String to_dds_string(unsigned short to_convert)

◆ is_suspended()

bool OpenDDS::DCPS::PublisherImpl::is_suspended ( void  ) const

Definition at line 553 of file PublisherImpl.cpp.

References ACE_GUARD_RETURN, pi_suspended_lock_, and suspend_depth_count_.

554 {
556  suspend_guard,
557  this->pi_suspended_lock_,
558  false);
559  return suspend_depth_count_;
560 }
CORBA::Short suspend_depth_count_
The suspend depth count.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ listener_for()

DDS::PublisherListener_ptr OpenDDS::DCPS::PublisherImpl::listener_for ( ::DDS::StatusKind  kind)

This is used to retrieve the listener for a certain status change. If this publisher has a registered listener and the status kind is in the listener mask then the listener is returned. Otherwise, the query for listener is propagated up to the factory/DomainParticipant.

Definition at line 939 of file PublisherImpl.cpp.

References CORBA::is_nil(), listener_, listener_mask_, listener_mutex_, participant_, and ACE_Guard< ACE_LOCK >::release().

940 {
941  // per 2.1.4.3.1 Listener Access to Plain Communication Status
942  // use this entity's factory if the listener mask is not enabled
943  // for this kind.
944  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
945 
946  if (!participant)
947  return 0;
948 
950  if (CORBA::is_nil(listener_.in()) || (listener_mask_ & kind) == 0) {
951  g.release();
952  return participant->listener_for(kind);
953 
954  } else {
955  return DDS::PublisherListener::_duplicate(listener_.in());
956  }
957 }
WeakRcHandle< DomainParticipantImpl > participant_
The DomainParticipant servant that owns this Publisher.
DDS::StatusMask listener_mask_
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
Boolean is_nil(T x)
DDS::PublisherListener_var listener_
Used to notify the entity for relevant events.

◆ liveliness_check_interval()

TimeDuration OpenDDS::DCPS::PublisherImpl::liveliness_check_interval ( DDS::LivelinessQosPolicyKind  kind)

Definition at line 977 of file PublisherImpl.cpp.

References datawriter_map_, and OpenDDS::DCPS::TimeDuration::max_value.

978 {
979  TimeDuration tv = TimeDuration::max_value;
980  for (DataWriterMap::iterator it(datawriter_map_.begin());
981  it != datawriter_map_.end(); ++it) {
982  tv = std::min(tv, it->second->liveliness_check_interval(kind));
983  }
984  return tv;
985 }
static const TimeDuration max_value
Definition: TimeDuration.h:32
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.

◆ lookup_datawriter()

DDS::DataWriter_ptr OpenDDS::DCPS::PublisherImpl::lookup_datawriter ( const char *  topic_name)
virtual

Definition at line 307 of file PublisherImpl.cpp.

References ACE_DEBUG, ACE_GUARD_RETURN, ACE_TEXT(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, LM_DEBUG, and pi_lock_.

308 {
310  guard,
311  this->pi_lock_,
312  DDS::DataWriter::_nil());
313 
314  // If multiple entries share the key "topic_name", which one is
315  // returned? The spec does not say which one should be returned.
316  DataWriterMap::iterator it = datawriter_map_.find(topic_name);
317 
318  if (it == datawriter_map_.end()) {
319  if (DCPS_debug_level >= 2) {
320  ACE_DEBUG((LM_DEBUG,
321  ACE_TEXT("(%P|%t) ")
322  ACE_TEXT("PublisherImpl::lookup_datawriter, ")
323  ACE_TEXT("The datawriter(topic_name=%C) is not found\n"),
324  topic_name));
325  }
326 
327  return DDS::DataWriter::_nil();
328 
329  } else {
330  return DDS::DataWriter::_duplicate(it->second.in());
331  }
332 }
#define ACE_DEBUG(X)
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.

◆ OPENDDS_MAP_CMP() [1/2]

typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MAP_CMP ( GUID_t  ,
DataWriterImpl_rch  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MAP_CMP() [2/2]

typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MAP_CMP ( GUID_t  ,
DDS::DataWriterQos  ,
GUID_tKeyLessThan   
)
private

◆ OPENDDS_MULTIMAP()

typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_MULTIMAP ( OPENDDS_STRING  ,
DataWriterImpl_rch   
)
private

◆ OPENDDS_SET()

typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_SET ( DataWriterImpl_rch  )
private

◆ OPENDDS_VECTOR()

typedef OpenDDS::DCPS::PublisherImpl::OPENDDS_VECTOR ( GUID_t  )

◆ parent()

RcHandle< EntityImpl > OpenDDS::DCPS::PublisherImpl::parent ( void  ) const
virtual

Reimplemented from OpenDDS::DCPS::EntityImpl.

Definition at line 1016 of file PublisherImpl.cpp.

References participant_.

1017 {
1018  return this->participant_.lock();
1019 }
WeakRcHandle< DomainParticipantImpl > participant_
The DomainParticipant servant that owns this Publisher.

◆ participant_liveliness_activity_after()

bool OpenDDS::DCPS::PublisherImpl::participant_liveliness_activity_after ( const MonotonicTimePoint tv)

Definition at line 988 of file PublisherImpl.cpp.

References datawriter_map_.

989 {
990  for (DataWriterMap::iterator it(datawriter_map_.begin());
991  it != datawriter_map_.end(); ++it) {
992  if (it->second->participant_liveliness_activity_after(tv)) {
993  return true;
994  }
995  }
996  return false;
997 }
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.

◆ prepare_to_delete_datawriters()

bool OpenDDS::DCPS::PublisherImpl::prepare_to_delete_datawriters ( )

Definition at line 334 of file PublisherImpl.cpp.

References ACE_GUARD_RETURN, datawriter_map_, OpenDDS::DCPS::EntityImpl::get_deleted(), pi_lock_, and OpenDDS::DCPS::DataWriterImpl::prepare_to_delete().

Referenced by delete_contained_entities().

335 {
337  bool result = true;
338  const DataWriterMap::iterator end = datawriter_map_.end();
339  for (DataWriterMap::iterator i = datawriter_map_.begin(); i != end; ++i) {
340  DataWriterImpl* const writer = dynamic_cast<DataWriterImpl*>(i->second.in());
341  if (writer) {
342  if (!writer->get_deleted()) {
343  writer->prepare_to_delete();
344  }
345  } else {
346  result = false;
347  }
348  }
349 
350  return result;
351 }
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.

◆ resume_publications()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::resume_publications ( )
virtual

Implements DDS::Publisher.

Definition at line 563 of file PublisherImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, pi_lock_, pi_suspended_lock_, publication_map_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, DDS::RETCODE_PRECONDITION_NOT_MET, and suspend_depth_count_.

564 {
565  if (!enabled_) {
566  if (DCPS_debug_level > 0) {
567  ACE_ERROR((LM_ERROR,
568  ACE_TEXT("(%P|%t) ERROR: ")
569  ACE_TEXT("PublisherImpl::resume_publications, ")
570  ACE_TEXT(" Entity is not enabled.\n")));
571  }
573  }
574 
575  PublicationMap publication_map_copy;
576  {
578  suspend_guard,
579  this->pi_suspended_lock_,
582 
583  if (suspend_depth_count_ < 0) {
586  }
587  if (suspend_depth_count_ == 0) {
588  suspend_guard.release();
590  guard,
591  this->pi_lock_,
593 
594  publication_map_copy = publication_map_;
595  }
596  }
597 
598  for (PublicationMap::const_iterator it = publication_map_copy.begin();
599  it != publication_map_copy.end(); ++it) {
600  it->second->send_suspended_data();
601  }
602 
603  return DDS::RETCODE_OK;
604 }
#define ACE_ERROR(X)
CORBA::Short suspend_depth_count_
The suspend depth count.
const ReturnCode_t RETCODE_OK
const ReturnCode_t RETCODE_PRECONDITION_NOT_MET
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
const ReturnCode_t RETCODE_NOT_ENABLED

◆ set_default_datawriter_qos()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_default_datawriter_qos ( const DDS::DataWriterQos & qos)
virtual

Definition at line 812 of file PublisherImpl.cpp.

References OpenDDS::DCPS::Qos_Helper::consistent(), default_datawriter_qos_, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, and OpenDDS::DCPS::Qos_Helper::valid().

813 {
814  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
816  return DDS::RETCODE_OK;
817 
818  } else {
820  }
821 }
const ReturnCode_t RETCODE_OK
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
DDS::DataWriterQos default_datawriter_qos_
Default datawriter Qos policy list.

◆ set_listener()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_listener ( DDS::PublisherListener_ptr  a_listener,
DDS::StatusMask  mask 
)
virtual

Definition at line 514 of file PublisherImpl.cpp.

References listener_, listener_mask_, listener_mutex_, and DDS::RETCODE_OK.

516 {
518  listener_mask_ = mask;
519  //note: OK to duplicate a nil object ref
520  listener_ = DDS::PublisherListener::_duplicate(a_listener);
521  return DDS::RETCODE_OK;
522 }
const ReturnCode_t RETCODE_OK
DDS::StatusMask listener_mask_
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
DDS::PublisherListener_var listener_
Used to notify the entity for relevant events.

◆ set_qos()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::set_qos ( const DDS::PublisherQos & qos)
virtual

Definition at line 426 of file PublisherImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::changeable(), OpenDDS::DCPS::Qos_Helper::consistent(), OpenDDS::DCPS::DCPS_debug_level, domain_id_, OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK, participant_, pi_lock_, publication_map_, qos_, DDS::RETCODE_ERROR, DDS::RETCODE_IMMUTABLE_POLICY, DDS::RETCODE_INCONSISTENT_POLICY, DDS::RETCODE_OK, DDS::RETCODE_UNSUPPORTED, TheServiceParticipant, and OpenDDS::DCPS::Qos_Helper::valid().

427 {
428 
430 
431  if (Qos_Helper::valid(qos) && Qos_Helper::consistent(qos)) {
432  if (qos_ == qos)
433  return DDS::RETCODE_OK;
434 
435  // a non-changeable qos can still be changed before the entity is enabled
436  if (!Qos_Helper::changeable(qos_, qos) && enabled_) {
438 
439  } else {
440  qos_ = qos;
441 
442  DwIdToQosMap idToQosMap;
443  {
445  guard,
446  this->pi_lock_,
448 
449  for (PublicationMap::iterator iter = publication_map_.begin();
450  iter != publication_map_.end();
451  ++iter) {
452  DDS::DataWriterQos qos = iter->second->qos_;
453  GUID_t id = iter->second->get_guid();
454  std::pair<DwIdToQosMap::iterator, bool> pair =
455  idToQosMap.insert(DwIdToQosMap::value_type(id, qos));
456 
457  if (!pair.second) {
458  if (DCPS_debug_level > 0) {
459  ACE_ERROR((LM_ERROR,
460  ACE_TEXT("(%P|%t) ")
461  ACE_TEXT("PublisherImpl::set_qos: ")
462  ACE_TEXT("insert id %C to DwIdToQosMap ")
463  ACE_TEXT("failed.\n"),
464  LogGuid(id).c_str()));
465  }
466  return DDS::RETCODE_ERROR;
467  }
468  }
469  }
470 
471  DwIdToQosMap::iterator iter = idToQosMap.begin();
472 
473  while (iter != idToQosMap.end()) {
474  Discovery_rch disco = TheServiceParticipant->get_discovery(this->domain_id_);
475  bool status = false;
476 
477  RcHandle<DomainParticipantImpl> participant = this->participant_.lock();
478  if (participant)
479  status = disco->update_publication_qos(
480  participant->get_domain_id(),
481  participant->get_id(),
482  iter->first,
483  iter->second,
484  this->qos_);
485 
486  if (!status) {
487  if (DCPS_debug_level > 0) {
488  ACE_ERROR((LM_ERROR,
489  ACE_TEXT("(%P|%t) PublisherImpl::set_qos, ")
490  ACE_TEXT("failed.\n")));
491  }
492  return DDS::RETCODE_ERROR;
493  }
494 
495  ++iter;
496  }
497  }
498 
499  return DDS::RETCODE_OK;
500 
501  } else {
503  }
504 }
#define ACE_ERROR(X)
RcHandle< Discovery > Discovery_rch
Definition: Discovery.h:296
const ReturnCode_t RETCODE_OK
WeakRcHandle< DomainParticipantImpl > participant_
The DomainParticipant servant that owns this Publisher.
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
DDS::DomainId_t domain_id_
Domain in which we are contained.
DDS::PublisherQos qos_
Publisher QoS policy list.
const ReturnCode_t RETCODE_INCONSISTENT_POLICY
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
const ReturnCode_t RETCODE_IMMUTABLE_POLICY
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
static bool changeable(const DDS::UserDataQosPolicy &qos1, const DDS::UserDataQosPolicy &qos2)
Definition: Qos_Helper.inl:948
#define OPENDDS_NO_OBJECT_MODEL_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.
const ReturnCode_t RETCODE_UNSUPPORTED
#define TheServiceParticipant
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82

◆ set_wait_pending_deadline()

bool OpenDDS::DCPS::PublisherImpl::set_wait_pending_deadline ( const MonotonicTimePoint deadline)

Definition at line 353 of file PublisherImpl.cpp.

References ACE_GUARD_RETURN, datawriter_map_, pi_lock_, and OpenDDS::DCPS::DataWriterImpl::set_wait_pending_deadline().

Referenced by delete_contained_entities().

354 {
356  bool result = true;
357  const DataWriterMap::iterator end = datawriter_map_.end();
358  for (DataWriterMap::iterator i = datawriter_map_.begin(); i != end; ++i) {
359  DataWriterImpl* const writer = dynamic_cast<DataWriterImpl*>(i->second.in());
360  if (writer) {
361  writer->set_wait_pending_deadline(deadline);
362  } else {
363  result = false;
364  }
365  }
366  return result;
367 }
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.

◆ suspend_publications()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::suspend_publications ( )
virtual

Implements DDS::Publisher.

Definition at line 532 of file PublisherImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EntityImpl::enabled_, LM_ERROR, pi_suspended_lock_, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, DDS::RETCODE_OK, and suspend_depth_count_.

533 {
534  if (!enabled_) {
535  if (DCPS_debug_level > 0) {
536  ACE_ERROR((LM_ERROR,
537  ACE_TEXT("(%P|%t) ERROR: ")
538  ACE_TEXT("PublisherImpl::suspend_publications, ")
539  ACE_TEXT(" Entity is not enabled.\n")));
540  }
542  }
543 
545  suspend_guard,
546  this->pi_suspended_lock_,
549  return DDS::RETCODE_OK;
550 }
#define ACE_ERROR(X)
CORBA::Short suspend_depth_count_
The suspend depth count.
const ReturnCode_t RETCODE_OK
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
const ReturnCode_t RETCODE_NOT_ENABLED

◆ validate_datawriter_qos()

bool OpenDDS::DCPS::PublisherImpl::validate_datawriter_qos ( const DDS::DataWriterQos qos,
const DDS::DataWriterQos default_qos,
DDS::Topic_ptr  a_topic,
DDS::DataWriterQos dw_qos 
)
static

Definition at line 1022 of file PublisherImpl.cpp.

References ACE_ERROR, ACE_TEXT(), OpenDDS::DCPS::Qos_Helper::consistent(), OpenDDS::DCPS::Qos_Helper::copy_from_topic_qos(), DATAWRITER_QOS_DEFAULT, DATAWRITER_QOS_USE_TOPIC_QOS, OpenDDS::DCPS::DCPS_debug_level, CORBA::is_nil(), LM_ERROR, OPENDDS_END_VERSIONED_NAMESPACE_DECL, OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK, OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK, OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK, and OpenDDS::DCPS::Qos_Helper::valid().

Referenced by create_datawriter(), and OpenDDS::DCPS::DomainParticipantImpl::create_replayer().

1026 {
1027  if (CORBA::is_nil(a_topic)) {
1028  if (DCPS_debug_level > 0) {
1029  ACE_ERROR((LM_ERROR,
1030  ACE_TEXT("(%P|%t) ERROR: ")
1031  ACE_TEXT("PublisherImpl::create_datawriter, ")
1032  ACE_TEXT("topic is nil.\n")));
1033  }
1034  return DDS::DataWriter::_nil();
1035  }
1036 
1037  if (qos == DATAWRITER_QOS_DEFAULT) {
1038  dw_qos = default_qos;
1039 
1040  } else if (qos == DATAWRITER_QOS_USE_TOPIC_QOS) {
1041  DDS::TopicQos topic_qos;
1042  a_topic->get_qos(topic_qos);
1043  dw_qos = default_qos;
1044 
1045  Qos_Helper::copy_from_topic_qos(dw_qos, topic_qos);
1046 
1047  } else {
1048  dw_qos = qos;
1049  }
1050 
1051  OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1052  OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1053  OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1054  OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(dw_qos, DDS::DataWriter::_nil());
1056 
1057  if (!Qos_Helper::valid(dw_qos)) {
1058  if (DCPS_debug_level > 0) {
1059  ACE_ERROR((LM_ERROR,
1060  ACE_TEXT("(%P|%t) ERROR: ")
1061  ACE_TEXT("PublisherImpl::create_datawriter, ")
1062  ACE_TEXT("invalid qos.\n")));
1063  }
1064  return DDS::DataWriter::_nil();
1065  }
1066 
1067  if (!Qos_Helper::consistent(dw_qos)) {
1068  if (DCPS_debug_level > 0) {
1069  ACE_ERROR((LM_ERROR,
1070  ACE_TEXT("(%P|%t) ERROR: ")
1071  ACE_TEXT("PublisherImpl::create_datawriter, ")
1072  ACE_TEXT("inconsistent qos.\n")));
1073  }
1074  return DDS::DataWriter::_nil();
1075  }
1076  return true;
1077 }
#define ACE_ERROR(X)
#define OPENDDS_NO_DURABILITY_SERVICE_COMPATIBILITY_CHECK(qos, error_rtn_value)
static bool consistent(const DDS::ResourceLimitsQosPolicy &resource_limits, const DDS::HistoryQosPolicy &history)
Definition: Qos_Helper.inl:596
#define OPENDDS_NO_OWNERSHIP_KIND_EXCLUSIVE_COMPATIBILITY_CHECK(qos, error_rtn_value)
#define OPENDDS_NO_OWNERSHIP_PROFILE_COMPATIBILITY_CHECK(qos, error_rtn_value)
#define OPENDDS_NO_OWNERSHIP_STRENGTH_COMPATIBILITY_CHECK(qos, error_rtn_value)
static bool valid(const DDS::UserDataQosPolicy &qos)
Definition: Qos_Helper.inl:723
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define DATAWRITER_QOS_DEFAULT
static bool copy_from_topic_qos(DDS::DataReaderQos &a_datareader_qos, const DDS::TopicQos &a_topic_qos)
#define DATAWRITER_QOS_USE_TOPIC_QOS
#define OPENDDS_NO_DURABILITY_KIND_TRANSIENT_PERSISTENT_COMPATIBILITY_CHECK(qos, error_rtn_value)
Boolean is_nil(T x)

◆ wait_for_acknowledgments()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::wait_for_acknowledgments ( const DDS::Duration_t & max_wait)
virtual

Definition at line 740 of file PublisherImpl.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::EntityImpl::enabled_, OpenDDS::DCPS::RcHandle< T >::in(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::OPENDDS_MAP(), pi_lock_, DDS::RELIABLE_RELIABILITY_QOS, DDS::RETCODE_ERROR, DDS::RETCODE_NOT_ENABLED, and DDS::RETCODE_OK.

742 {
743  if (!enabled_) {
744  if (DCPS_debug_level > 0) {
745  ACE_ERROR((LM_ERROR,
746  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
747  ACE_TEXT("Entity is not enabled.\n")));
748  }
750  }
751 
752  typedef OPENDDS_MAP(DataWriterImpl*, DataWriterImpl::AckToken) DataWriterAckMap;
753  DataWriterAckMap ack_writers;
754  {
756  guard,
757  this->pi_lock_,
759 
760  // Collect writers to request acks
761  for (DataWriterMap::iterator it(this->datawriter_map_.begin());
762  it != this->datawriter_map_.end(); ++it) {
763  DataWriterImpl_rch writer = it->second;
764  if (writer->qos_.reliability.kind != DDS::RELIABLE_RELIABILITY_QOS)
765  continue;
766  if (writer->should_ack()) {
767  DataWriterImpl::AckToken token = writer->create_ack_token(max_wait);
768 
769  std::pair<DataWriterAckMap::iterator, bool> pair =
770  ack_writers.insert(DataWriterAckMap::value_type(writer.in(), token));
771 
772  if (!pair.second) {
773  if (DCPS_debug_level > 0) {
774  ACE_ERROR((LM_ERROR,
775  ACE_TEXT("(%P|%t) ERROR: PublisherImpl::wait_for_acknowledgments, ")
776  ACE_TEXT("Unable to insert AckToken into DataWriterAckMap!\n")));
777  }
778  return DDS::RETCODE_ERROR;
779  }
780  }
781  }
782  }
783 
784  if (ack_writers.empty()) {
785  if (DCPS_debug_level > 0) {
786  ACE_DEBUG((LM_DEBUG,
787  ACE_TEXT("(%P|%t) PublisherImpl::wait_for_acknowledgments() - ")
788  ACE_TEXT("not blocking due to no writers requiring acks.\n")));
789  }
790 
791  return DDS::RETCODE_OK;
792  }
793 
794  // Wait for ack responses from all associated readers
795  for (DataWriterAckMap::iterator it(ack_writers.begin());
796  it != ack_writers.end(); ++it) {
797  DataWriterImpl::AckToken token = it->second;
798 
799  it->first->wait_for_specific_ack(token);
800  }
801 
802  return DDS::RETCODE_OK;
803 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
RcHandle< DataWriterImpl > DataWriterImpl_rch
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.
AtomicBool enabled_
The flag indicates the entity is enabled.
Definition: EntityImpl.h:82
const ReturnCode_t RETCODE_NOT_ENABLED

◆ writer_enabled()

DDS::ReturnCode_t OpenDDS::DCPS::PublisherImpl::writer_enabled ( const char *  topic_name,
DataWriterImpl * impl
)

This method is called when a datawriter created by this publisher is enabled.

Definition at line 902 of file PublisherImpl.cpp.

References ACE_ERROR, ACE_GUARD_RETURN, ACE_TEXT(), datawriter_map_, OpenDDS::DCPS::DCPS_debug_level, LM_ERROR, monitor_, pi_lock_, publication_map_, OpenDDS::DCPS::rchandle_from(), DDS::RETCODE_ERROR, DDS::RETCODE_OK, and writers_not_enabled_.

904 {
906  guard,
907  this->pi_lock_,
909  DataWriterImpl_rch writer = rchandle_from(writer_ptr);
910  writers_not_enabled_.erase(writer);
911 
912  datawriter_map_.insert(DataWriterMap::value_type(topic_name, writer));
913 
914  const GUID_t publication_id = writer->get_guid();
915 
916  std::pair<PublicationMap::iterator, bool> pair =
917  publication_map_.insert(PublicationMap::value_type(publication_id, writer));
918 
919  if (!pair.second) {
920  if (DCPS_debug_level > 0) {
921  ACE_ERROR((LM_ERROR,
922  ACE_TEXT("(%P|%t) ERROR: ")
923  ACE_TEXT("PublisherImpl::writer_enabled: ")
924  ACE_TEXT("insert publication %C failed.\n"),
925  LogGuid(publication_id).c_str()));
926  }
927  return DDS::RETCODE_ERROR;
928  }
929 
930  if (this->monitor_) {
931  this->monitor_->report();
932  }
933 
934  return DDS::RETCODE_OK;
935 }
#define ACE_ERROR(X)
const ReturnCode_t RETCODE_OK
DataWriterSet writers_not_enabled_
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.
unique_ptr< Monitor > monitor_
Monitor object for this entity.
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
const ReturnCode_t RETCODE_ERROR
RcHandle< DataWriterImpl > DataWriterImpl_rch
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.

Friends And Related Function Documentation

◆ DataWriterImpl

friend class DataWriterImpl
friend

Definition at line 43 of file PublisherImpl.h.

Member Data Documentation

◆ aggregation_period_start_

MonotonicTimePoint OpenDDS::DCPS::PublisherImpl::aggregation_period_start_
private

Start of current aggregation period. - NOT USED IN FIRST IMPL.

Definition at line 203 of file PublisherImpl.h.

◆ change_depth_

std::size_t OpenDDS::DCPS::PublisherImpl::change_depth_
private

The number of times begin_coherent_changes has been called.

Definition at line 191 of file PublisherImpl.h.

Referenced by begin_coherent_changes(), and end_coherent_changes().

◆ datawriter_map_

DataWriterMap OpenDDS::DCPS::PublisherImpl::datawriter_map_
private

◆ default_datawriter_qos_

DDS::DataWriterQos OpenDDS::DCPS::PublisherImpl::default_datawriter_qos_
private

Default datawriter Qos policy list.

Definition at line 171 of file PublisherImpl.h.

Referenced by create_datawriter(), get_default_datawriter_qos(), and set_default_datawriter_qos().

◆ domain_id_

DDS::DomainId_t OpenDDS::DCPS::PublisherImpl::domain_id_
private

Domain in which we are contained.

Definition at line 194 of file PublisherImpl.h.

Referenced by delete_datawriter(), and set_qos().

◆ handle_

DDS::InstanceHandle_t OpenDDS::DCPS::PublisherImpl::handle_
private

Definition at line 166 of file PublisherImpl.h.

Referenced by get_instance_handle(), and ~PublisherImpl().

◆ listener_

DDS::PublisherListener_var OpenDDS::DCPS::PublisherImpl::listener_
private

Used to notify the entity for relevant events.

Definition at line 179 of file PublisherImpl.h.

Referenced by get_listener(), listener_for(), and set_listener().

◆ listener_mask_

DDS::StatusMask OpenDDS::DCPS::PublisherImpl::listener_mask_
private

The StatusKind bit mask indicating which status condition changes are reported to the listener of this entity.

Definition at line 177 of file PublisherImpl.h.

Referenced by listener_for(), and set_listener().

◆ listener_mutex_

ACE_Thread_Mutex OpenDDS::DCPS::PublisherImpl::listener_mutex_
private

Mutex to protect listener info.

Definition at line 174 of file PublisherImpl.h.

Referenced by get_listener(), listener_for(), and set_listener().

◆ monitor_

unique_ptr<Monitor> OpenDDS::DCPS::PublisherImpl::monitor_
private

Monitor object for this entity.

Definition at line 213 of file PublisherImpl.h.

Referenced by delete_datawriter(), enable(), PublisherImpl(), and writer_enabled().

◆ participant_

WeakRcHandle<DomainParticipantImpl> OpenDDS::DCPS::PublisherImpl::participant_
private

The DomainParticipant servant that owns this Publisher.

Definition at line 196 of file PublisherImpl.h.

Referenced by create_datawriter(), delete_datawriter(), enable(), get_participant(), listener_for(), parent(), set_qos(), and ~PublisherImpl().

◆ pi_lock_

lock_type OpenDDS::DCPS::PublisherImpl::pi_lock_
mutableprivate

◆ pi_suspended_lock_

lock_type OpenDDS::DCPS::PublisherImpl::pi_suspended_lock_
mutableprivate

Definition at line 210 of file PublisherImpl.h.

Referenced by is_suspended(), resume_publications(), and suspend_publications().

◆ publication_map_

PublicationMap OpenDDS::DCPS::PublisherImpl::publication_map_
private

This map is used to support datawriter lookup by datawriter repository id.

Definition at line 188 of file PublisherImpl.h.

Referenced by begin_coherent_changes(), delete_datawriter(), end_coherent_changes(), get_publication_ids(), is_clean(), resume_publications(), set_qos(), and writer_enabled().

◆ publisher_id_

GUID_t OpenDDS::DCPS::PublisherImpl::publisher_id_
private
Note
The publisher_id_ is not generated by the repository; it is unique within the DomainParticipant scope.

Definition at line 217 of file PublisherImpl.h.

◆ qos_

DDS::PublisherQos OpenDDS::DCPS::PublisherImpl::qos_
private

Publisher QoS policy list.

Definition at line 169 of file PublisherImpl.h.

Referenced by begin_coherent_changes(), create_datawriter(), enable(), end_coherent_changes(), get_qos(), and set_qos().

◆ reverse_pi_lock_

reverse_lock_type OpenDDS::DCPS::PublisherImpl::reverse_pi_lock_
private

Definition at line 209 of file PublisherImpl.h.

Referenced by delete_datawriter().

◆ sequence_number_

SequenceNumber OpenDDS::DCPS::PublisherImpl::sequence_number_
private

Unique sequence number used when the scope_access = GROUP.

  • NOT USED IN FIRST IMPL - not supporting GROUP scope

Definition at line 201 of file PublisherImpl.h.

◆ suspend_depth_count_

CORBA::Short OpenDDS::DCPS::PublisherImpl::suspend_depth_count_
private

The suspend depth count.

Definition at line 198 of file PublisherImpl.h.

Referenced by is_suspended(), resume_publications(), and suspend_publications().

◆ writers_not_enabled_

DataWriterSet OpenDDS::DCPS::PublisherImpl::writers_not_enabled_
private

Definition at line 182 of file PublisherImpl.h.

Referenced by create_datawriter(), enable(), and writer_enabled().


The documentation for this class was generated from the following files: