OpenDDS  Snapshot(2023/04/28-20:55)
PublisherImpl.h
Go to the documentation of this file.
1 /*
2  * Distributed under the OpenDDS License.
3  * See: http://www.opendds.org/license.html
4  */
5 
6 #ifndef OPENDDS_DCPS_PUBLISHER_IMPL_H
7 #define OPENDDS_DCPS_PUBLISHER_IMPL_H
8 
9 #include "EntityImpl.h"
10 #include "DataWriterImpl.h"
11 
12 #include <dds/DdsDcpsInfoUtilsC.h>
13 
14 #include <ace/Reverse_Lock_T.h>
15 
16 #ifndef ACE_LACKS_PRAGMA_ONCE
17 # pragma once
18 #endif
19 
21 
22 namespace OpenDDS {
23 namespace DCPS {
24 
25 class DomainParticipantImpl;
26 class Monitor;
27 
28 /**
29 * @class PublisherImpl
30 *
31 * @brief Implements the OpenDDS::DCPS::Publisher interfaces.
32 *
33 * This class acts as a factory and container of the datawriter.
34 *
35 * See the DDS specification, OMG formal/2015-04-10, for a description of
36 * the interface this class is implementing.
37 */
39  : public virtual LocalObject<DDS::Publisher>
40  , public virtual EntityImpl {
41 public:
42 
43  friend class DataWriterImpl;
44 
46  GUID_t id,
47  const DDS::PublisherQos& qos,
48  DDS::PublisherListener_ptr a_listener,
49  const DDS::StatusMask& mask,
50  DomainParticipantImpl* participant);
51 
52  virtual ~PublisherImpl();
53 
54  virtual DDS::InstanceHandle_t get_instance_handle();
55 
56  bool contains_writer(DDS::InstanceHandle_t a_handle);
57 
58  virtual DDS::DataWriter_ptr create_datawriter(
59  DDS::Topic_ptr a_topic,
60  const DDS::DataWriterQos& qos,
61  DDS::DataWriterListener_ptr a_listener,
62  DDS::StatusMask mask);
63 
64  virtual DDS::ReturnCode_t delete_datawriter(
65  DDS::DataWriter_ptr a_datawriter);
66 
67  virtual DDS::DataWriter_ptr lookup_datawriter(
68  const char* topic_name);
69 
70  virtual DDS::ReturnCode_t delete_contained_entities();
71 
72  virtual DDS::ReturnCode_t set_qos(
73  const DDS::PublisherQos& qos);
74 
75  virtual DDS::ReturnCode_t get_qos(
76  DDS::PublisherQos& qos);
77 
78  virtual DDS::ReturnCode_t set_listener(
79  DDS::PublisherListener_ptr a_listener,
80  DDS::StatusMask mask);
81 
82  virtual DDS::PublisherListener_ptr get_listener();
83 
84  virtual DDS::ReturnCode_t suspend_publications();
85 
86  virtual DDS::ReturnCode_t resume_publications();
87 
88 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
89 
90  virtual DDS::ReturnCode_t begin_coherent_changes();
91 
92  virtual DDS::ReturnCode_t end_coherent_changes();
93 
94 #endif
95 
96  virtual DDS::ReturnCode_t wait_for_acknowledgments(
97  const DDS::Duration_t& max_wait);
98 
99  virtual DDS::DomainParticipant_ptr get_participant();
100 
101  virtual DDS::ReturnCode_t set_default_datawriter_qos(
102  const DDS::DataWriterQos& qos);
103 
104  virtual DDS::ReturnCode_t get_default_datawriter_qos(
105  DDS::DataWriterQos& qos);
106 
107  virtual DDS::ReturnCode_t copy_from_topic_qos(
108  DDS::DataWriterQos& a_datawriter_qos,
109  const DDS::TopicQos& a_topic_qos);
110 
111  virtual DDS::ReturnCode_t enable();
112 
113  ACE_Recursive_Thread_Mutex& get_pi_lock() { return pi_lock_; }
114 
115  /**
116  * This method is not defined in the IDL and is defined for
117  * internal use.
118  * Check if there is any datawriter associated with this publisher.
119  */
120  bool is_clean(String* leftover_entities = 0) const;
121 
122  /** This method is called when the datawriter created by this
123  * publisher has been enabled.
124  */
125  DDS::ReturnCode_t writer_enabled(const char* topic_name,
126  DataWriterImpl* impl);
127 
128  /**
129  * This is used to retrieve the listener for a certain status change.
130  * If this publisher has a registered listener and the status kind
131  * is in the listener mask then the listener is returned.
132  * Otherwise, the query for listener is propagated up to the
133  * factory/DomainParticipant.
134  */
135  DDS::PublisherListener_ptr listener_for(::DDS::StatusKind kind);
136 
137  DDS::ReturnCode_t assert_liveliness_by_participant();
138  TimeDuration liveliness_check_interval(DDS::LivelinessQosPolicyKind kind);
139  bool participant_liveliness_activity_after(const MonotonicTimePoint& tv);
140 
141  typedef OPENDDS_VECTOR(GUID_t) PublicationIdVec;
142  /// Populates a std::vector with the GUID_ts
143  /// of this Publisher's Data Writers
144  void get_publication_ids(PublicationIdVec& pubs);
145 
146  bool is_suspended() const;
147 
148  virtual RcHandle<EntityImpl> parent() const;
149  static bool validate_datawriter_qos(const DDS::DataWriterQos& qos,
150  const DDS::DataWriterQos& default_qos,
151  DDS::Topic_ptr a_topic,
152  DDS::DataWriterQos& dw_qos);
153 
154  bool prepare_to_delete_datawriters();
155  bool set_wait_pending_deadline(const MonotonicTimePoint& deadline);
156 
157 private:
158  typedef OPENDDS_MULTIMAP(OPENDDS_STRING, DataWriterImpl_rch) DataWriterMap;
159 
161  PublicationMap;
162 
163  // DataWriter id to qos map.
165 
167 
168  /// Publisher QoS policy list.
170  /// Default datawriter Qos policy list.
172 
173  /// Mutex to protect listener info
175  /// The StatusKind bit mask indicates which status condition change
176  /// can be notified by the listener of this entity.
178  /// Used to notify the entity for relevant events.
179  DDS::PublisherListener_var listener_;
180 
181  typedef OPENDDS_SET(DataWriterImpl_rch) DataWriterSet;
182  DataWriterSet writers_not_enabled_;
183 
184  /// This map is used to support datawriter lookup by topic name.
185  DataWriterMap datawriter_map_;
186  /// This map is used to support datawriter lookup by datawriter
187  /// repository id.
188  PublicationMap publication_map_;
189 #ifndef OPENDDS_NO_OBJECT_MODEL_PROFILE
190  /// The number of times begin_coherent_changes has been called.
191  std::size_t change_depth_;
192 #endif
193  /// Domain in which we are contained.
195  /// The DomainParticipant servant that owns this Publisher.
197  /// The suspend depth count.
199  /// Unique sequence number used when the scope_access = GROUP.
200  /// - NOT USED IN FIRST IMPL - not supporting GROUP scope
202  /// Start of current aggregation period. - NOT USED IN FIRST IMPL
204 
207  /// The recursive lock to protect datawriter map and suspend count.
208  mutable lock_type pi_lock_;
209  reverse_lock_type reverse_pi_lock_;
210  mutable lock_type pi_suspended_lock_;
211 
212  /// Monitor object for this entity
214 
215  /// @note The publisher_id_ is not generated by the repository; it is unique
216  /// in DomainParticipant scope.
218 };
219 
220 } // namespace DCPS
221 } // namespace OpenDDS
222 
224 
225 #endif /* OPENDDS_DCPS_PUBLISHER_IMPL_H */
DDS::PublisherQos qos_
Publisher QoS policy list.
DataWriterMap datawriter_map_
This map is used to support datawriter lookup by topic name.
Implements the OpenDDS::DCPS::DomainParticipant interfaces.
Implements the OpenDDS::DCPS::Entity interfaces.
Definition: EntityImpl.h:37
DDS::PublisherListener_var listener_
Used to notify the entity for relevant events.
ACE_Reverse_Lock< lock_type > reverse_lock_type
std::string String
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
ACE_CDR::Short Short
DDS::InstanceHandle_t handle_
#define OPENDDS_MULTIMAP(K, T)
DDS::StatusMask listener_mask_
ACE_Recursive_Thread_Mutex lock_type
DDS::DomainId_t domain_id_
Domain in which we are contained.
Implements the OpenDDS::DCPS::Publisher interfaces.
Definition: PublisherImpl.h:38
unique_ptr< Monitor > monitor_
Monitor object for this entity.
#define OPENDDS_STRING
DOMAINID_TYPE_NATIVE DomainId_t
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
typedef OPENDDS_MAP_CMP(GUID_t, WriterCoherentSample, GUID_tKeyLessThan) GroupCoherentSamples
std::size_t change_depth_
The number of times begin_coherent_changes has been called.
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
unsigned long StatusMask
Sequence number abstraction. Only allows positive 64 bit values.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
MonotonicTimePoint aggregation_period_start_
Start of current aggregation period. - NOT USED IN FIRST IMPL.
CORBA::Short suspend_depth_count_
The suspend depth count.
DataWriterSet writers_not_enabled_
ACE_Thread_Mutex listener_mutex_
Mutex to protect listener info.
WeakRcHandle< DomainParticipantImpl > participant_
The DomainParticipant servant that owns this Publisher.
unsigned long StatusKind
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
reverse_lock_type reverse_pi_lock_
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
DDS::DataWriterQos default_datawriter_qos_
Default datawriter Qos policy list.
LivelinessQosPolicyKind
ACE_Recursive_Thread_Mutex & get_pi_lock()
typedef OPENDDS_SET(NetworkAddress) AddrSet
lock_type pi_lock_
The recursive lock to protect datawriter map and suspend count.