00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DURABILITY_QUEUE_H 00009 #define OPENDDS_DURABILITY_QUEUE_H 00010 00011 #include <ace/Unbounded_Queue.h> 00012 00013 #include <algorithm> 00014 #include "dds/DCPS/PoolAllocator.h" 00015 00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 00017 00018 namespace OpenDDS { 00019 namespace DCPS { 00020 00021 /** 00022 * @class DurabilityQueue 00023 * 00024 * @brief Queue class that provides a means to reset the 00025 * underlying @c ACE_Allocator. 00026 * 00027 * This class only exists to provide a means to reset the 00028 * allocator used by the @c ACE_Unbounded_Queue base class. It 00029 * has a specific use case, namely to correctly support instances 00030 * created by a persistent allocator. The allocator address may 00031 * change between process runs, meaning the allocator address 00032 * stored in the persistent @c ACE_Unbounded_Queue instance will 00033 * be invalid. Use the @c set_allocator() method to reset the 00034 * allocator address before performing any operations that will 00035 * require use of the allocator (e.g. enqueuing new items). 00036 */ 00037 template<typename T> 00038 class DurabilityQueue : public ACE_Unbounded_Queue<T> { 00039 public: 00040 00041 DurabilityQueue(ACE_Allocator * allocator) 00042 : ACE_Unbounded_Queue<T> (allocator) 00043 {} 00044 00045 DurabilityQueue(DurabilityQueue<T> const & rhs) 00046 : ACE_Unbounded_Queue<T> (rhs.allocator_) 00047 , fs_path_(rhs.fs_path_) 00048 { 00049 // Copied from ACE_Unbounded_Queue<>::copy_nodes(). 00050 for (ACE_Node<T> *curr = rhs.head_->next_; 00051 curr != rhs.head_; 00052 curr = curr->next_) 00053 if (this->enqueue_tail(curr->item_) == -1) 00054 this->delete_nodes(); 00055 } 00056 00057 ~DurabilityQueue() {} 00058 00059 void operator= (DurabilityQueue<T> const & rhs) { 00060 DurabilityQueue tmp(rhs); 00061 this->swap(rhs); 00062 } 00063 00064 /// Reset allocator 00065 void set_allocator(ACE_Allocator * allocator) { 00066 if (allocator == 0) 00067 allocator = ACE_Allocator::instance(); 00068 00069 this->allocator_ = allocator; 00070 } 00071 00072 void swap(DurabilityQueue<T> & rhs) { 00073 std::swap(this->head_, rhs.head_); 00074 std::swap(this->cur_size_, rhs.current_size_); 00075 std::swap(this->allocator_, rhs.allocator_); 00076 std::swap(this->fs_path_, rhs.fs_path_); 00077 } 00078 00079 //filesystem path 00080 typedef OPENDDS_VECTOR(OPENDDS_STRING) fs_path_t; 00081 fs_path_t fs_path_; 00082 }; 00083 00084 } // namespace DCPS 00085 } // namespace OpenDDS 00086 00087 OPENDDS_END_VERSIONED_NAMESPACE_DECL 00088 00089 #endif /* OPENDDS_DURABILITY_QUEUE_H */