OpenDDS  Snapshot(2023/04/28-20:55)
DataDurabilityCache.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_DATADURABILITYCACHE_H
9 #define OPENDDS_DCPS_DATADURABILITYCACHE_H
10 
11 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
12 
13 #include "dds/DdsDcpsInfrastructureC.h"
14 
15 #if !defined (ACE_LACKS_PRAGMA_ONCE)
16 # pragma once
17 #endif /* ACE_LACKS_PRAGMA_ONCE */
18 
19 #include "DurabilityArray.h"
20 #include "DurabilityQueue.h"
21 #include "FileSystemStorage.h"
22 #include "PoolAllocator.h"
23 #include "unique_ptr.h"
24 
25 
27 #include "ace/Array_Base.h"
28 #include "ace/String_Base.h"
29 #include "ace/SStringfwd.h"
30 #include "ace/Thread_Mutex.h"
31 #include "ace/Null_Mutex.h"
32 #include "ace/Synch_Traits.h"
33 #include "ace/Functor_T.h"
34 #include "PoolAllocator.h"
35 
36 #include <memory>
37 #include <utility>
38 
40 class ACE_Message_Block;
42 
44 
45 namespace DDS {
46 
48 struct LifespanQosPolicy;
49 
50 } // namespace DDS
51 
52 namespace OpenDDS {
53 namespace DCPS {
54 
55 
56 class DataWriterImpl;
57 class DataSampleElement;
59 class WriteDataContainer;
60 
61 /**
62  * @class DataDurabilityCache
63  *
64  * @brief Underlying data cache for both OpenDDS @c TRANSIENT and
65  * @c PERSISTENT @c DURABILITY implementations.
66  *
67  * This class implements a cache that outlives @c DataWriters.
68  */
70 public:
71 
72  /**
73  * @class key_type
74  *
75  * @brief Key type for underlying maps.
76  *
77  * Each sample may be uniquely identified by its domain ID,
78  * topic name and type name. We use that property to establish
79  * a map key type.
80  */
81  class key_type {
82  public:
83 
85  : domain_id_()
86  , topic_name_()
87  , type_name_()
88  {}
89 
91  char const * topic,
92  char const * type,
93  ACE_Allocator * allocator)
94  : domain_id_(domain_id)
95  , topic_name_(topic, allocator)
96  , type_name_(type, allocator) {
97  }
98 
99  /// Copy constructor: duplicates the domain ID, topic name and type name of @a rhs.
100  key_type(key_type const & rhs)
101  : domain_id_(rhs.domain_id_), topic_name_(rhs.topic_name_)
102  , type_name_(rhs.type_name_)
103  {}
104 
105  /// Member-wise copy assignment; returns @c *this so assignments can be chained.
106  key_type & operator= (key_type const & rhs) {
107  domain_id_ = rhs.domain_id_;
108  topic_name_ = rhs.topic_name_;
109  type_name_ = rhs.type_name_;
110  return *this;
111  }
112 
113  /// Keys are equal only when domain ID, topic name and type name all match.
114  bool operator== (key_type const & rhs) const {
115  if (!(this->domain_id_ == rhs.domain_id_)) return false;
116  if (!(this->topic_name_ == rhs.topic_name_)) return false;
117  return this->type_name_ == rhs.type_name_;
118  }
119 
120  bool operator<(key_type const & rhs) const {
121  // Lexicographic on (domain ID, topic, type); and-ing three "<" tests is not a strict weak ordering.
122  if (!(this->domain_id_ == rhs.domain_id_)) return this->domain_id_ < rhs.domain_id_;
123  if (!(this->topic_name_ == rhs.topic_name_)) return this->topic_name_ < rhs.topic_name_;
124  return this->type_name_ < rhs.type_name_;
125  }
126 
127  /// Hash value: the domain ID plus the hashes of the topic and type names.
128  u_long hash() const {
129  u_long const domain_part = static_cast<u_long>(this->domain_id_);
130  return domain_part
131  + this->topic_name_.hash() + this->type_name_.hash();
132  }
133 
134  private:
135 
139 
140  };
141 
142  /**
143  * @class sample_data_type
144  *
145  * @brief Sample list data type for all samples.
146  */
148  public:
149 
152  ACE_Allocator * allocator);
154  const ACE_Message_Block & mb,
155  ACE_Allocator * allocator);
156  sample_data_type(sample_data_type const & rhs);
157 
158  ~sample_data_type();
159 
160  sample_data_type & operator= (sample_data_type const & rhs);
161 
162  void get_sample(char const *& s,
163  size_t & len,
164  DDS::Time_t & source_timestamp);
165 
166  void set_allocator(ACE_Allocator * allocator);
167 
168  private:
169  void init(const ACE_Message_Block * data);
170 
171  size_t length_;
172  char * sample_;
175 
176  };
177 
178  /**
179  * @typedef Define an ACE array of ACE queues to simplify
180  * access to data corresponding to a specific
181  * DurabilityServiceQosPolicy's cleanup delay.
182  */
183  typedef DurabilityArray<
185 
187  sample_list_type *> sample_map_type;
188  typedef OPENDDS_LIST(long) timer_id_list_type;
189 
191 
193  ACE_CString & data_dir);
194 
196 
197  /// Insert the samples corresponding to the given topic instance
198  /// (uniquely identified by its domain, topic name and type name)
199  /// into the data durability cache.
200  bool insert(DDS::DomainId_t domain_id,
201  char const * topic_name,
202  char const * type_name,
203  SendStateDataSampleList & the_data,
204  DDS::DurabilityServiceQosPolicy const & qos);
205 
206  /// Write cached data corresponding to given domain, topic and
207  /// type to @c DataWriter.
208  bool get_data(DDS::DomainId_t domain_id,
209  char const * topic_name,
210  char const * type_name,
211  DataWriterImpl * data_writer,
212  ACE_Allocator * mb_allocator,
213  ACE_Allocator * db_allocator,
214  DDS::LifespanQosPolicy const & /* lifespan */);
215 
216 private:
217 
218  // Prevent copying.
220  DataDurabilityCache & operator= (DataDurabilityCache const &);
221 
222  void init();
223 
224 private:
225 
226  /// Allocator used to allocate memory for sample map and lists.
228 
230 
232 
233  /// Map of all data samples.
234  sample_map_type * samples_;
235 
236  /// Timer ID list.
237  /**
238  * Keep track of cleanup timer IDs in case we need to cancel
239  * before they expire.
240  */
241  timer_id_list_type cleanup_timer_ids_;
242 
243  /// Lock for synchronized access to the underlying map.
245 
246  /// Reactor with which cleanup timers will be registered.
248 
249 };
250 
251 } // namespace DCPS
252 } // namespace OpenDDS
253 
255 
256 #endif /* OPENDDS_NO_PERSISTENCE_PROFILE */
257 
258 #endif /* OPENDDS_DCPS_DATADURABILITYCACHE_H */
#define ACE_BEGIN_VERSIONED_NAMESPACE_DECL
DurabilityArray< DurabilityQueue< sample_data_type > * > sample_list_type
#define ACE_SYNCH_MUTEX
#define OPENDDS_LIST(T)
DDS::DurabilityQosPolicyKind kind_
ACE_SYNCH_MUTEX lock_
Lock for synchronized access to the underlying map.
Underlying data cache for both OpenDDS TRANSIENT and PERSISTENT DURABILITY implementations.
unique_ptr< ACE_Allocator > const allocator_
Allocator used to allocate memory for sample map and lists.
bool operator==(const DisjointSequence::OrderedRanges< T > &a, const DisjointSequence::OrderedRanges< T > &b)
Queue class that provides a means to reset the underlying ACE_Allocator.
DOMAINID_TYPE_NATIVE DomainId_t
Implements the OpenDDS::DCPS::DataWriterRemote interfaces and DDS::DataWriter interfaces.
sample_map_type * samples_
Map of all data samples.
Array class that provides a means to reset the underlying ACE_Allocator.
#define ACE_END_VERSIONED_NAMESPACE_DECL
ACE_Hash_Map_With_Allocator< key_type, sample_list_type * > sample_map_type
int init(void)
DurabilityQosPolicyKind
The End User API.
timer_id_list_type cleanup_timer_ids_
Timer ID list.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
ACE_Reactor_Timer_Interface * reactor_
Reactor with which cleanup timers will be registered.
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
A container for instances sample data.
key_type(DDS::DomainId_t domain_id, char const *topic, char const *type, ACE_Allocator *allocator)
ACE_TCHAR * timestamp(const ACE_Time_Value &time_value, ACE_TCHAR date_and_time[], size_t time_len, bool return_pointer_to_first_digit=false)