OpenDDS  Snapshot(2023/04/28-20:55)
WriteDataContainer.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_WRITE_DATA_CONTAINER_H
9 #define OPENDDS_DCPS_WRITE_DATA_CONTAINER_H
10 
11 #include "DataSampleElement.h"
13 #include "WriterDataSampleList.h"
14 #include "DisjointSequence.h"
15 #include "PoolAllocator.h"
16 #include "PoolAllocationBase.h"
17 #include "Message_Block_Ptr.h"
18 #include "SporadicTask.h"
19 #include "ConditionVariable.h"
20 #include "TimeTypes.h"
21 
22 #include <dds/DdsDcpsInfrastructureC.h>
23 #include <dds/DdsDcpsCoreC.h>
24 
25 #include <ace/Synch_Traits.h>
26 #include <ace/Thread_Mutex.h>
28 #include <ace/Reverse_Lock_T.h>
29 
30 #include <memory>
31 
32 #if !defined (ACE_LACKS_PRAGMA_ONCE)
33 #pragma once
34 #endif /* ACE_LACKS_PRAGMA_ONCE */
35 
37 
38 namespace OpenDDS {
39 namespace DCPS {
40 
41 class InstanceDataSampleList;
42 class DataWriterImpl;
43 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
44 class DataDurabilityCache;
45 #endif
46 class FilterEvaluator;
47 
49  PublicationInstanceMapType;
50 
51 /**
52  * @class WriteDataContainer
53  *
54  * @brief A container for instances sample data.
55  *
56  * This container is instantiated per DataWriter. It maintains
57  * list of PublicationInstance objects which is internally
58  * referenced by the instance handle.
59  *
60  * This container contains threaded lists of all data written to a
61  * given DataWriter. The real data sample is represented by the
62  * DataSampleElement. The data_holder_ holds all
63  * DataSampleElement in the writing order via the
64  * next_writer_sample_/previous_writer_sample_ thread. The instance list in
65  * PublicationInstance links samples via the next_instance_sample_
66  * thread.
67  *
68  * There are three state transition lists used during write operations:
69  * unsent, sending, and sent. These lists are linked
70  * via the next_send_sample_/previous_send_sample_ thread. Any
71  * DataSampleElement should be in one of these three lists and
72  * SHOULD NOT be shared between these three lists.
73  * A normal transition of a DataSampleElement would be
74  * unsent->sending->sent. A DataSampleElement transitions from unsent
75  * to sent naturally during a write operation when get_unsent_data is called.
76  * A DataSampleElement transitions from sending to sent when data_delivered
77  * is called notifying the container that the transport is finished with the
78  * sample and it can be marked as a historical sample.
79  * A DataSampleElement may transition back to unsent from sending if
80  * the transport notifies that the sample was dropped and should be resent.
81  * A DataSampleElement is removed from sent list and freed when the instance
82  * queue or container (for another instance) needs more space.
83  * A DataSampleElement may be removed and freed directly from sending when
84  * and unreliable writer needs space. In this case the transport will be
85  * notified that the sample should be removed.
86  * A DataSampleElement may also be removed directly from unsent if an unreliable
87  * writer needs space for new samples.
88  * Note: The real data sample will be freed when the reference counting goes 0.
89  * The resend list is only used when the datawriter uses
90  * TRANSIENT_LOCAL_DURABILITY_QOS. It holds the DataSampleElements
91  * for the data sample duplicates of the sending and sent list and
92  * hands this list off to the transport.
93  *
94  *
95  *
96  * @note: 1) The PublicationInstance object is not removed from
97  * this container until the instance is
98  * unregistered. The same instance handle is reused for
99  * re-registration. The instance data is deleted when
100  * this container is deleted. This would simplify
101  * instance data memory management. An alternative way
102  * is to remove the handle from the instance list when
103  * unregister occurs and delete the instance data after
104  * the transport is done with the instance, but we do
105  * not have a way to know when the transport is done
106  * since we have the sending list for all instances in
107  * the same datawriter.
108  * 2) This container has the ownership of the instance data
109  * samples when the data is written. The type-specific
110  * datawriter that allocates the memory for the sample
111  * data gives the ownership to its base class.
112  * 3) It is the responsibility of the owner of objects of
113  * this class to ensure that access to the lists are
114  * properly locked. This means using the same
115  * lock/condition to access the lists via the enqueue(),
116  * get*(), and data_delivered() methods. For the case
117  * where these are all called from the same (client)
118  * thread, this should be a recursive lock so that: 1)
119  * we do not deadlock; and, 2) we incur the cost of
120  * obtaining the lock only once.
121  */
123 public:
124 
125  friend class DataWriterImpl;
126 
127  /**
128  * No default constructor, must be initialized.
129  */
131  /// The writer which owns this container.
132  DataWriterImpl* writer,
133  /// Max samples kept within each instance
134  CORBA::Long max_samples_per_instance,
135  CORBA::Long history_depth,
136  /// Max durable samples sent for each instance
137  CORBA::Long max_durable_per_instance,
138  /// The timeout for write.
139  DDS::Duration_t max_blocking_time,
140  /// The number of chunks that the DataSampleElementAllocator
141  /// needs allocate.
142  size_t n_chunks,
143  /// Domain ID.
144  DDS::DomainId_t domain_id,
145  /// Topic name.
146  char const * topic_name,
147  /// Type name.
148  char const * type_name,
149 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
150  /// The data durability cache for unsent data.
151  DataDurabilityCache * durability_cache,
152  /// DURABILITY_SERVICE QoS specific to the DataWriter.
153  DDS::DurabilityServiceQosPolicy const & durability_service,
154 #endif
155  /// maximum number of instances, 0 for unlimited
156  CORBA::Long max_instances,
157  /// maximum total number of samples, 0 for unlimited
158  CORBA::Long max_total_samples,
159  ACE_Recursive_Thread_Mutex& deadline_status_lock,
160  DDS::OfferedDeadlineMissedStatus& deadline_status,
161  CORBA::Long& deadline_last_total_count);
162 
164 
166  enqueue_control(DataSampleElement* control_sample);
167 
168  /**
169  * Enqueue the data sample in its instance thread. This method
170  * assumes there is an available space for the sample in the
171  * instance list.
172  */
174  enqueue(
175  DataSampleElement* sample,
176  DDS::InstanceHandle_t instance);
177 
178  /**
179  * Create a resend list with the copies of all current "sending"
180  * and "sent" samples. The samples will be sent to the
181  * subscriber specified.
182  */
183  DDS::ReturnCode_t reenqueue_all(const GUID_t& reader_id,
184  const DDS::LifespanQosPolicy& lifespan
185 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
186  ,
187  const OPENDDS_STRING& filterClassName,
188  const FilterEvaluator* eval,
189  const DDS::StringSeq& params
190 #endif
191  );
192 
193  /**
194  * Dynamically allocate a PublicationInstance object and add to
195  * the instances_ list.
196  *
197  * @note: The registered_sample is an input and output parameter.
198  * A shallow copy of the sample data will be given to
199  * datawriter as part of the control message.
200  */
202  register_instance(DDS::InstanceHandle_t& instance_handle,
203  Message_Block_Ptr& registered_sample);
204 
205  /**
206  * Remove the provided instance from the instances_ list.
207  * The registered sample data will be released upon the deletion
208  * of the PublicationInstance. A shallow copy of the sample data
209  * will be given to datawriter as part of the control message if
210  * the dup_registered_sample is true.
211  *
212  * This method returns error if the instance is not registered.
213  */
214  DDS::ReturnCode_t unregister(
215  DDS::InstanceHandle_t handle,
216  Message_Block_Ptr& registered_sample,
217  bool dup_registered_sample = true);
218 
219  /**
220  * Delete the samples for the provided instance.
221  * A shallow copy of the sample data will be given to datawriter
222  * as part of the control message if the dup_registered_sample
223  * is true.
224  *
225  * This method returns error if the instance is not registered.
226  */
228  DDS::InstanceHandle_t handle,
229  Message_Block_Ptr& registered_sample,
230  bool dup_registered_sample = true);
231 
232  /**
233  * Return the number of samples for the given instance.
234  */
235  DDS::ReturnCode_t num_samples(
236  DDS::InstanceHandle_t handle,
237  size_t& size);
238 
239  /**
240  * Return the number of samples for all instances.
241  */
242  size_t num_all_samples();
243 
244  /**
245  * Obtain a list of data that has not yet been sent. The data
246  * on the list returned is moved from the internal unsent_data_
247  * list to the internal sending_data_ list as part of this call.
248  * The entire list is linked via the
249  * DataSampleElement.next_send_sample_ link as well.
250  */
251  ACE_UINT64 get_unsent_data(SendStateDataSampleList& list);
252 
253  /**
254  * Obtain a list of data for resending. This is only used when
255  * TRANSIENT_LOCAL_DURABILITY_QOS is used. The data on the list
256  * returned is not put on any SendStateDataSampleList.
257  */
258  SendStateDataSampleList get_resend_data();
259 
260  /**
261  * Acknowledge the delivery of data. The sample that resides in
262  * this container will be moved from sending_data_ list to the
263  * internal sent_data_ list. If there are any threads waiting for
264  * available space, it wakes up these threads.
265  */
266  void data_delivered(const DataSampleElement* sample);
267 
268  /**
269  * This method is called by the transport to notify the sample
270  * is dropped. Which the transport was told to do by the
271  * publication code by calling
272  * TransportClient::remove_sample(). If the sample was
273  * "sending" then it is moved to the "unsent" list. If there are any
274  * threads waiting for available space then it needs wake up
275  * these threads. The dropped_by_transport flag true indicates
276  * the dropping initiated by transport when the transport send
277  * strategy is in a MODE_TERMINATED. The dropped_by_transport
278  * flag false indicates the dropping is initiated by the
279  * remove_sample and data_dropped() is a result of
280  * remove_sample().
281  */
282  void data_dropped(const DataSampleElement* element,
283  bool dropped_by_transport);
284 
285  DDS::ReturnCode_t obtain_buffer_for_control(DataSampleElement*& element);
286 
287  /**
288  * Allocate a DataSampleElement object and check the space
289  * availability for newly allocated element according to qos settings.
290  * For the blocking write case, if resource limits or history qos limits
291  * are reached, then it blocks for max blocking time for a previous sample
292  * to be delivered or dropped by the transport. In non-blocking write
293  * case, if resource limits or history qos limits are reached, will attempt
294  * to remove oldest samples (forcing the transport to drop samples if necessary)
295  * to make space. If there are several threads waiting then
296  * the first one in the waiting list can enqueue, others continue
297  * waiting. Note: the lock should be held before calling this method
298  */
299  DDS::ReturnCode_t obtain_buffer(
300  DataSampleElement*& element,
301  DDS::InstanceHandle_t handle);
302 
303  /**
304  * Release the memory previously allocated.
305  * This method is corresponding to the obtain_buffer method. If
306  * the memory is allocated by some allocator then the memory
307  * needs to be released to the allocator.
308  */
309  void release_buffer(DataSampleElement* element);
310 
311  /**
312  * Unregister all instances managed by this data containers.
313  */
314  void unregister_all();
315 
316  /**
317  * @todo remove/document this!
318  */
319  PublicationInstance_rch get_handle_instance(
320  DDS::InstanceHandle_t handle);
321 
322 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
323  /**
324  * Copy sent data to data DURABILITY cache.
325  */
326  bool persist_data();
327 #endif
328 
329  /**
330  * Block until pending samples have either been delivered, dropped, or the
331  * deadline has passed. Blocks indefinitely if deadline is zero.
332  */
333  void wait_pending(const MonotonicTimePoint& deadline);
334 
335  /**
336  * Returns a vector of handles for the instances registered for this
337  * data writer.
338  */
339  typedef OPENDDS_VECTOR(DDS::InstanceHandle_t) InstanceHandleVec;
340  void get_instance_handles(InstanceHandleVec& instance_handles);
341 
342  DDS::ReturnCode_t wait_ack_of_seq(const MonotonicTimePoint& abs_deadline,
343  bool deadline_is_infinite,
344  const SequenceNumber& sequence);
345 
346  bool sequence_acknowledged(const SequenceNumber& sequence);
347 
348 private:
349 
350  DDS::ReturnCode_t remove_instance(PublicationInstance_rch instance,
351  Message_Block_Ptr& registered_sample,
352  bool dup_registered_sample);
353 
354  // A class, normally provided by an unit test, that needs access to
355  // private methods/members.
356  friend class ::DDS_TEST;
357 
358  // --------------------------
359  // Preventing copying
360  // --------------------------
362  WriteDataContainer & operator= (WriteDataContainer const &);
363  // --------------------------
364 
365  /**
366  * Returns if pending data exists. This includes
367  * sending, and unsent data.
368  */
369  bool pending_data();
370 
371  void copy_and_prepend(SendStateDataSampleList& list,
372  const SendStateDataSampleList& appended,
373  const GUID_t& reader_id,
374  const DDS::LifespanQosPolicy& lifespan,
375 #ifndef OPENDDS_NO_CONTENT_FILTERED_TOPIC
376  const OPENDDS_STRING& filterClassName,
377  const FilterEvaluator* eval,
378  const DDS::StringSeq& params,
379 #endif
380  ssize_t& max_resend_samples);
381 
382  /**
383  * Remove the oldest "n" samples from each instance list that are
384  * in a state such that they could only be used for durability
385  * purposes (see reenqueue_all).
386  * "n" is determined by max_durable_per_instance_, so these samples
387  * are truly unneeded -- there are max_durable_per_instance_ newer
388  * samples available in the instance.
389  */
390  void remove_excess_durable();
391 
392  /**
393  * Remove the oldest sample (head) from the instance history list.
394  * This method also updates the internal lists to reflect
395  * the change.
396  * If the sample is in the unsent_data_ or sent_data_ list then
397  * it will be released. If the sample is in the sending_data_ list
398  * then the transport will be notified to release the sample, then
399  * the sample will be released. Otherwise an error
400  * is returned.
401  * The "released" boolean value indicates whether the sample is
402  * released.
403  */
404  DDS::ReturnCode_t remove_oldest_sample(
405  InstanceDataSampleList& instance_list,
406  bool& released);
407 
408  /**
409  * Called when data has been dropped or delivered and any
410  * blocked writers should be notified
411  */
412  void wakeup_blocking_writers (DataSampleElement* stale);
413 
414  void add_reader_acks(const GUID_t& reader, const SequenceNumber& base);
415  void remove_reader_acks(const GUID_t& reader);
416 
417 private:
418 
419  void log_send_state_lists (OPENDDS_STRING description);
420 
421 #ifdef ACE_HAS_CPP11
422  typedef OPENDDS_UNORDERED_MAP(GUID_t, DisjointSequence) AckedSequenceMap;
423 #else
424  typedef OPENDDS_MAP_CMP(GUID_t, DisjointSequence, GUID_tKeyLessThan) AckedSequenceMap;
425 #endif
426  AckedSequenceMap acked_sequences_;
429 
430  SequenceNumber get_cumulative_ack();
431  SequenceNumber get_last_ack();
432  void update_acked(const SequenceNumber& seq, const GUID_t& id = GUID_UNKNOWN);
433  bool sequence_acknowledged_i(const SequenceNumber& sequence);
434 
435  /// List of data that has not been sent yet.
437 
438  /// Id used to keep track of which send transaction
439  /// DataWriter is currently creating
441 
442  /// List of data that is currently being sent.
444 
445  /// List of data that has already been sent.
447 
448  /// List of data that has been released by WriteDataContainer
449  /// but is still in process of delivery (or dropping) by transport
451 
452  /// The list of all samples written to this datawriter in
453  /// writing order.
455 
456  /// List of the data reenqueued to support the
457  /// TRANSIENT_LOCAL_DURABILITY_QOS policy. It duplicates the
458  /// samples in sent and sending list. This
459  /// list will be passed to the transport for re-sending.
461 
462  /// The individual instance queue threads in the data.
463  PublicationInstanceMapType instances_;
464 
465  /// The publication Id from repo.
467 
468  /// The writer that owns this container.
470 
471  /// The maximum size a container should allow for
472  /// an instance sample list
474 
476 
477  /// The maximum number of samples from each instance that
478  /// can be added to the resend_data_ for durability.
480 
481  /// The maximum number of instances allowed or zero
482  /// to indicate unlimited.
483  /// It corresponds to the QoS.RESOURCE_LIMITS.max_instances
484  /// when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS
486 
487  /// The maximum number of samples allowed or zero
488  /// to indicate unlimited.
489  /// It corresponds to the QoS.RESOURCE_LIMITS.max_instances
490  /// when QoS.RELIABILITY.kind == DDS::RELIABLE_RELIABILITY_QOS
491  /// It also covers QoS.RESOURCE_LIMITS.max_samples and
492  /// max_instances * max_samples_per_instance
494 
495  /// The maximum time to block on write operation.
496  /// This comes from DataWriter's QoS HISTORY.max_blocking_time
498 
499  /// The block waiting flag.
501 
502  /// This lock is used to protect the container and the map
503  /// in the type-specific DataWriter.
504  /// This lock can be accessible via the datawriter.
505  /// This lock is made to be globally accessible for
506  /// performance concern. The lock is acquired as the external
507  /// call (e.g. FooDataWriterImpl::write) started and the
508  /// same lock will be used by the transport thread to notify
509  /// the datawriter the data is delivered. Other internal
510  /// operations will not lock.
513  ConditionVariableType condition_;
514  ConditionVariableType empty_condition_;
515 
516  /// Lock used for wait_for_acks() processing.
518 
520  /// Used to block in wait_for_acks().
521  WfaConditionVariableType wfa_condition_;
522 
523  /// The number of chunks that sample_list_element_allocator_
524  /// needs initialize.
525  size_t n_chunks_;
526 
527  /// The cached allocator to allocate DataSampleElement
528  /// objects.
530 
531  /// The flag indicates the datawriter will be destroyed.
532  bool shutdown_;
533 
534  /// Domain ID.
536 
537  /// Topic name.
538  char const * const topic_name_;
539 
540  /// Type name.
541  char const * const type_name_;
542 
543 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
544 
545  /// Pointer to the data durability cache.
546  /**
547  * This a pointer to the data durability cache owned by the
548  * Service Participant singleton, which means this cache is also
549  * a singleton.
550  */
552 
553  /// DURABILITY_SERVICE QoS specific to the DataWriter.
555 
556 #endif
557 
558  /// Timer responsible for reporting missed offered deadlines.
560  TimeDuration deadline_period_; // TimeDuration::zero_value means no deadline.
562  DeadlineMapType deadline_map_;
563 
564  /// Lock for synchronization of @c status_ member.
566 
567  /// Reference to the missed requested deadline status structure.
569 
570  /// Last total_count when status was last checked.
572 
573  void set_deadline_period(const TimeDuration& deadline_period);
574  void process_deadlines(const MonotonicTimePoint& now);
575  void extend_deadline(const PublicationInstance_rch& instance);
576  void cancel_deadline(const PublicationInstance_rch& instance);
577 };
578 
579 } /// namespace DCPS
580 } /// namespace OpenDDS
581 
583 
584 #endif /* OPENDDS_DCPS_WRITE_DATA_CONTAINER_H */
DataSampleElementAllocator sample_list_element_allocator_
RcHandle< PublicationInstance > PublicationInstance_rch
ACE_CDR::Long Long
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
GUID_t publication_id_
The publication Id from repo.
Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations..
#define OPENDDS_MULTIMAP(K, T)
SendStateDataSampleList orphaned_to_transport_
ACE_Recursive_Thread_Mutex & deadline_status_lock_
Lock for synchronization of status_ member.
int ssize_t
::DDS::ReturnCode_t dispose(in<%SCOPED%> instance_data, in ::DDS::InstanceHandle_t instance_handle)
#define OPENDDS_STRING
DOMAINID_TYPE_NATIVE DomainId_t
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
ACE_Thread_Mutex wfa_lock_
Lock used for wait_for_acks() processing.
DDS::DurabilityServiceQosPolicy const & durability_service_
DURABILITY_SERVICE QoS specific to the DataWriter.
SendStateDataSampleList sending_data_
List of data that is currently being sent.
typedef OPENDDS_MAP_CMP(GUID_t, WriterCoherentSample, GUID_tKeyLessThan) GroupCoherentSamples
HANDLE_TYPE_NATIVE InstanceHandle_t
Definition: DdsDcpsCore.idl:51
ACE_Recursive_Thread_Mutex lock_
PublicationInstanceMapType instances_
The individual instance queue threads in the data.
bool waiting_on_release_
The block waiting flag.
unsigned long long ACE_UINT64
DDS::OfferedDeadlineMissedStatus & deadline_status_
Reference to the missed requested deadline status structure.
SendStateDataSampleList resend_data_
WfaConditionVariableType wfa_condition_
Used to block in wait_for_acks().
DataWriterImpl * writer_
The writer that owns this container.
CORBA::Long & deadline_last_total_count_
Last total_count when status was last checked.
Sequence number abstraction. Only allows positive 64 bit values.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
SendStateDataSampleList sent_data_
List of data that has already been sent.
DDS::DomainId_t const domain_id_
Domain ID.
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
DataDurabilityCache *const durability_cache_
Pointer to the data durability cache.
char const *const topic_name_
Topic name.
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
A container for instances sample data.
ConditionVariable< ACE_Thread_Mutex > WfaConditionVariableType
SendStateDataSampleList unsent_data_
List of data that has not been sent yet.
ConditionVariableType empty_condition_
char const *const type_name_
Type name.
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
ConditionVariable< ACE_Recursive_Thread_Mutex > ConditionVariableType
sequence< string > StringSeq
Definition: DdsDcpsCore.idl:50
bool shutdown_
The flag indicates the datawriter will be destroyed.
RcHandle< DCPS::PmfSporadicTask< WriteDataContainer > > deadline_task_
Timer responsible for reporting missed offered deadlines.