00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DURABILITY_QUEUE_H 00009 #define OPENDDS_DURABILITY_QUEUE_H 00010 00011 #include <ace/Unbounded_Queue.h> 00012 00013 #include <algorithm> 00014 #include "dds/DCPS/PoolAllocator.h" 00015 00016 namespace OpenDDS { 00017 namespace DCPS { 00018 00019 /** 00020 * @class DurabilityQueue 00021 * 00022 * @brief Queue class that provides a means to reset the 00023 * underlying @c ACE_Allocator. 00024 * 00025 * This class only exists to provide a means to reset the 00026 * allocator used by the @c ACE_Unbounded_Queue base class. It 00027 * has a specific use case, namely to correctly support instances 00028 * created by a persistent allocator. The allocator address may 00029 * change between process runs, meaning the allocator address 00030 * stored in the persistent @c ACE_Unbounded_Queue instance will 00031 * be invalid. Use the @c set_allocator() method to reset the 00032 * allocator address before performing any operations that will 00033 * require use of the allocator (e.g. enqueuing new items). 00034 */ 00035 template<typename T> 00036 class DurabilityQueue : public ACE_Unbounded_Queue<T> { 00037 public: 00038 00039 DurabilityQueue(ACE_Allocator * allocator) 00040 : ACE_Unbounded_Queue<T> (allocator) 00041 {} 00042 00043 DurabilityQueue(DurabilityQueue<T> const & rhs) 00044 : ACE_Unbounded_Queue<T> (rhs.allocator_) 00045 , fs_path_(rhs.fs_path_) 00046 { 00047 // Copied from ACE_Unbounded_Queue<>::copy_nodes(). 00048 for (ACE_Node<T> *curr = rhs.head_->next_; 00049 curr != rhs.head_; 00050 curr = curr->next_) 00051 if (this->enqueue_tail(curr->item_) == -1) 00052 this->delete_nodes(); 00053 } 00054 00055 ~DurabilityQueue() {} 00056 00057 void operator= (DurabilityQueue<T> const & rhs) { 00058 DurabilityQueue tmp(rhs); 00059 this->swap(rhs); 00060 } 00061 00062 /// Reset allocator 00063 void set_allocator(ACE_Allocator * allocator) { 00064 if (allocator == 0) 00065 allocator = ACE_Allocator::instance(); 00066 00067 this->allocator_ = allocator; 00068 } 00069 00070 void swap(DurabilityQueue<T> & rhs) { 00071 std::swap(this->head_, rhs.head_); 00072 std::swap(this->cur_size_, rhs.current_size_); 00073 std::swap(this->allocator_, rhs.allocator_); 00074 std::swap(this->fs_path_, rhs.fs_path_); 00075 } 00076 00077 //filesystem path 00078 typedef OPENDDS_VECTOR(OPENDDS_STRING) fs_path_t; 00079 fs_path_t fs_path_; 00080 }; 00081 00082 } // namespace DCPS 00083 } // namespace OpenDDS 00084 00085 #endif /* OPENDDS_DURABILITY_QUEUE_H */