OpenDDS  Snapshot(2023/04/28-20:55)
DurabilityQueue.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_DURABILITYQUEUE_H
9 #define OPENDDS_DCPS_DURABILITYQUEUE_H
10 
11 #include <ace/Unbounded_Queue.h>
12 
13 #include <algorithm>
14 #include "PoolAllocator.h"
15 
17 
18 namespace OpenDDS {
19 namespace DCPS {
20 
21 /**
22  * @class DurabilityQueue
23  *
24  * @brief Queue class that provides a means to reset the
25  * underlying @c ACE_Allocator.
26  *
27  * This class only exists to provide a means to reset the
28  * allocator used by the @c ACE_Unbounded_Queue base class. It
29  * has a specific use case, namely to correctly support instances
30  * created by a persistent allocator. The allocator address may
31  * change between process runs, meaning the allocator address
32  * stored in the persistent @c ACE_Unbounded_Queue instance will
33  * be invalid. Use the @c set_allocator() method to reset the
34  * allocator address before performing any operations that will
35  * require use of the allocator (e.g. enqueuing new items).
36  */
37 template<typename T>
39 public:
40 
42  : ACE_Unbounded_Queue<T> (allocator)
43  {}
44 
47  , fs_path_(rhs.fs_path_)
48  {
49  // Copied from ACE_Unbounded_Queue<>::copy_nodes().
50  for (ACE_Node<T> *curr = rhs.head_->next_;
51  curr != rhs.head_;
52  curr = curr->next_)
53  if (this->enqueue_tail(curr->item_) == -1)
54  this->delete_nodes();
55  }
56 
58 
59  void operator= (DurabilityQueue<T> const & rhs) {
60  DurabilityQueue tmp(rhs);
61  this->swap(rhs);
62  }
63 
64  /// Reset allocator
65  void set_allocator(ACE_Allocator * allocator) {
66  if (allocator == 0)
67  allocator = ACE_Allocator::instance();
68 
69  this->allocator_ = allocator;
70  }
71 
72  void swap(DurabilityQueue<T> & rhs) {
73  std::swap(this->head_, rhs.head_);
74  std::swap(this->cur_size_, rhs.current_size_);
75  std::swap(this->allocator_, rhs.allocator_);
76  std::swap(this->fs_path_, rhs.fs_path_);
77  }
78 
79  //filesystem path
80  typedef OPENDDS_VECTOR(OPENDDS_STRING) fs_path_t;
81  fs_path_t fs_path_;
82 };
83 
84 } // namespace DCPS
85 } // namespace OpenDDS
86 
88 
89 #endif /* OPENDDS_DURABILITY_QUEUE_H */
void swap(MessageBlock &lhs, MessageBlock &rhs)
void swap(DurabilityQueue< T > &rhs)
ACE_Node< T, void > * next_
int enqueue_tail(const T &new_item)
DurabilityQueue(ACE_Allocator *allocator)
#define OPENDDS_STRING
Queue class that provides a means to reset the underlying ACE_Allocator.
DurabilityQueue(DurabilityQueue< T > const &rhs)
void set_allocator(ACE_Allocator *allocator)
Reset allocator.
static ACE_Allocator * instance(void)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void delete_nodes(void)
void operator=(DurabilityQueue< T > const &rhs)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
ACE_Allocator * allocator_
typedef OPENDDS_VECTOR(OPENDDS_STRING) fs_path_t
ACE_Node< T > * head_