OpenDDS  Snapshot(2023/04/28-20:55)
TransportSendBuffer.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTSENDBUFFER_H
9 #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTSENDBUFFER_H
10 
11 #include "dds/DCPS/dcps_export.h"
12 
15 #include "TransportSendStrategy.h"
16 
17 #include "dds/DCPS/Definitions.h"
18 
19 #include "dds/DCPS/PoolAllocator.h"
20 #include "ace/Synch_Traits.h"
21 #include <utility>
22 
24 class ACE_Message_Block;
26 
28 
29 namespace OpenDDS {
30 namespace DCPS {
31 
32 class DisjointSequence;
33 
34 /// Abstract base class that forms the interface for TransportSendStrategy
35 /// to store data for potential retransmission. Derived classes actually
36 /// store the data and can utilize TransportSendBuffer's friendship in
37 /// TransportSendStrategy to retransmit (see method "resend_one").
39 public:
40  size_t capacity() const;
41  void bind(TransportSendStrategy* strategy);
42 
43  virtual void retain_all(const GUID_t& pub_id);
44  virtual void insert(SequenceNumber sequence,
46  ACE_Message_Block* chain) = 0;
47 
49  LockType& strategy_lock() { return this->strategy_->lock_; }
50 
51 protected:
52  explicit TransportSendBuffer(size_t capacity)
53  : strategy_(0), capacity_(capacity) {}
54  virtual ~TransportSendBuffer();
55 
57  typedef std::pair<QueueType*, ACE_Message_Block*> BufferType;
58 
59  void resend_one(const BufferType& buffer);
60 
62  const size_t capacity_;
63 
64 private:
65  TransportSendBuffer(const TransportSendBuffer&); // unimplemented
66  TransportSendBuffer& operator=(const TransportSendBuffer&); // unimplemented
67 };
68 
69 /// Implementation of TransportSendBuffer that manages data for a single
70 /// domain of SequenceNumbers -- for a given SingleSendBuffer object, the
71 /// sequence numbers passed to insert() must be generated from the same place.
73  : public TransportSendBuffer, public virtual RcObject {
74 public:
75 
76  static const size_t UNLIMITED;
77 
78  void release_all();
79  typedef OPENDDS_VECTOR(BufferType) BufferVec;
80  typedef OPENDDS_MAP(SequenceNumber, BufferType) BufferMap;
81  void release_acked(SequenceNumber seq);
82  void remove_acked(SequenceNumber seq, BufferVec& removed);
83  size_t n_chunks() const;
84 
85  SingleSendBuffer(size_t capacity, size_t max_samples_per_packet);
87 
88  bool resend(const SequenceRange& range, DisjointSequence* gaps = 0);
89 
90  void retain_all(const GUID_t& pub_id);
91  void insert(SequenceNumber sequence,
93  ACE_Message_Block* chain);
94  void insert_fragment(SequenceNumber sequence,
95  SequenceNumber fragment,
96  bool is_last_fragment,
98  ACE_Message_Block* chain);
99 
100  void pre_insert(SequenceNumber sequence);
101 
102  class Proxy {
103  public:
105  : ssb_(ssb)
106  {
107  ssb_.mutex_.acquire();
108  }
109 
111  {
112  ssb_.mutex_.release();
113  }
114 
116  {
117  if (ssb_.buffers_.empty()) throw std::exception();
118  return ssb_.buffers_.begin()->first;
119  }
120 
122  {
123  if (ssb_.buffers_.empty()) throw std::exception();
124  return ssb_.buffers_.rbegin()->first;
125  }
126 
127  bool empty() const
128  {
129  return ssb_.buffers_.empty();
130  }
131 
132  bool contains(SequenceNumber seq) const
133  {
134  return ssb_.buffers_.count(seq);
135  }
136 
137  bool contains(SequenceNumber seq, GUID_t& destination) const
138  {
139  if (ssb_.buffers_.count(seq)) {
140  DestinationMap::const_iterator pos = ssb_.destinations_.find(seq);
141  destination = pos == ssb_.destinations_.end() ? GUID_UNKNOWN : pos->second;
142  return true;
143  }
144  return false;
145  }
146 
148  {
149  if (ssb_.pre_seq_.empty()) throw std::exception();
150  return *ssb_.pre_seq_.begin();
151  }
152 
154  {
155  if (ssb_.pre_seq_.empty()) throw std::exception();
156  return *ssb_.pre_seq_.rbegin();
157  }
158 
159  bool pre_empty() const
160  {
161  return ssb_.pre_seq_.empty();
162  }
163 
164  bool pre_contains(SequenceNumber sequence) const
165  {
166  return ssb_.pre_seq_.count(sequence);
167  }
168 
169  // caller must already have the send strategy lock
170  bool resend_i(const SequenceRange& range, DisjointSequence* gaps = 0)
171  {
172  return ssb_.resend_i(range, gaps);
173  }
174 
175  bool resend_i(const SequenceRange& range, DisjointSequence* gaps,
176  const GUID_t& destination)
177  {
178  return ssb_.resend_i(range, gaps, destination);
179  }
180 
182  const DisjointSequence& fragments,
183  size_t& cumulative_send_count)
184  {
185  ssb_.resend_fragments_i(sequence, fragments, cumulative_send_count);
186  }
187 
188  bool has_frags(const SequenceNumber& seq) const
189  {
190  return ssb_.has_frags(seq);
191  }
192 
193 
194  private:
197  };
198 
199  void pre_clear()
200  {
201  pre_seq_.clear();
202  }
203 
204  bool has_frags(const SequenceNumber& seq) const;
205 
206 private:
207  void check_capacity_i(BufferVec& removed);
208  void release_i(BufferMap::iterator buffer_iter);
209  void remove_i(BufferMap::iterator buffer_iter, BufferVec& removed);
210 
211  RemoveResult retain_buffer(const GUID_t& pub_id, BufferType& buffer);
212  void insert_buffer(BufferType& buffer,
214  ACE_Message_Block* chain);
215 
216  // caller must already have the send strategy lock
217  bool resend_i(const SequenceRange& range, DisjointSequence* gaps = 0);
218  bool resend_i(const SequenceRange& range, DisjointSequence* gaps,
219  const GUID_t& destination);
220  void resend_fragments_i(SequenceNumber sequence,
221  const DisjointSequence& fragments,
222  size_t& cumulative_send_count);
223 
224  size_t n_chunks_;
225 
230 
231  BufferMap buffers_;
232 
233  typedef OPENDDS_MAP(SequenceNumber, BufferMap) FragmentMap;
234  FragmentMap fragments_;
235 
236  typedef OPENDDS_MAP(SequenceNumber, GUID_t) DestinationMap;
237  DestinationMap destinations_;
238 
239  typedef OPENDDS_SET(SequenceNumber) SequenceNumberSet;
240  SequenceNumberSet pre_seq_;
241 
243 
245 };
246 
247 } // namespace DCPS
248 } // namespace OpenDDS
249 
251 
252 #ifdef __ACE_INLINE__
253 # include "TransportSendBuffer.inl"
254 #endif /* __ACE_INLINE__ */
255 
256 #endif /* DCPS_TRANSPORTSENDBUFFER_H */
void resend_fragments_i(SequenceNumber sequence, const DisjointSequence &fragments, size_t &cumulative_send_count)
#define ACE_BEGIN_VERSIONED_NAMESPACE_DECL
bool resend_i(const SequenceRange &range, DisjointSequence *gaps, const GUID_t &destination)
bool contains(SequenceNumber seq) const
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
int bind(Container &c, const FirstType &first, const SecondType &second)
Definition: Util.h:20
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
#define OPENDDS_DELETED_COPY_MOVE_CTOR_ASSIGN(CLASS)
Definition: Definitions.h:35
TransportSendStrategy::LockType LockType
std::pair< QueueType *, ACE_Message_Block * > BufferType
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
bool resend_i(const SequenceRange &range, DisjointSequence *gaps=0)
bool pre_contains(SequenceNumber sequence) const
#define ACE_END_VERSIONED_NAMESPACE_DECL
bool has_frags(const SequenceNumber &seq) const
MessageBlockAllocator retained_mb_allocator_
std::pair< SequenceNumber, SequenceNumber > SequenceRange
Sequence number abstraction. Only allows positive 64 bit values.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
MessageBlockAllocator replaced_mb_allocator_
int insert(Container &c, const ValueType &v)
Definition: Util.h:105
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
bool contains(SequenceNumber seq, GUID_t &destination) const
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
TransportSendStrategy::QueueType QueueType
typedef OPENDDS_MAP(OPENDDS_STRING, OPENDDS_STRING) ValueMap
Helper types and functions for config file parsing.
typedef OPENDDS_SET(NetworkAddress) AddrSet