00001
00002
00003
00004
00005
00006
00007
00008 #ifndef OPENDDS_DATA_DURABILITY_CACHE_H
00009 #define OPENDDS_DATA_DURABILITY_CACHE_H
00010
00011 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00012
00013 #include "dds/DdsDcpsInfrastructureC.h"
00014
00015 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00016 # pragma once
00017 #endif
00018
00019 #include "dds/DCPS/DurabilityArray.h"
00020 #include "dds/DCPS/DurabilityQueue.h"
00021 #include "dds/DCPS/FileSystemStorage.h"
00022 #include "dds/DCPS/PoolAllocator.h"
00023
00024 #include "ace/Hash_Map_With_Allocator_T.h"
00025 #include "ace/Array_Base.h"
00026 #include "ace/String_Base.h"
00027 #include "ace/SStringfwd.h"
00028 #include "ace/Thread_Mutex.h"
00029 #include "ace/Null_Mutex.h"
00030 #include "ace/Synch_Traits.h"
00031 #include "ace/Functor_T.h"
00032 #include "PoolAllocator.h"
00033
00034 #include <memory>
00035 #include <utility>
00036
00037 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00038 class ACE_Message_Block;
00039 ACE_END_VERSIONED_NAMESPACE_DECL
00040
00041 namespace DDS {
00042
00043 struct DurabilityServiceQosPolicy;
00044 struct LifespanQosPolicy;
00045
00046 }
00047
00048 namespace OpenDDS {
00049 namespace DCPS {
00050
00051
00052 class DataWriterImpl;
00053 class DataSampleElement;
00054 class SendStateDataSampleList;
00055 class WriteDataContainer;
00056
00057
00058
00059
00060
00061
00062
00063
00064
00065 class DataDurabilityCache {
00066 public:
00067
00068
00069
00070
00071
00072
00073
00074
00075
00076
00077 class key_type {
00078 public:
00079
00080 key_type()
00081 : domain_id_()
00082 , topic_name_()
00083 , type_name_()
00084 {}
00085
00086 key_type(DDS::DomainId_t domain_id,
00087 char const * topic,
00088 char const * type,
00089 ACE_Allocator * allocator)
00090 : domain_id_(domain_id)
00091 , topic_name_(topic, allocator)
00092 , type_name_(type, allocator) {
00093 }
00094
00095 key_type(key_type const & rhs)
00096 : domain_id_(rhs.domain_id_)
00097 , topic_name_(rhs.topic_name_)
00098 , type_name_(rhs.type_name_) {
00099 }
00100
00101 key_type & operator= (key_type const & rhs) {
00102 this->domain_id_ = rhs.domain_id_;
00103 this->topic_name_ = rhs.topic_name_;
00104 this->type_name_ = rhs.type_name_;
00105
00106 return *this;
00107 }
00108
00109 bool operator== (key_type const & rhs) const {
00110 return
00111 this->domain_id_ == rhs.domain_id_
00112 && this->topic_name_ == rhs.topic_name_
00113 && this->type_name_ == rhs.type_name_;
00114 }
00115
00116 bool operator<(key_type const & rhs) const {
00117 return
00118 this->domain_id_ < rhs.domain_id_
00119 && this->topic_name_ < rhs.topic_name_
00120 && this->type_name_ < rhs.type_name_;
00121 }
00122
00123 u_long hash() const {
00124 return
00125 static_cast<u_long>(this->domain_id_)
00126 + this->topic_name_.hash()
00127 + this->type_name_.hash();
00128 }
00129
00130 private:
00131
00132 DDS::DomainId_t domain_id_;
00133 ACE_CString topic_name_;
00134 ACE_CString type_name_;
00135
00136 };
00137
00138
00139
00140
00141
00142
00143 class sample_data_type {
00144 public:
00145
00146 sample_data_type();
00147 sample_data_type(DataSampleElement & element,
00148 ACE_Allocator * allocator);
00149 sample_data_type(DDS::Time_t timestamp,
00150 const ACE_Message_Block & mb,
00151 ACE_Allocator * allocator);
00152 sample_data_type(sample_data_type const & rhs);
00153
00154 ~sample_data_type();
00155
00156 sample_data_type & operator= (sample_data_type const & rhs);
00157
00158 void get_sample(char const *& s,
00159 size_t & len,
00160 DDS::Time_t & source_timestamp);
00161
00162 void set_allocator(ACE_Allocator * allocator);
00163
00164 private:
00165 void init(const ACE_Message_Block * data);
00166
00167 size_t length_;
00168 char * sample_;
00169 DDS::Time_t source_timestamp_;
00170 ACE_Allocator * allocator_;
00171
00172 };
00173
00174
00175
00176
00177
00178
00179 typedef DurabilityArray<
00180 DurabilityQueue<sample_data_type> *> sample_list_type;
00181
00182 typedef ACE_Hash_Map_With_Allocator<key_type,
00183 sample_list_type *> sample_map_type;
00184 typedef OPENDDS_LIST(long) timer_id_list_type;
00185
00186
00187 DataDurabilityCache(DDS::DurabilityQosPolicyKind kind);
00188
00189 DataDurabilityCache(DDS::DurabilityQosPolicyKind kind,
00190 ACE_CString & data_dir);
00191
00192
00193 ~DataDurabilityCache();
00194
00195
00196
00197
00198 bool insert(DDS::DomainId_t domain_id,
00199 char const * topic_name,
00200 char const * type_name,
00201 SendStateDataSampleList & the_data,
00202 DDS::DurabilityServiceQosPolicy const & qos);
00203
00204
00205
00206 bool get_data(DDS::DomainId_t domain_id,
00207 char const * topic_name,
00208 char const * type_name,
00209 DataWriterImpl * data_writer,
00210 ACE_Allocator * mb_allocator,
00211 ACE_Allocator * db_allocator,
00212 DDS::LifespanQosPolicy const & );
00213
00214 private:
00215
00216
00217 DataDurabilityCache(DataDurabilityCache const &);
00218 DataDurabilityCache & operator= (DataDurabilityCache const &);
00219
00220 void init();
00221
00222
00223
00224 static std::auto_ptr<ACE_Allocator>
00225 make_allocator(DDS::DurabilityQosPolicyKind kind);
00226
00227 private:
00228
00229
00230 std::auto_ptr<ACE_Allocator> const allocator_;
00231
00232 DDS::DurabilityQosPolicyKind kind_;
00233
00234 ACE_CString data_dir_;
00235
00236
00237 sample_map_type * samples_;
00238
00239
00240
00241
00242
00243
00244 timer_id_list_type cleanup_timer_ids_;
00245
00246
00247 ACE_SYNCH_MUTEX lock_;
00248
00249
00250 ACE_Reactor_Timer_Interface* reactor_;
00251
00252 };
00253
00254 }
00255 }
00256
00257 #endif
00258
00259 #endif