OpenDDS  Snapshot(2023/04/28-20:55)
InternalDataWriter.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_INTERNAL_DATA_WRITER_H
9 #define OPENDDS_DCPS_INTERNAL_DATA_WRITER_H
10 
11 #include "dcps_export.h"
12 
13 #ifndef ACE_LACKS_PRAGMA_ONCE
14 # pragma once
15 #endif /* ACE_LACKS_PRAGMA_ONCE */
16 
17 #include "RcObject.h"
18 #include "InternalDataReader.h"
19 
21 
22 namespace OpenDDS {
23 namespace DCPS {
24 
25 /*
26  The InternalDataWriter supports the following QoS:
27 
28  DurabilityQosPolicy durability; => VOLATILE, TRANSIENT_LOCAL
29  DurabilityServiceQosPolicy durability_service; => None
30  DeadlineQosPolicy deadline; => None
31  LatencyBudgetQosPolicy latency_budget; => None
32  LivelinessQosPolicy liveliness; => None
33  ReliabilityQosPolicy reliability; => RELIABLE
34  DestinationOrderQosPolicy destination_order; => None
35  HistoryQosPolicy history; => KEEP_LAST_HISTORY, KEEP_ALL_HISTORY
36  ResourceLimitsQosPolicy resource_limits; => None
37  TransportPriorityQosPolicy transport_priority; => None
38  LifespanQosPolicy lifespan; => None
39  UserDataQosPolicy user_data; => None
40  OwnershipQosPolicy ownership; => None
41  OwnershipStrengthQosPolicy ownership_strength; => None
42  WriterDataLifecycleQosPolicy writer_data_lifecycle; => Yes
43 */
44 
45 template <typename T>
47 public:
50 
52  : qos_(qos)
53  {}
54 
55  /// @name InternalTopic Interface
56  /// @{
/// Attach @a reader to this writer.
///
/// The reader is inserted into readers_. If this writer's durability kind is
/// TRANSIENT_LOCAL and the reader reports itself as durable, the samples held
/// for every entry of instance_map_ are then delivered to the reader, with
/// this writer passed along as the publishing entity.
///
/// NOTE(review): the listing numbering jumps from 58 to 60, so one original
/// source line is not shown at the top of the body -- confirm against the
/// real header before relying on this description.
57  void add_reader(InternalDataReader_rch reader)
58  {
60  readers_.insert(reader);
61 
62  if (qos_.durability.kind == DDS::TRANSIENT_LOCAL_DURABILITY_QOS && reader->durable()) {
// Walk every instance and hand its stored samples to the new reader.
63  for (typename InstanceMap::iterator pos = instance_map_.begin(), limit = instance_map_.end();
64  pos != limit; ++pos) {
65  pos->second.add_reader(reader, static_rchandle_cast<InternalEntity>(rchandle_from(this)));
66  }
67  }
68  }
69 
/// Detach @a reader from this writer.
///
/// The reader is erased from readers_. Only when it was actually present is
/// the reader told to remove this publication; the writer's
/// autodispose_unregistered_instances setting is forwarded with that call.
///
/// NOTE(review): the listing numbering jumps from 71 to 73, so one original
/// source line is not shown at the top of the body -- confirm against the
/// real header.
70  void remove_reader(InternalDataReader_rch reader)
71  {
73  if (readers_.erase(reader)) {
74  reader->remove_publication(static_rchandle_cast<InternalEntity>(rchandle_from(this)), qos_.writer_data_lifecycle.autodispose_unregistered_instances);
75  }
76  }
77 
/// Report whether @a reader is currently in readers_.
///
/// @return true when readers_.count(reader) is non-zero (the count is
///         implicitly converted to bool), false otherwise.
///
/// NOTE(review): the listing numbering jumps from 79 to 81, so one original
/// source line is not shown at the top of the body -- confirm against the
/// real header.
78  bool has_reader(InternalDataReader_rch reader)
79  {
81  return readers_.count(reader);
82  }
83 
85  {
87  }
88  /// @}
89 
90  /// @name User Interface
91  /// @{
/// Publish @a sample.
///
/// Visible behavior: an entry keyed by @a sample is inserted into
/// instance_map_ (with an empty SampleHolder), the sample is stored in that
/// entry's holder using this writer's QoS, and the sample is then delivered
/// to every reader in readers_ whose weak handle can still be locked.
///
/// NOTE(review): listing lines 94 and 96 are not shown. The closing brace on
/// listing line 99 ends a block whose opening statement is missing, so the
/// store into instance_map_ is presumably conditional -- confirm the
/// condition against the real header.
92  void write(const T& sample)
93  {
95 
// Insert-or-find the holder for this sample's key, then store the sample.
97  const std::pair<typename InstanceMap::iterator, bool> p = instance_map_.insert(std::make_pair(sample, SampleHolder()));
98  p.first->second.write(sample, qos_);
99  }
100 
// Fan out to readers; lock() yields an empty handle for readers that are gone.
101  for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
102  InternalDataReader_rch reader = pos->lock();
103  if (reader) {
104  reader->write(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
105  }
106  }
107  }
108 
/// Dispose the instance identified by @a sample.
///
/// Visible behavior: if instance_map_ has an entry for @a sample, the samples
/// stored in that entry are cleared (the entry itself stays in the map). The
/// dispose is then forwarded to every reader in readers_ whose weak handle
/// can still be locked.
///
/// NOTE(review): listing lines 111 and 113 are not shown. The closing brace
/// on listing line 118 ends a block whose opening statement is missing, so
/// the map lookup is presumably conditional -- confirm against the real
/// header.
109  void dispose(const T& sample)
110  {
112 
114  typename InstanceMap::iterator pos = instance_map_.find(sample);
115  if (pos != instance_map_.end()) {
116  pos->second.dispose();
117  }
118  }
119 
// Notify each reader that is still alive.
120  for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
121  InternalDataReader_rch reader = pos->lock();
122  if (reader) {
123  reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
124  }
125  }
126  }
127 
/// Unregister the instance identified by @a sample.
///
/// Visible behavior: the entry for @a sample is erased from instance_map_,
/// and every reader in readers_ whose weak handle can still be locked is
/// told to unregister the instance. A dispose is also sent to the reader
/// inside a nested block whose opening statement is not shown.
///
/// NOTE(review): listing lines 130, 132 and 139 are not shown. The brace on
/// listing line 134 and the brace on listing line 141 each close a block
/// whose opening statement is missing, so both the erase and the dispose are
/// presumably conditional -- confirm the conditions against the real header.
128  void unregister_instance(const T& sample)
129  {
131 
133  instance_map_.erase(sample);
134  }
135 
136  for (typename ReaderSet::const_iterator pos = readers_.begin(), limit = readers_.end(); pos != limit; ++pos) {
137  InternalDataReader_rch reader = pos->lock();
138  if (reader) {
// Dispose first (inside the block whose condition is missing), then unregister.
140  reader->dispose(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
141  }
142  reader->unregister_instance(static_rchandle_cast<InternalEntity>(rchandle_from(this)), sample);
143  }
144  }
145  }
146  /// @}
147 
148 private:
150 
/// Readers attached to this writer, stored as weak handles; each handle is
/// lock()ed and checked before a reader is used when samples are fanned out.
151  typedef OPENDDS_SET(InternalDataReader_wrch) ReaderSet;
152  ReaderSet readers_;
153 
/// Stores the list of samples kept for a single entry of the instance map.
///
/// NOTE(review): listing line 169 (inside write()) is not shown, so the body
/// of write() has one more closing brace than visible opening braces --
/// confirm against the real header.
154  class SampleHolder {
155  public:
/// @return true when no samples are stored.
156  bool empty() const { return samples_.empty(); }
157 
/// Deliver every stored sample, in list order, to @a reader, identifying
/// @a writer as the publishing entity.
158  void add_reader(InternalDataReader_rch reader, RcHandle<InternalEntity> writer)
159  {
160  for (typename SampleList::const_iterator pos = samples_.begin(), limit = samples_.end(); pos != limit; ++pos) {
161  reader->write(writer, *pos);
162  }
163  }
164 
/// Append @a sample, then drop samples from the front while more than
/// qos.history.depth are stored.
/// NOTE(review): the trimming loop sits in a block whose opening statement
/// (listing line 169) is missing; presumably it is conditional on the
/// history kind -- confirm.
165  void write(const T& sample,
166  const DDS::DataWriterQos& qos)
167  {
168  samples_.push_back(sample);
170  while (samples_.size() > static_cast<std::size_t>(qos.history.depth)) {
171  samples_.pop_front();
172  }
173  }
174  }
175 
/// Discard all stored samples.
176  void dispose()
177  {
178  samples_.clear();
179  }
180 
181  private:
// Samples in arrival order: appended at the back, trimmed from the front.
182  typedef OPENDDS_LIST(T) SampleList;
183  SampleList samples_;
184  };
185 
/// Stored samples per instance. The sample value itself (type T) is the map
/// key, and the mapped SampleHolder keeps that instance's samples.
186  typedef OPENDDS_MAP_T(T, SampleHolder) InstanceMap;
187  InstanceMap instance_map_;
188 
190 };
191 
192 } // namespace DCPS
193 } // namespace OpenDDS
194 
196 
197 #endif /* OPENDDS_DCPS_INTERNAL_DATA_WRITER_H */
bool has_reader(InternalDataReader_rch reader)
RcHandle< T > rchandle_from(T *pointer)
Definition: RcHandle_T.h:310
HistoryQosPolicy history
#define ACE_GUARD(MUTEX, OBJ, LOCK)
WeakRcHandle< InternalDataReader< T > > InternalDataReader_wrch
DurabilityQosPolicy durability
HistoryQosPolicyKind kind
typedef OPENDDS_MAP_T(T, SampleHolder) InstanceMap
RcHandle< InternalDataReader< T > > InternalDataReader_rch
InternalDataWriter(const DDS::DataWriterQos &qos)
void unregister_instance(const T &sample)
void write(const T &sample, const DDS::DataWriterQos &qos)
DurabilityQosPolicyKind kind
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
typedef OPENDDS_SET(InternalDataReader_wrch) ReaderSet
void add_reader(InternalDataReader_rch reader, RcHandle< InternalEntity > writer)
void add_reader(InternalDataReader_rch reader)
RcHandle< T > static_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:202
void remove_reader(InternalDataReader_rch reader)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
WriterDataLifecycleQosPolicy writer_data_lifecycle
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
InternalEntity_wrch publication_handle()