Line data Source code
1 : /* 2 : * 3 : * 4 : * Distributed under the OpenDDS License. 5 : * See: http://www.opendds.org/license.html 6 : */ 7 : 8 : #ifndef OPENDDS_DCPS_DATADURABILITYCACHE_H 9 : #define OPENDDS_DCPS_DATADURABILITYCACHE_H 10 : 11 : #ifndef OPENDDS_NO_PERSISTENCE_PROFILE 12 : 13 : #include "dds/DdsDcpsInfrastructureC.h" 14 : 15 : #if !defined (ACE_LACKS_PRAGMA_ONCE) 16 : # pragma once 17 : #endif /* ACE_LACKS_PRAGMA_ONCE */ 18 : 19 : #include "DurabilityArray.h" 20 : #include "DurabilityQueue.h" 21 : #include "FileSystemStorage.h" 22 : #include "PoolAllocator.h" 23 : #include "unique_ptr.h" 24 : 25 : 26 : #include "ace/Hash_Map_With_Allocator_T.h" 27 : #include "ace/Array_Base.h" 28 : #include "ace/String_Base.h" 29 : #include "ace/SStringfwd.h" 30 : #include "ace/Thread_Mutex.h" 31 : #include "ace/Null_Mutex.h" 32 : #include "ace/Synch_Traits.h" 33 : #include "ace/Functor_T.h" 34 : #include "PoolAllocator.h" 35 : 36 : #include <memory> 37 : #include <utility> 38 : 39 : ACE_BEGIN_VERSIONED_NAMESPACE_DECL 40 : class ACE_Message_Block; 41 : ACE_END_VERSIONED_NAMESPACE_DECL 42 : 43 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 44 : 45 : namespace DDS { 46 : 47 : struct DurabilityServiceQosPolicy; 48 : struct LifespanQosPolicy; 49 : 50 : } // namespace DDS 51 : 52 : namespace OpenDDS { 53 : namespace DCPS { 54 : 55 : 56 : class DataWriterImpl; 57 : class DataSampleElement; 58 : class SendStateDataSampleList; 59 : class WriteDataContainer; 60 : 61 : /** 62 : * @class DataDurabilityCache 63 : * 64 : * @brief Underlying data cache for both OpenDDS @c TRANSIENT and 65 : * @c PERSISTENT @c DURABILITY implementations.. 66 : * 67 : * This class implements a cache that outlives @c DataWriters. 68 : */ 69 : class DataDurabilityCache { 70 : public: 71 : 72 : /** 73 : * @class key_type 74 : * 75 : * @brief Key type for underlying maps. 76 : * 77 : * Each sample may be uniquely identified by its domain ID, 78 : * topic name and type name. We use that property to establish 79 : * a map key type. 80 : */ 81 : class key_type { 82 : public: 83 : 84 0 : key_type() 85 0 : : domain_id_() 86 0 : , topic_name_() 87 0 : , type_name_() 88 0 : {} 89 : 90 0 : key_type(DDS::DomainId_t domain_id, 91 : char const * topic, 92 : char const * type, 93 : ACE_Allocator * allocator) 94 0 : : domain_id_(domain_id) 95 0 : , topic_name_(topic, allocator) 96 0 : , type_name_(type, allocator) { 97 0 : } 98 : 99 0 : key_type(key_type const & rhs) 100 0 : : domain_id_(rhs.domain_id_) 101 0 : , topic_name_(rhs.topic_name_) 102 0 : , type_name_(rhs.type_name_) { 103 0 : } 104 : 105 : key_type & operator= (key_type const & rhs) { 106 : this->domain_id_ = rhs.domain_id_; 107 : this->topic_name_ = rhs.topic_name_; 108 : this->type_name_ = rhs.type_name_; 109 : 110 : return *this; 111 : } 112 : 113 0 : bool operator== (key_type const & rhs) const { 114 : return 115 0 : this->domain_id_ == rhs.domain_id_ 116 0 : && this->topic_name_ == rhs.topic_name_ 117 0 : && this->type_name_ == rhs.type_name_; 118 : } 119 : 120 : bool operator<(key_type const & rhs) const { 121 : return 122 : this->domain_id_ < rhs.domain_id_ 123 : && this->topic_name_ < rhs.topic_name_ 124 : && this->type_name_ < rhs.type_name_; 125 : } 126 : 127 0 : u_long hash() const { 128 : return 129 0 : static_cast<u_long>(this->domain_id_) 130 0 : + this->topic_name_.hash() 131 0 : + this->type_name_.hash(); 132 : } 133 : 134 : private: 135 : 136 : DDS::DomainId_t domain_id_; 137 : ACE_CString topic_name_; 138 : ACE_CString type_name_; 139 : 140 : }; 141 : 142 : /** 143 : * @class sample_data_type 144 : * 145 : * @brief Sample list data type for all samples. 146 : */ 147 : class sample_data_type { 148 : public: 149 : 150 : sample_data_type(); 151 : sample_data_type(DataSampleElement & element, 152 : ACE_Allocator * allocator); 153 : sample_data_type(DDS::Time_t timestamp, 154 : const ACE_Message_Block & mb, 155 : ACE_Allocator * allocator); 156 : sample_data_type(sample_data_type const & rhs); 157 : 158 : ~sample_data_type(); 159 : 160 : sample_data_type & operator= (sample_data_type const & rhs); 161 : 162 : void get_sample(char const *& s, 163 : size_t & len, 164 : DDS::Time_t & source_timestamp); 165 : 166 : void set_allocator(ACE_Allocator * allocator); 167 : 168 : private: 169 : void init(const ACE_Message_Block * data); 170 : 171 : size_t length_; 172 : char * sample_; 173 : DDS::Time_t source_timestamp_; 174 : ACE_Allocator * allocator_; 175 : 176 : }; 177 : 178 : /** 179 : * @typedef Define an ACE array of ACE queues to simplify 180 : * access to data corresponding to a specific 181 : * DurabilityServiceQosPolicy's cleanup delay. 182 : */ 183 : typedef DurabilityArray< 184 : DurabilityQueue<sample_data_type> *> sample_list_type; 185 : 186 : typedef ACE_Hash_Map_With_Allocator<key_type, 187 : sample_list_type *> sample_map_type; 188 : typedef OPENDDS_LIST(long) timer_id_list_type; 189 : 190 : DataDurabilityCache(DDS::DurabilityQosPolicyKind kind); 191 : 192 : DataDurabilityCache(DDS::DurabilityQosPolicyKind kind, 193 : ACE_CString & data_dir); 194 : 195 : ~DataDurabilityCache(); 196 : 197 : /// Insert the samples corresponding to the given topic instance 198 : /// (uniquely identify by its domain, topic name and type name) 199 : /// into the data durability cache. 200 : bool insert(DDS::DomainId_t domain_id, 201 : char const * topic_name, 202 : char const * type_name, 203 : SendStateDataSampleList & the_data, 204 : DDS::DurabilityServiceQosPolicy const & qos); 205 : 206 : /// Write cached data corresponding to given domain, topic and 207 : /// type to @c DataWriter. 208 : bool get_data(DDS::DomainId_t domain_id, 209 : char const * topic_name, 210 : char const * type_name, 211 : DataWriterImpl * data_writer, 212 : ACE_Allocator * mb_allocator, 213 : ACE_Allocator * db_allocator, 214 : DDS::LifespanQosPolicy const & /* lifespan */); 215 : 216 : private: 217 : 218 : // Prevent copying. 219 : DataDurabilityCache(DataDurabilityCache const &); 220 : DataDurabilityCache & operator= (DataDurabilityCache const &); 221 : 222 : void init(); 223 : 224 : private: 225 : 226 : /// Allocator used to allocate memory for sample map and lists. 227 : unique_ptr<ACE_Allocator> const allocator_; 228 : 229 : DDS::DurabilityQosPolicyKind kind_; 230 : 231 : ACE_CString data_dir_; 232 : 233 : /// Map of all data samples. 234 : sample_map_type * samples_; 235 : 236 : /// Timer ID list. 237 : /** 238 : * Keep track of cleanup timer IDs in case we need to cancel 239 : * before they expire. 240 : */ 241 : timer_id_list_type cleanup_timer_ids_; 242 : 243 : /// Lock for synchronized access to the underlying map. 244 : ACE_SYNCH_MUTEX lock_; 245 : 246 : /// Reactor with which cleanup timers will be registered. 247 : ACE_Reactor_Timer_Interface* reactor_; 248 : 249 : }; 250 : 251 : } // namespace DCPS 252 : } // namespace OpenDDS 253 : 254 : OPENDDS_END_VERSIONED_NAMESPACE_DECL 255 : 256 : #endif /* OPENDDS_NO_PERSISTENCE_PROFILE */ 257 : 258 : #endif /* OPENDDS_DATA_DURABILITY_CACHE_H */