DataDurabilityCache.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DATA_DURABILITY_CACHE_H
00009 #define OPENDDS_DATA_DURABILITY_CACHE_H
00010 
00011 #ifndef OPENDDS_NO_PERSISTENCE_PROFILE
00012 
00013 #include "dds/DdsDcpsInfrastructureC.h"
00014 
00015 #if !defined (ACE_LACKS_PRAGMA_ONCE)
00016 # pragma once
00017 #endif /* ACE_LACKS_PRAGMA_ONCE */
00018 
00019 #include "dds/DCPS/DurabilityArray.h"
00020 #include "dds/DCPS/DurabilityQueue.h"
00021 #include "dds/DCPS/FileSystemStorage.h"
00022 #include "dds/DCPS/PoolAllocator.h"
00023 
00024 #include "ace/Hash_Map_With_Allocator_T.h"
00025 #include "ace/Array_Base.h"
00026 #include "ace/String_Base.h"
00027 #include "ace/SStringfwd.h"
00028 #include "ace/Thread_Mutex.h"
00029 #include "ace/Null_Mutex.h"
00030 #include "ace/Synch_Traits.h"
00031 #include "ace/Functor_T.h"
00032 #include "PoolAllocator.h"
00033 
00034 #include <memory>
00035 #include <utility>
00036 
00037 ACE_BEGIN_VERSIONED_NAMESPACE_DECL
00038 class ACE_Message_Block;
00039 ACE_END_VERSIONED_NAMESPACE_DECL
00040 
00041 namespace DDS {
00042 
00043 struct DurabilityServiceQosPolicy;
00044 struct LifespanQosPolicy;
00045 
00046 } // namespace DDS
00047 
00048 namespace OpenDDS {
00049 namespace DCPS {
00050 
00051 
00052 class DataWriterImpl;
00053 class DataSampleElement;
00054 class SendStateDataSampleList;
00055 class WriteDataContainer;
00056 
00057 /**
00058  * @class DataDurabilityCache
00059  *
00060  * @brief Underlying data cache for both OpenDDS @c TRANSIENT and
00061  *        @c PERSISTENT @c DURABILITY implementations..
00062  *
00063  * This class implements a cache that outlives @c DataWriters.
00064  */
00065 class DataDurabilityCache {
00066 public:
00067 
00068   /**
00069    * @class key_type
00070    *
00071    * @brief Key type for underlying maps.
00072    *
00073    * Each sample may be uniquely identified by its domain ID,
00074    * topic name and type name.  We use that property to establish
00075    * a map key type.
00076    */
00077   class key_type {
00078   public:
00079 
00080     key_type()
00081       : domain_id_()
00082       , topic_name_()
00083       , type_name_()
00084     {}
00085 
00086     key_type(DDS::DomainId_t domain_id,
00087              char const * topic,
00088              char const * type,
00089              ACE_Allocator * allocator)
00090         : domain_id_(domain_id)
00091         , topic_name_(topic, allocator)
00092         , type_name_(type, allocator) {
00093     }
00094 
00095     key_type(key_type const & rhs)
00096         : domain_id_(rhs.domain_id_)
00097         , topic_name_(rhs.topic_name_)
00098         , type_name_(rhs.type_name_) {
00099     }
00100 
00101     key_type & operator= (key_type const & rhs) {
00102       this->domain_id_ = rhs.domain_id_;
00103       this->topic_name_ = rhs.topic_name_;
00104       this->type_name_ = rhs.type_name_;
00105 
00106       return *this;
00107     }
00108 
00109     bool operator== (key_type const & rhs) const {
00110       return
00111         this->domain_id_ == rhs.domain_id_
00112         && this->topic_name_ == rhs.topic_name_
00113         && this->type_name_ == rhs.type_name_;
00114     }
00115 
00116     bool operator<(key_type const & rhs) const {
00117       return
00118         this->domain_id_ < rhs.domain_id_
00119         && this->topic_name_ < rhs.topic_name_
00120         && this->type_name_ < rhs.type_name_;
00121     }
00122 
00123     u_long hash() const {
00124       return
00125         static_cast<u_long>(this->domain_id_)
00126         + this->topic_name_.hash()
00127         + this->type_name_.hash();
00128     }
00129 
00130   private:
00131 
00132     DDS::DomainId_t domain_id_;
00133     ACE_CString topic_name_;
00134     ACE_CString type_name_;
00135 
00136   };
00137 
00138   /**
00139    * @class sample_data_type
00140    *
00141    * @brief Sample list data type for all samples.
00142    */
00143   class sample_data_type {
00144   public:
00145 
00146     sample_data_type();
00147     sample_data_type(DataSampleElement & element,
00148                      ACE_Allocator * allocator);
00149     sample_data_type(DDS::Time_t timestamp,
00150                      const ACE_Message_Block & mb,
00151                      ACE_Allocator * allocator);
00152     sample_data_type(sample_data_type const & rhs);
00153 
00154     ~sample_data_type();
00155 
00156     sample_data_type & operator= (sample_data_type const & rhs);
00157 
00158     void get_sample(char const *& s,
00159                     size_t & len,
00160                     DDS::Time_t & source_timestamp);
00161 
00162     void set_allocator(ACE_Allocator * allocator);
00163 
00164   private:
00165     void init(const ACE_Message_Block * data);
00166 
00167     size_t length_;
00168     char * sample_;
00169     DDS::Time_t source_timestamp_;
00170     ACE_Allocator * allocator_;
00171 
00172   };
00173 
00174   /**
00175    * @typedef Define an ACE array of ACE queues to simplify
00176    *          access to data corresponding to a specific
00177    *          DurabilityServiceQosPolicy's cleanup delay.
00178    */
00179   typedef DurabilityArray<
00180   DurabilityQueue<sample_data_type> *> sample_list_type;
00181 
00182   typedef ACE_Hash_Map_With_Allocator<key_type,
00183   sample_list_type *> sample_map_type;
00184   typedef OPENDDS_LIST(long) timer_id_list_type;
00185 
00186   /// Constructors.
00187   DataDurabilityCache(DDS::DurabilityQosPolicyKind kind);
00188 
00189   DataDurabilityCache(DDS::DurabilityQosPolicyKind kind,
00190                       ACE_CString & data_dir);
00191 
00192   /// Destructor.
00193   ~DataDurabilityCache();
00194 
00195   /// Insert the samples corresponding to the given topic instance
00196   /// (uniquely identify by its domain, topic name and type name)
00197   /// into the data durability cache.
00198   bool insert(DDS::DomainId_t domain_id,
00199               char const * topic_name,
00200               char const * type_name,
00201               SendStateDataSampleList & the_data,
00202               DDS::DurabilityServiceQosPolicy const & qos);
00203 
00204   /// Write cached data corresponding to given domain, topic and
00205   /// type to @c DataWriter.
00206   bool get_data(DDS::DomainId_t domain_id,
00207                 char const * topic_name,
00208                 char const * type_name,
00209                 DataWriterImpl * data_writer,
00210                 ACE_Allocator * mb_allocator,
00211                 ACE_Allocator * db_allocator,
00212                 DDS::LifespanQosPolicy const & /* lifespan */);
00213 
00214 private:
00215 
00216   // Prevent copying.
00217   DataDurabilityCache(DataDurabilityCache const &);
00218   DataDurabilityCache & operator= (DataDurabilityCache const &);
00219 
00220   void init();
00221 
00222   /// Make allocator suitable to support specified kind of
00223   /// @c DURABILITY.
00224   static std::auto_ptr<ACE_Allocator>
00225   make_allocator(DDS::DurabilityQosPolicyKind kind);
00226 
00227 private:
00228 
00229   /// Allocator used to allocate memory for sample map and lists.
00230   std::auto_ptr<ACE_Allocator> const allocator_;
00231 
00232   DDS::DurabilityQosPolicyKind kind_;
00233 
00234   ACE_CString data_dir_;
00235 
00236   /// Map of all data samples.
00237   sample_map_type * samples_;
00238 
00239   /// Timer ID list.
00240   /**
00241    * Keep track of cleanup timer IDs in case we need to cancel
00242    * before they expire.
00243    */
00244   timer_id_list_type cleanup_timer_ids_;
00245 
00246   /// Lock for synchronized access to the underlying map.
00247   ACE_SYNCH_MUTEX lock_;
00248 
00249   /// Reactor with which cleanup timers will be registered.
00250   ACE_Reactor_Timer_Interface* reactor_;
00251 
00252 };
00253 
00254 } // namespace DCPS
00255 } // namespace OpenDDS
00256 
00257 #endif  /* OPENDDS_NO_PERSISTENCE_PROFILE */
00258 
00259 #endif  /* OPENDDS_DATA_DURABILITY_CACHE_H */

Generated on Fri Feb 12 20:05:19 2016 for OpenDDS by  doxygen 1.4.7