DataDurabilityCache.h
Go to the documentation of this file.
00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DATA_DURABILITY_CACHE_H
00009 #define OPENDDS_DATA_DURABILITY_CACHE_H
00010
00011 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00012
00013 #include "dds/DdsDcpsInfrastructureC.h"
00014
00015 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00016 # pragma once
00017 #endif
00018
00019 #include "dds/DCPS/DurabilityArray.h"
00020 #include "dds/DCPS/DurabilityQueue.h"
00021 #include "dds/DCPS/FileSystemStorage.h"
00022 #include "dds/DCPS/PoolAllocator.h"
00023 #include "dds/DCPS/unique_ptr.h"
00024
00025
00026 #include "ace/Hash_Map_With_Allocator_T.h"
00027 #include "ace/Array_Base.h"
00028 #include "ace/String_Base.h"
00029 #include "ace/SStringfwd.h"
00030 #include "ace/Thread_Mutex.h"
00031 #include "ace/Null_Mutex.h"
00032 #include "ace/Synch_Traits.h"
00033 #include "ace/Functor_T.h"
00034 #include "PoolAllocator.h"
00035
00036 #include <memory>
00037 #include <utility>
00038
00039 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00040 class ACE_Message_Block;
00041 ACE_END_VERSIONED_NAMESPACE_DECL
00042
00043 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00044
// Forward declarations of the DDS QoS policy structs that the cache API
// below accepts by const reference.
namespace DDS {

struct DurabilityServiceQosPolicy;
struct LifespanQosPolicy;

} // namespace DDS
00051
00052 namespace OpenDDS {
00053 namespace DCPS {
00054
00055
// Forward declarations of OpenDDS::DCPS types. This header only names them
// through pointers and references (or not at all), so their full definitions
// are not required here.
class DataWriterImpl;
class DataSampleElement;
class SendStateDataSampleList;
class WriteDataContainer;
00060
00061
00062
00063
00064
00065
00066
00067
00068
00069 class DataDurabilityCache {
00070 public:
00071
00072
00073
00074
00075
00076
00077
00078
00079
00080
00081 class key_type {
00082 public:
00083
00084 key_type()
00085 : domain_id_()
00086 , topic_name_()
00087 , type_name_()
00088 {}
00089
00090 key_type(DDS::DomainId_t domain_id,
00091 char const * topic,
00092 char const * type,
00093 ACE_Allocator * allocator)
00094 : domain_id_(domain_id)
00095 , topic_name_(topic, allocator)
00096 , type_name_(type, allocator) {
00097 }
00098
00099 key_type(key_type const & rhs)
00100 : domain_id_(rhs.domain_id_)
00101 , topic_name_(rhs.topic_name_)
00102 , type_name_(rhs.type_name_) {
00103 }
00104
00105 key_type & operator= (key_type const & rhs) {
00106 this->domain_id_ = rhs.domain_id_;
00107 this->topic_name_ = rhs.topic_name_;
00108 this->type_name_ = rhs.type_name_;
00109
00110 return *this;
00111 }
00112
00113 bool operator== (key_type const & rhs) const {
00114 return
00115 this->domain_id_ == rhs.domain_id_
00116 && this->topic_name_ == rhs.topic_name_
00117 && this->type_name_ == rhs.type_name_;
00118 }
00119
00120 bool operator<(key_type const & rhs) const {
00121 return
00122 this->domain_id_ < rhs.domain_id_
00123 && this->topic_name_ < rhs.topic_name_
00124 && this->type_name_ < rhs.type_name_;
00125 }
00126
00127 u_long hash() const {
00128 return
00129 static_cast<u_long>(this->domain_id_)
00130 + this->topic_name_.hash()
00131 + this->type_name_.hash();
00132 }
00133
00134 private:
00135
00136 DDS::DomainId_t domain_id_;
00137 ACE_CString topic_name_;
00138 ACE_CString type_name_;
00139
00140 };
00141
00142
00143
00144
00145
00146
00147 class sample_data_type {
00148 public:
00149
00150 sample_data_type();
00151 sample_data_type(DataSampleElement & element,
00152 ACE_Allocator * allocator);
00153 sample_data_type(DDS::Time_t timestamp,
00154 const ACE_Message_Block & mb,
00155 ACE_Allocator * allocator);
00156 sample_data_type(sample_data_type const & rhs);
00157
00158 ~sample_data_type();
00159
00160 sample_data_type & operator= (sample_data_type const & rhs);
00161
00162 void get_sample(char const *& s,
00163 size_t & len,
00164 DDS::Time_t & source_timestamp);
00165
00166 void set_allocator(ACE_Allocator * allocator);
00167
00168 private:
00169 void init(const ACE_Message_Block * data);
00170
00171 size_t length_;
00172 char * sample_;
00173 DDS::Time_t source_timestamp_;
00174 ACE_Allocator * allocator_;
00175
00176 };
00177
00178
00179
00180
00181
00182
00183 typedef DurabilityArray<
00184 DurabilityQueue<sample_data_type> *> sample_list_type;
00185
00186 typedef ACE_Hash_Map_With_Allocator<key_type,
00187 sample_list_type *> sample_map_type;
00188 typedef OPENDDS_LIST(long) timer_id_list_type;
00189
00190 DataDurabilityCache(DDS::DurabilityQosPolicyKind kind);
00191
00192 DataDurabilityCache(DDS::DurabilityQosPolicyKind kind,
00193 ACE_CString & data_dir);
00194
00195 ~DataDurabilityCache();
00196
00197
00198
00199
00200 bool insert(DDS::DomainId_t domain_id,
00201 char const * topic_name,
00202 char const * type_name,
00203 SendStateDataSampleList & the_data,
00204 DDS::DurabilityServiceQosPolicy const & qos);
00205
00206
00207
00208 bool get_data(DDS::DomainId_t domain_id,
00209 char const * topic_name,
00210 char const * type_name,
00211 DataWriterImpl * data_writer,
00212 ACE_Allocator * mb_allocator,
00213 ACE_Allocator * db_allocator,
00214 DDS::LifespanQosPolicy const & );
00215
00216 private:
00217
00218
00219 DataDurabilityCache(DataDurabilityCache const &);
00220 DataDurabilityCache & operator= (DataDurabilityCache const &);
00221
00222 void init();
00223
00224 private:
00225
00226
00227 unique_ptr<ACE_Allocator> const allocator_;
00228
00229 DDS::DurabilityQosPolicyKind kind_;
00230
00231 ACE_CString data_dir_;
00232
00233
00234 sample_map_type * samples_;
00235
00236
00237
00238
00239
00240
00241 timer_id_list_type cleanup_timer_ids_;
00242
00243
00244 ACE_SYNCH_MUTEX lock_;
00245
00246
00247 ACE_Reactor_Timer_Interface* reactor_;
00248
00249 };
00250
00251 }
00252 }
00253
00254 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00255
00256 #endif
00257
00258 #endif