OpenDDS  Snapshot(2023/04/28-20:55)
TransportSendBuffer.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 
10 #include "TransportSendBuffer.h"
11 #include "CopyChainVisitor.h"
12 #include "PacketRemoveVisitor.h"
13 #include "RemoveAllVisitor.h"
14 
17 
18 #include "ace/Log_Msg.h"
19 
20 #include "dds/DCPS/GuidConverter.h"
21 
22 #ifndef __ACE_INLINE__
23 # include "TransportSendBuffer.inl"
24 #endif /* __ACE_INLINE__ */
25 
27 
28 namespace OpenDDS {
29 namespace DCPS {
30 
31 
33 {
34 }
35 
36 void
38 {
39 }
40 
41 void
43 {
44  int bp = 0;
45  strategy_->do_send_packet(buffer.second, bp);
46 }
47 
48 
49 // class SingleSendBuffer
50 
51 const size_t SingleSendBuffer::UNLIMITED = 0;
52 
54  size_t max_samples_per_packet)
55  : TransportSendBuffer(capacity),
56  n_chunks_(capacity * max_samples_per_packet),
57  retained_mb_allocator_(n_chunks_ * 2),
58  retained_db_allocator_(n_chunks_ * 2),
59  replaced_mb_allocator_(n_chunks_ * 2),
60  replaced_db_allocator_(n_chunks_ * 2)
61 {
62 }
63 
65 {
66  release_all();
67 }
68 
69 void
71 {
73  for (BufferMap::iterator it = buffers_.begin();
74  it != buffers_.end();) {
75  release_i(it++);
76  }
77 }
78 
79 void
82  BufferMap::iterator buffer_iter = buffers_.find(seq);
83  if (buffer_iter != buffers_.end()) {
84  release_i(buffer_iter);
85  }
86  minimum_sn_allowed_ = std::max(minimum_sn_allowed_, seq + 1);
87 }
88 
89 void
92  BufferMap::iterator buffer_iter = buffers_.find(seq);
93  if (buffer_iter != buffers_.end()) {
94  remove_i(buffer_iter, removed);
95  }
96  minimum_sn_allowed_ = std::max(minimum_sn_allowed_, seq + 1);
97 }
98 
99 void
100 SingleSendBuffer::release_i(BufferMap::iterator buffer_iter)
101 {
102  BufferType& buffer(buffer_iter->second);
103  if (Transport_debug_level > 5) {
105  ACE_TEXT("(%P|%t) SingleSendBuffer::release() - ")
106  ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
107  buffer.first, buffer.second
108  ));
109  }
110 
111  if (buffer.first && buffer.second) {
112  // not a fragment
113  RemoveAllVisitor visitor;
114  buffer.first->accept_remove_visitor(visitor);
115  delete buffer.first;
116 
117  Message_Block_Ptr to_release(buffer.second);
118  buffer.second = 0;
119 
120  } else {
121  // data actually stored in fragments_
122  const FragmentMap::iterator fm_it = fragments_.find(buffer_iter->first);
123  if (fm_it != fragments_.end()) {
124  for (BufferMap::iterator bm_it = fm_it->second.begin();
125  bm_it != fm_it->second.end(); ++bm_it) {
126  RemoveAllVisitor visitor;
127  bm_it->second.first->accept_remove_visitor(visitor);
128  delete bm_it->second.first;
129 
130  Message_Block_Ptr to_release(bm_it->second.second);
131  bm_it->second.second = 0;
132  }
133  fragments_.erase(fm_it);
134  }
135  }
136 
137  destinations_.erase(buffer_iter->first);
138  buffers_.erase(buffer_iter);
139 }
140 
141 void
142 SingleSendBuffer::remove_i(BufferMap::iterator buffer_iter, BufferVec& removed)
143 {
144  BufferType& buffer(buffer_iter->second);
145  if (Transport_debug_level > 5) {
147  ACE_TEXT("(%P|%t) SingleSendBuffer::release() - ")
148  ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
149  buffer.first, buffer.second
150  ));
151  }
152 
153  if (buffer.first && buffer.second) {
154  // not a fragment
155  removed.push_back(buffer);
156  } else {
157  // data actually stored in fragments_
158  const FragmentMap::iterator fm_it = fragments_.find(buffer_iter->first);
159  if (fm_it != fragments_.end()) {
160  for (BufferMap::iterator bm_it = fm_it->second.begin();
161  bm_it != fm_it->second.end(); ++bm_it) {
162  removed.push_back(bm_it->second);
163  }
164  fragments_.erase(fm_it);
165  }
166  }
167 
168  destinations_.erase(buffer_iter->first);
169  buffers_.erase(buffer_iter);
170 }
171 
172 void
174 {
175  if (Transport_debug_level > 5) {
176  LogGuid logger(pub_id);
178  ACE_TEXT("(%P|%t) SingleSendBuffer::retain_all() - ")
179  ACE_TEXT("copying out blocks for publication: %C\n"),
180  logger.c_str()
181  ));
182  }
184  for (BufferMap::iterator it(buffers_.begin());
185  it != buffers_.end();) {
186  if (it->second.first && it->second.second) {
187  if (retain_buffer(pub_id, it->second) == REMOVE_ERROR) {
188  LogGuid logger(pub_id);
190  ACE_TEXT("(%P|%t) WARNING: ")
191  ACE_TEXT("SingleSendBuffer::retain_all: ")
192  ACE_TEXT("failed to retain data from publication: %C!\n"),
193  logger.c_str()));
194  release_i(it++);
195  } else {
196  ++it;
197  }
198 
199  } else {
200  const FragmentMap::iterator fm_it = fragments_.find(it->first);
201  if (fm_it != fragments_.end()) {
202  for (BufferMap::iterator bm_it = fm_it->second.begin();
203  bm_it != fm_it->second.end();) {
204  if (retain_buffer(pub_id, bm_it->second) == REMOVE_ERROR) {
205  LogGuid logger(pub_id);
207  ACE_TEXT("(%P|%t) WARNING: ")
208  ACE_TEXT("SingleSendBuffer::retain_all: failed to ")
209  ACE_TEXT("retain fragment data from publication: %C!\n"),
210  logger.c_str()));
211  release_i(bm_it++);
212  } else {
213  ++bm_it;
214  }
215  }
216  }
217  ++it;
218  }
219  }
220 }
221 
224 {
226  PacketRemoveVisitor visitor(match,
227  buffer.second,
228  buffer.second,
231 
232  buffer.first->accept_replace_visitor(visitor);
233  if (visitor.status() != REMOVE_ERROR) {
234  // Copy sample's message/data block descriptors:
235  ACE_Message_Block* data = buffer.second;
236  buffer.second = TransportQueueElement::clone_mb(data,
239  data->release();
240  }
241  return visitor.status();
242 }
243 
244 void
247  ACE_Message_Block* chain)
248 {
249  BufferVec removed;
251  if (sequence < minimum_sn_allowed_) {
252  return;
253  }
254  check_capacity_i(removed);
255 
256  BufferType& buffer = buffers_[sequence];
257  pre_seq_.erase(sequence);
258  insert_buffer(buffer, queue, chain);
259 
260  if (Transport_debug_level > 5) {
262  ACE_TEXT("(%P|%t) SingleSendBuffer::insert() - ")
263  ACE_TEXT("saved PDU: %q as buffer(0x%@,0x%@)\n"),
264  sequence.getValue(),
265  buffer.first, buffer.second
266  ));
267  }
268 
269  if (queue && queue->size() == 1) {
270  const TransportQueueElement* elt = queue->peek();
271  const GUID_t subId = elt->subscription_id();
272  const ACE_Message_Block* msg = elt->msg();
273  if (msg && subId != GUID_UNKNOWN &&
275  destinations_[sequence] = subId;
276  }
277  }
278  g.release();
279  for (size_t i = 0; i < removed.size(); ++i) {
280  RemoveAllVisitor visitor;
281  removed[i].first->accept_remove_visitor(visitor);
282  delete removed[i].first;
283  Message_Block_Ptr to_release(removed[i].second);
284  }
285 }
286 
287 void
290  ACE_Message_Block* chain)
291 {
292  // Copy sample's TransportQueueElements:
293  TransportSendStrategy::QueueType*& elems = buffer.first;
295 
296  CopyChainVisitor visitor(*elems,
299  true);
300  queue->accept_visitor(visitor);
301 
302  buffer.second = chain->duplicate();
303 }
304 
305 void
307  SequenceNumber fragment,
308  bool is_last_fragment,
310  ACE_Message_Block* chain)
311 {
312  BufferVec removed;
314  if (sequence < minimum_sn_allowed_) {
315  return;
316  }
317  check_capacity_i(removed);
318 
319  // Insert into buffers_ so that the overall capacity is maintained
320  // The entry in buffers_ with two null pointers indicates that the
321  // actual data is stored in fragments_[sequence].
322  buffers_[sequence] = std::make_pair(static_cast<QueueType*>(0),
323  static_cast<ACE_Message_Block*>(0));
324 
325  BufferType& buffer = fragments_[sequence][fragment];
326  if (is_last_fragment) {
327  pre_seq_.erase(sequence);
328  }
329  insert_buffer(buffer, queue, chain);
330 
331  if (Transport_debug_level > 5) {
333  ACE_TEXT("(%P|%t) SingleSendBuffer::insert_fragment() - ")
334  ACE_TEXT("saved PDU: %q,%q as buffer(0x%@,0x%@)\n"),
335  sequence.getValue(), fragment.getValue(),
336  buffer.first, buffer.second
337  ));
338  }
339  g.release();
340  for (size_t i = 0; i < removed.size(); ++i) {
341  RemoveAllVisitor visitor;
342  removed[i].first->accept_remove_visitor(visitor);
343  delete removed[i].first;
344  Message_Block_Ptr to_release(removed[i].second);
345  }
346 }
347 
348 void
350 {
351  if (capacity_ == SingleSendBuffer::UNLIMITED) {
352  return;
353  }
354  // Age off oldest sample if we are at capacity:
355  if (buffers_.size() == capacity_) {
356  BufferMap::iterator it(buffers_.begin());
357  if (it == buffers_.end()) return;
358 
359  if (Transport_debug_level > 5) {
361  ACE_TEXT("(%P|%t) SingleSendBuffer::check_capacity() - ")
362  ACE_TEXT("aging off PDU: %q as buffer(0x%@,0x%@)\n"),
363  it->first.getValue(),
364  it->second.first, it->second.second
365  ));
366  }
367 
368  remove_i(it, removed);
369  }
370 }
371 
372 bool
374 {
375  return fragments_.find(seq) != fragments_.end();
376 }
377 
378 bool
380 {
381  ACE_GUARD_RETURN(LockType, guard, strategy_lock(), false);
383  return resend_i(range, gaps);
384 }
385 
386 bool
388 {
389  return resend_i(range, gaps, GUID_UNKNOWN);
390 }
391 
392 bool
394  const GUID_t& destination)
395 {
396  //Special case, nak to make sure it has all history
397  if (buffers_.empty()) throw std::exception();
398  const SequenceNumber lowForAllResent = range.first == SequenceNumber() ? buffers_.begin()->first : range.first;
399  const bool has_dest = destination != GUID_UNKNOWN;
400 
401  for (SequenceNumber sequence(range.first);
402  sequence <= range.second; ++sequence) {
403  // Re-send requested sample if still buffered; missing samples
404  // will be scored against the given DisjointSequence:
405  BufferMap::iterator it(buffers_.find(sequence));
406  DestinationMap::iterator dest_data;
407  if (has_dest) {
408  dest_data = destinations_.find(sequence);
409  }
410  if (it == buffers_.end() || (has_dest && (dest_data == destinations_.end() ||
411  dest_data->second != destination))) {
412  if (gaps) {
413  gaps->insert(sequence);
414  }
415  } else {
416  if (Transport_debug_level > 5) {
418  ACE_TEXT("(%P|%t) SingleSendBuffer::resend() - ")
419  ACE_TEXT("resending PDU: %q, (0x%@,0x%@)\n"),
420  sequence.getValue(),
421  it->second.first,
422  it->second.second));
423  }
424  if (it->second.first && it->second.second) {
425  resend_one(it->second);
426  } else {
427  const FragmentMap::iterator fm_it = fragments_.find(it->first);
428  if (fm_it != fragments_.end()) {
429  for (BufferMap::iterator bm_it = fm_it->second.begin();
430  bm_it != fm_it->second.end(); ++bm_it) {
431  resend_one(bm_it->second);
432  }
433  }
434  }
435  }
436  }
437  // Have we resent all requested data?
438  return lowForAllResent >= buffers_.begin()->first && range.second <= buffers_.rbegin()->first;
439 }
440 
441 void
443  const DisjointSequence& requested_frags,
444  size_t& cumulative_send_count)
445 {
446  if (fragments_.empty() || requested_frags.empty()) {
447  return;
448  }
449  const FragmentMap::const_iterator fm_it = fragments_.find(seq);
450  if (fm_it == fragments_.end()) {
451  return;
452  }
453  const BufferMap& buffers = fm_it->second;
454  const OPENDDS_VECTOR(SequenceRange)& psr = requested_frags.present_sequence_ranges();
455 
456  BufferMap::const_iterator it = buffers.lower_bound(psr.front().first);
457  BufferMap::const_iterator end = buffers.lower_bound(psr.back().second);
458  if (end != buffers.end()) {
459  ++end;
460  }
461 
462  SequenceNumber frag_min;
463  size_t i = 0;
464 
465  // Iterate over both containers simultaneously
466  while (i < psr.size() && it != end) {
467  if (psr[i].second < frag_min) {
468  ++i;
469  } else {
470  // Once the range max is over our fragment minimum, we either
471  // expect overlap (resend fragment) or the range is too high (skip fragment)
472  // Either way, we will increment the fragment now to avoid duplicate resends
473  if (it->first >= psr[i].first) {
474  resend_one(it->second); // overlap - resend fragment buffer
475  ++cumulative_send_count;
476  }
477  frag_min = it->first + 1; // increment fragment buffer
478  ++it;
479  }
480  }
481 }
482 
483 } // namespace DCPS
484 } // namespace OpenDDS
485 
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void insert_buffer(BufferType &buffer, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
void retain_all(const GUID_t &pub_id)
#define ACE_NEW(POINTER, CONSTRUCTOR)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
RemoveResult retain_buffer(const GUID_t &pub_id, BufferType &buffer)
bool has_frags(const SequenceNumber &seq) const
ssize_t do_send_packet(const ACE_Message_Block *packet, int &bp)
Form an IOV and call the send_bytes() template method.
void release_acked(SequenceNumber seq)
TransportSendStrategy::LockType LockType
static bool test_flag(DataSampleHeaderFlag flag, const ACE_Message_Block *buffer)
const char * c_str() const
void check_capacity_i(BufferVec &removed)
void resend_fragments_i(SequenceNumber sequence, const DisjointSequence &fragments, size_t &cumulative_send_count)
std::pair< QueueType *, ACE_Message_Block * > BufferType
bool resend_i(const SequenceRange &range, DisjointSequence *gaps=0)
void insert(SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
virtual void retain_all(const GUID_t &pub_id)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
LM_DEBUG
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
virtual ACE_Message_Block * release(void)
virtual ACE_Message_Block * duplicate(void) const
size_t size() const
Accessor for the current number of elements in the queue.
Definition: BasicQueue_T.h:65
bool insert(const SequenceRange &range, OPENDDS_VECTOR(SequenceRange)&added)
LM_WARNING
void insert_fragment(SequenceNumber sequence, SequenceNumber fragment, bool is_last_fragment, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
MessageBlockAllocator retained_mb_allocator_
ACE_TEXT("TCP_Factory")
std::pair< SequenceNumber, SequenceNumber > SequenceRange
void resend_one(const BufferType &buffer)
Sequence number abstraction. Only allows positive 64 bit values.
virtual GUID_t subscription_id() const
Accessor for the subscription id, if sent the sample is sent to 1 sub.
bool resend(const SequenceRange &range, DisjointSequence *gaps=0)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void accept_visitor(VisitorType &visitor) const
Definition: BasicQueue_T.h:74
static ACE_Message_Block * clone_mb(const ACE_Message_Block *msg, MessageBlockAllocator *mb_allocator, DataBlockAllocator *db_allocator)
void remove_acked(SequenceNumber seq, BufferVec &removed)
MessageBlockAllocator replaced_mb_allocator_
SingleSendBuffer(size_t capacity, size_t max_samples_per_packet)
void release_i(BufferMap::iterator buffer_iter)
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Base wrapper class around a data/control sample to be sent.
void remove_i(BufferMap::iterator buffer_iter, BufferVec &removed)
typedef OPENDDS_VECTOR(BufferType) BufferVec
virtual const ACE_Message_Block * msg() const =0
The marshalled sample (sample header + sample data)