DataDurabilityCache.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DATA_DURABILITY_CACHE_H
00009 #define OPENDDS_DATA_DURABILITY_CACHE_H
00010 
00011 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00012 
00013 #include "dds/DdsDcpsInfrastructureC.h"
00014 
00015 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00016 # pragma once
00017 #endif /* ACE_LACKS_PRAGMA_ONCE */
00018 
00019 #include "dds/DCPS/DurabilityArray.h"
00020 #include "dds/DCPS/DurabilityQueue.h"
00021 #include "dds/DCPS/FileSystemStorage.h"
00022 #include "dds/DCPS/PoolAllocator.h"
00023 #include "dds/DCPS/unique_ptr.h"
00024 
00025 
00026 #include "ace/Hash_Map_With_Allocator_T.h"
00027 #include "ace/Array_Base.h"
00028 #include "ace/String_Base.h"
00029 #include "ace/SStringfwd.h"
00030 #include "ace/Thread_Mutex.h"
00031 #include "ace/Null_Mutex.h"
00032 #include "ace/Synch_Traits.h"
00033 #include "ace/Functor_T.h"
00034 #include "PoolAllocator.h"
00035 
00036 #include <memory>
00037 #include <utility>
00038 
00039 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00040 class ACE_Message_Block;
00041 ACE_END_VERSIONED_NAMESPACE_DECL
00042 
00043 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00044 
00045 namespace DDS {
00046 
00047 struct DurabilityServiceQosPolicy;
00048 struct LifespanQosPolicy;
00049 
00050 } // namespace DDS
00051 
00052 namespace OpenDDS {
00053 namespace DCPS {
00054 
00055 
00056 class DataWriterImpl;
00057 class DataSampleElement;
00058 class SendStateDataSampleList;
00059 class WriteDataContainer;
00060 
00061 /**
00062  * @class DataDurabilityCache
00063  *
00064  * @brief Underlying data cache for both OpenDDS @c TRANSIENT and
00065  *        @c PERSISTENT @c DURABILITY implementations.
00066  *
00067  * This class implements a cache that outlives @c DataWriters.
00068  */
00069 class DataDurabilityCache {
00070 public:
00071 
00072   /**
00073    * @class key_type
00074    *
00075    * @brief Key type for underlying maps.
00076    *
00077    * Each sample may be uniquely identified by its domain ID,
00078    * topic name and type name.  We use that property to establish
00079    * a map key type.
00080    */
00081   class key_type {
00082   public:
00083 
00084     key_type()
00085       : domain_id_()
00086       , topic_name_()
00087       , type_name_()
00088     {}
00089 
00090     key_type(DDS::DomainId_t domain_id,
00091              char const * topic,
00092              char const * type,
00093              ACE_Allocator * allocator)
00094         : domain_id_(domain_id)
00095         , topic_name_(topic, allocator)
00096         , type_name_(type, allocator) {
00097     }
00098 
00099     key_type(key_type const & rhs)
00100         : domain_id_(rhs.domain_id_)
00101         , topic_name_(rhs.topic_name_)
00102         , type_name_(rhs.type_name_) {
00103     }
00104 
00105     key_type & operator= (key_type const & rhs) {
00106       this->domain_id_ = rhs.domain_id_;
00107       this->topic_name_ = rhs.topic_name_;
00108       this->type_name_ = rhs.type_name_;
00109 
00110       return *this;
00111     }
00112 
00113     bool operator== (key_type const & rhs) const {
00114       return
00115         this->domain_id_ == rhs.domain_id_
00116         && this->topic_name_ == rhs.topic_name_
00117         && this->type_name_ == rhs.type_name_;
00118     }
00119 
00120     bool operator<(key_type const & rhs) const {
00121       return
00122         this->domain_id_ < rhs.domain_id_
00123         && this->topic_name_ < rhs.topic_name_
00124         && this->type_name_ < rhs.type_name_;
00125     }
00126 
00127     u_long hash() const {
00128       return
00129         static_cast<u_long>(this->domain_id_)
00130         + this->topic_name_.hash()
00131         + this->type_name_.hash();
00132     }
00133 
00134   private:
00135 
00136     DDS::DomainId_t domain_id_;
00137     ACE_CString topic_name_;
00138     ACE_CString type_name_;
00139 
00140   };
00141 
00142   /**
00143    * @class sample_data_type
00144    *
00145    * @brief Sample list data type for all samples.
00146    */
00147   class sample_data_type {
00148   public:
00149 
00150     sample_data_type();
00151     sample_data_type(DataSampleElement & element,
00152                      ACE_Allocator * allocator);
00153     sample_data_type(DDS::Time_t timestamp,
00154                      const ACE_Message_Block & mb,
00155                      ACE_Allocator * allocator);
00156     sample_data_type(sample_data_type const & rhs);
00157 
00158     ~sample_data_type();
00159 
00160     sample_data_type & operator= (sample_data_type const & rhs);
00161 
00162     void get_sample(char const *& s,
00163                     size_t & len,
00164                     DDS::Time_t & source_timestamp);
00165 
00166     void set_allocator(ACE_Allocator * allocator);
00167 
00168   private:
00169     void init(const ACE_Message_Block * data);
00170 
00171     size_t length_;
00172     char * sample_;
00173     DDS::Time_t source_timestamp_;
00174     ACE_Allocator * allocator_;
00175 
00176   };
00177 
00178   /**
00179    * @typedef Define an ACE array of ACE queues to simplify
00180    *          access to data corresponding to a specific
00181    *          DurabilityServiceQosPolicy's cleanup delay.
00182    */
00183   typedef DurabilityArray<
00184   DurabilityQueue<sample_data_type> *> sample_list_type;
00185 
00186   typedef ACE_Hash_Map_With_Allocator<key_type,
00187   sample_list_type *> sample_map_type;
00188   typedef OPENDDS_LIST(long) timer_id_list_type;
00189 
00190   DataDurabilityCache(DDS::DurabilityQosPolicyKind kind);
00191 
00192   DataDurabilityCache(DDS::DurabilityQosPolicyKind kind,
00193                       ACE_CString & data_dir);
00194 
00195   ~DataDurabilityCache();
00196 
00197   /// Insert the samples corresponding to the given topic instance
00198   /// (uniquely identify by its domain, topic name and type name)
00199   /// into the data durability cache.
00200   bool insert(DDS::DomainId_t domain_id,
00201               char const * topic_name,
00202               char const * type_name,
00203               SendStateDataSampleList & the_data,
00204               DDS::DurabilityServiceQosPolicy const & qos);
00205 
00206   /// Write cached data corresponding to given domain, topic and
00207   /// type to @c DataWriter.
00208   bool get_data(DDS::DomainId_t domain_id,
00209                 char const * topic_name,
00210                 char const * type_name,
00211                 DataWriterImpl * data_writer,
00212                 ACE_Allocator * mb_allocator,
00213                 ACE_Allocator * db_allocator,
00214                 DDS::LifespanQosPolicy const & /* lifespan */);
00215 
00216 private:
00217 
00218   // Prevent copying.
00219   DataDurabilityCache(DataDurabilityCache const &);
00220   DataDurabilityCache & operator= (DataDurabilityCache const &);
00221 
00222   void init();
00223 
00224 private:
00225 
00226   /// Allocator used to allocate memory for sample map and lists.
00227   unique_ptr<ACE_Allocator> const allocator_;
00228 
00229   DDS::DurabilityQosPolicyKind kind_;
00230 
00231   ACE_CString data_dir_;
00232 
00233   /// Map of all data samples.
00234   sample_map_type * samples_;
00235 
00236   /// Timer ID list.
00237   /**
00238    * Keep track of cleanup timer IDs in case we need to cancel
00239    * before they expire.
00240    */
00241   timer_id_list_type cleanup_timer_ids_;
00242 
00243   /// Lock for synchronized access to the underlying map.
00244   ACE_SYNCH_MUTEX lock_;
00245 
00246   /// Reactor with which cleanup timers will be registered.
00247   ACE_Reactor_Timer_Interface* reactor_;
00248 
00249 };
00250 
00251 } // namespace DCPS
00252 } // namespace OpenDDS
00253 
00254 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00255 
00256 #endif  /* OPENDDS_NO_PERSISTENCE_PROFILE */
00257 
00258 #endif  /* OPENDDS_DATA_DURABILITY_CACHE_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1