OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Static Public Attributes | Private Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::SingleSendBuffer Class Reference

#include <TransportSendBuffer.h>

Inheritance diagram for OpenDDS::DCPS::SingleSendBuffer:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::SingleSendBuffer:
Collaboration graph
[legend]

Classes

class  Proxy
 

Public Member Functions

void release_all ()
 
typedef OPENDDS_VECTOR (BufferType) BufferVec
 
typedef OPENDDS_MAP (SequenceNumber, BufferType) BufferMap
 
void release_acked (SequenceNumber seq)
 
void remove_acked (SequenceNumber seq, BufferVec &removed)
 
size_t n_chunks () const
 
 SingleSendBuffer (size_t capacity, size_t max_samples_per_packet)
 
 ~SingleSendBuffer ()
 
bool resend (const SequenceRange &range, DisjointSequence *gaps=0)
 
void retain_all (const GUID_t &pub_id)
 
void insert (SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
 
void insert_fragment (SequenceNumber sequence, SequenceNumber fragment, bool is_last_fragment, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
 
void pre_insert (SequenceNumber sequence)
 
void pre_clear ()
 
bool has_frags (const SequenceNumber &seq) const
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportSendBuffer
size_t capacity () const
 
void bind (TransportSendStrategy *strategy)
 
LockType & strategy_lock ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Static Public Attributes

static const size_t UNLIMITED = 0
 

Private Member Functions

void check_capacity_i (BufferVec &removed)
 
void release_i (BufferMap::iterator buffer_iter)
 
void remove_i (BufferMap::iterator buffer_iter, BufferVec &removed)
 
RemoveResult retain_buffer (const GUID_t &pub_id, BufferType &buffer)
 
void insert_buffer (BufferType &buffer, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
 
bool resend_i (const SequenceRange &range, DisjointSequence *gaps=0)
 
bool resend_i (const SequenceRange &range, DisjointSequence *gaps, const GUID_t &destination)
 
void resend_fragments_i (SequenceNumber sequence, const DisjointSequence &fragments, size_t &cumulative_send_count)
 
typedef OPENDDS_MAP (SequenceNumber, BufferMap) FragmentMap
 
typedef OPENDDS_MAP (SequenceNumber, GUID_t) DestinationMap
 
typedef OPENDDS_SET (SequenceNumber) SequenceNumberSet
 

Private Attributes

size_t n_chunks_
 
MessageBlockAllocator retained_mb_allocator_
 
DataBlockAllocator retained_db_allocator_
 
MessageBlockAllocator replaced_mb_allocator_
 
DataBlockAllocator replaced_db_allocator_
 
BufferMap buffers_
 
FragmentMap fragments_
 
DestinationMap destinations_
 
SequenceNumberSet pre_seq_
 
SequenceNumber minimum_sn_allowed_
 
ACE_Thread_Mutex mutex_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::TransportSendBuffer
typedef TransportSendStrategy::LockType LockType
 
- Protected Types inherited from OpenDDS::DCPS::TransportSendBuffer
typedef TransportSendStrategy::QueueType QueueType
 
typedef std::pair< QueueType *, ACE_Message_Block * > BufferType
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportSendBuffer
 TransportSendBuffer (size_t capacity)
 
virtual ~TransportSendBuffer ()
 
void resend_one (const BufferType &buffer)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 
- Protected Attributes inherited from OpenDDS::DCPS::TransportSendBuffer
TransportSendStrategy * strategy_
 
const size_t capacity_
 

Detailed Description

Implementation of TransportSendBuffer that manages data for a single domain of SequenceNumbers – for a given SingleSendBuffer object, the sequence numbers passed to insert() must be generated from the same place.

Definition at line 72 of file TransportSendBuffer.h.

Constructor & Destructor Documentation

◆ SingleSendBuffer()

OpenDDS::DCPS::SingleSendBuffer::SingleSendBuffer ( size_t  capacity,
size_t  max_samples_per_packet 
)

◆ ~SingleSendBuffer()

OpenDDS::DCPS::SingleSendBuffer::~SingleSendBuffer ( )

Definition at line 64 of file TransportSendBuffer.cpp.

References release_all().

Member Function Documentation

◆ check_capacity_i()

void OpenDDS::DCPS::SingleSendBuffer::check_capacity_i ( BufferVec &  removed)
private

Definition at line 349 of file TransportSendBuffer.cpp.

References ACE_DEBUG, ACE_TEXT(), buffers_, OpenDDS::DCPS::TransportSendBuffer::capacity_, LM_DEBUG, remove_i(), and OpenDDS::DCPS::Transport_debug_level.

Referenced by insert(), and insert_fragment().

350 {
352  return;
353  }
354  // Age off oldest sample if we are at capacity:
355  if (buffers_.size() == capacity_) {
356  BufferMap::iterator it(buffers_.begin());
357  if (it == buffers_.end()) return;
358 
359  if (Transport_debug_level > 5) {
360  ACE_DEBUG((LM_DEBUG,
361  ACE_TEXT("(%P|%t) SingleSendBuffer::check_capacity() - ")
362  ACE_TEXT("aging off PDU: %q as buffer(0x%@,0x%@)\n"),
363  it->first.getValue(),
364  it->second.first, it->second.second
365  ));
366  }
367 
368  remove_i(it, removed);
369  }
370 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
void remove_i(BufferMap::iterator buffer_iter, BufferVec &removed)
ACE_TEXT("TCP_Factory")

◆ has_frags()

bool OpenDDS::DCPS::SingleSendBuffer::has_frags ( const SequenceNumber & seq) const

Definition at line 373 of file TransportSendBuffer.cpp.

References fragments_.

374 {
375  return fragments_.find(seq) != fragments_.end();
376 }

◆ insert()

void OpenDDS::DCPS::SingleSendBuffer::insert ( SequenceNumber  sequence,
TransportSendStrategy::QueueType * queue,
ACE_Message_Block * chain 
)
virtual

Implements OpenDDS::DCPS::TransportSendBuffer.

Definition at line 245 of file TransportSendBuffer.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), buffers_, check_capacity_i(), destinations_, OpenDDS::DCPS::SequenceNumber::getValue(), OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::HISTORIC_SAMPLE_FLAG, insert_buffer(), LM_DEBUG, minimum_sn_allowed_, OpenDDS::DCPS::TransportQueueElement::msg(), mutex_, OpenDDS::DCPS::BasicQueue< T >::peek(), pre_seq_, OpenDDS::DCPS::BasicQueue< T >::size(), OpenDDS::DCPS::TransportQueueElement::subscription_id(), OpenDDS::DCPS::DataSampleHeader::test_flag(), and OpenDDS::DCPS::Transport_debug_level.

248 {
249  BufferVec removed;
251  if (sequence < minimum_sn_allowed_) {
252  return;
253  }
254  check_capacity_i(removed);
255 
256  BufferType& buffer = buffers_[sequence];
257  pre_seq_.erase(sequence);
258  insert_buffer(buffer, queue, chain);
259 
260  if (Transport_debug_level > 5) {
261  ACE_DEBUG((LM_DEBUG,
262  ACE_TEXT("(%P|%t) SingleSendBuffer::insert() - ")
263  ACE_TEXT("saved PDU: %q as buffer(0x%@,0x%@)\n"),
264  sequence.getValue(),
265  buffer.first, buffer.second
266  ));
267  }
268 
269  if (queue && queue->size() == 1) {
270  const TransportQueueElement* elt = queue->peek();
271  const GUID_t subId = elt->subscription_id();
272  const ACE_Message_Block* msg = elt->msg();
273  if (msg && subId != GUID_UNKNOWN &&
275  destinations_[sequence] = subId;
276  }
277  }
278  g.release();
279  for (size_t i = 0; i < removed.size(); ++i) {
280  RemoveAllVisitor visitor;
281  removed[i].first->accept_remove_visitor(visitor);
282  delete removed[i].first;
283  Message_Block_Ptr to_release(removed[i].second);
284  }
285 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
void insert_buffer(BufferType &buffer, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
static bool test_flag(DataSampleHeaderFlag flag, const ACE_Message_Block *buffer)
ACE_TEXT("TCP_Factory")
std::pair< QueueType *, ACE_Message_Block * > BufferType
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
void check_capacity_i(BufferVec &removed)

◆ insert_buffer()

void OpenDDS::DCPS::SingleSendBuffer::insert_buffer ( BufferType & buffer,
TransportSendStrategy::QueueType * queue,
ACE_Message_Block * chain 
)
private

Definition at line 288 of file TransportSendBuffer.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), ACE_NEW, ACE_Message_Block::duplicate(), retained_db_allocator_, and retained_mb_allocator_.

Referenced by insert(), and insert_fragment().

291 {
292  // Copy sample's TransportQueueElements:
293  TransportSendStrategy::QueueType*& elems = buffer.first;
295 
296  CopyChainVisitor visitor(*elems,
299  true);
300  queue->accept_visitor(visitor);
301 
302  buffer.second = chain->duplicate();
303 }
BasicQueue< TransportQueueElement > QueueType
#define ACE_NEW(POINTER, CONSTRUCTOR)
virtual ACE_Message_Block * duplicate(void) const
MessageBlockAllocator retained_mb_allocator_

◆ insert_fragment()

void OpenDDS::DCPS::SingleSendBuffer::insert_fragment ( SequenceNumber  sequence,
SequenceNumber  fragment,
bool  is_last_fragment,
TransportSendStrategy::QueueType * queue,
ACE_Message_Block * chain 
)

Definition at line 306 of file TransportSendBuffer.cpp.

References ACE_DEBUG, ACE_GUARD, ACE_TEXT(), buffers_, check_capacity_i(), fragments_, OpenDDS::DCPS::SequenceNumber::getValue(), insert_buffer(), LM_DEBUG, minimum_sn_allowed_, mutex_, pre_seq_, and OpenDDS::DCPS::Transport_debug_level.

311 {
312  BufferVec removed;
314  if (sequence < minimum_sn_allowed_) {
315  return;
316  }
317  check_capacity_i(removed);
318 
319  // Insert into buffers_ so that the overall capacity is maintained
320  // The entry in buffers_ with two null pointers indicates that the
321  // actual data is stored in fragments_[sequence].
322  buffers_[sequence] = std::make_pair(static_cast<QueueType*>(0),
323  static_cast<ACE_Message_Block*>(0));
324 
325  BufferType& buffer = fragments_[sequence][fragment];
326  if (is_last_fragment) {
327  pre_seq_.erase(sequence);
328  }
329  insert_buffer(buffer, queue, chain);
330 
331  if (Transport_debug_level > 5) {
332  ACE_DEBUG((LM_DEBUG,
333  ACE_TEXT("(%P|%t) SingleSendBuffer::insert_fragment() - ")
334  ACE_TEXT("saved PDU: %q,%q as buffer(0x%@,0x%@)\n"),
335  sequence.getValue(), fragment.getValue(),
336  buffer.first, buffer.second
337  ));
338  }
339  g.release();
340  for (size_t i = 0; i < removed.size(); ++i) {
341  RemoveAllVisitor visitor;
342  removed[i].first->accept_remove_visitor(visitor);
343  delete removed[i].first;
344  Message_Block_Ptr to_release(removed[i].second);
345  }
346 }
#define ACE_DEBUG(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
void insert_buffer(BufferType &buffer, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)
ACE_TEXT("TCP_Factory")
std::pair< QueueType *, ACE_Message_Block * > BufferType
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr
void check_capacity_i(BufferVec &removed)

◆ n_chunks()

ACE_INLINE size_t OpenDDS::DCPS::SingleSendBuffer::n_chunks ( ) const

Definition at line 29 of file TransportSendBuffer.inl.

References ACE_GUARD_RETURN, and ACE_INLINE.

30 {
32  return this->n_chunks_;
33 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)

◆ OPENDDS_MAP() [1/3]

typedef OpenDDS::DCPS::SingleSendBuffer::OPENDDS_MAP ( SequenceNumber  ,
BufferType   
)

◆ OPENDDS_MAP() [2/3]

typedef OpenDDS::DCPS::SingleSendBuffer::OPENDDS_MAP ( SequenceNumber  ,
BufferMap   
)
private

◆ OPENDDS_MAP() [3/3]

typedef OpenDDS::DCPS::SingleSendBuffer::OPENDDS_MAP ( SequenceNumber  ,
GUID_t   
)
private

◆ OPENDDS_SET()

typedef OpenDDS::DCPS::SingleSendBuffer::OPENDDS_SET ( SequenceNumber  )
private

◆ OPENDDS_VECTOR()

typedef OpenDDS::DCPS::SingleSendBuffer::OPENDDS_VECTOR ( BufferType  )

Referenced by resend_fragments_i().

◆ pre_clear()

void OpenDDS::DCPS::SingleSendBuffer::pre_clear ( )
inline

Definition at line 199 of file TransportSendBuffer.h.

200  {
201  pre_seq_.clear();
202  }

◆ pre_insert()

ACE_INLINE void OpenDDS::DCPS::SingleSendBuffer::pre_insert ( SequenceNumber  sequence)

Definition at line 36 of file TransportSendBuffer.inl.

References ACE_GUARD, and OPENDDS_END_VERSIONED_NAMESPACE_DECL.

37 {
39  pre_seq_.insert(sequence);
40 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)

◆ release_acked()

void OpenDDS::DCPS::SingleSendBuffer::release_acked ( SequenceNumber  seq)

Definition at line 80 of file TransportSendBuffer.cpp.

References ACE_GUARD, buffers_, minimum_sn_allowed_, mutex_, and release_i().

80  {
82  BufferMap::iterator buffer_iter = buffers_.find(seq);
83  if (buffer_iter != buffers_.end()) {
84  release_i(buffer_iter);
85  }
86  minimum_sn_allowed_ = std::max(minimum_sn_allowed_, seq + 1);
87 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void release_i(BufferMap::iterator buffer_iter)

◆ release_all()

void OpenDDS::DCPS::SingleSendBuffer::release_all ( )

Definition at line 70 of file TransportSendBuffer.cpp.

References ACE_GUARD, buffers_, mutex_, and release_i().

Referenced by ~SingleSendBuffer().

71 {
73  for (BufferMap::iterator it = buffers_.begin();
74  it != buffers_.end();) {
75  release_i(it++);
76  }
77 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void release_i(BufferMap::iterator buffer_iter)

◆ release_i()

void OpenDDS::DCPS::SingleSendBuffer::release_i ( BufferMap::iterator  buffer_iter)
private

Definition at line 100 of file TransportSendBuffer.cpp.

References ACE_DEBUG, ACE_TEXT(), buffers_, destinations_, fragments_, LM_DEBUG, and OpenDDS::DCPS::Transport_debug_level.

Referenced by release_acked(), release_all(), and retain_all().

101 {
102  BufferType& buffer(buffer_iter->second);
103  if (Transport_debug_level > 5) {
104  ACE_DEBUG((LM_DEBUG,
105  ACE_TEXT("(%P|%t) SingleSendBuffer::release() - ")
106  ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
107  buffer.first, buffer.second
108  ));
109  }
110 
111  if (buffer.first && buffer.second) {
112  // not a fragment
113  RemoveAllVisitor visitor;
114  buffer.first->accept_remove_visitor(visitor);
115  delete buffer.first;
116 
117  Message_Block_Ptr to_release(buffer.second);
118  buffer.second = 0;
119 
120  } else {
121  // data actually stored in fragments_
122  const FragmentMap::iterator fm_it = fragments_.find(buffer_iter->first);
123  if (fm_it != fragments_.end()) {
124  for (BufferMap::iterator bm_it = fm_it->second.begin();
125  bm_it != fm_it->second.end(); ++bm_it) {
126  RemoveAllVisitor visitor;
127  bm_it->second.first->accept_remove_visitor(visitor);
128  delete bm_it->second.first;
129 
130  Message_Block_Ptr to_release(bm_it->second.second);
131  bm_it->second.second = 0;
132  }
133  fragments_.erase(fm_it);
134  }
135  }
136 
137  destinations_.erase(buffer_iter->first);
138  buffers_.erase(buffer_iter);
139 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
ACE_TEXT("TCP_Factory")
std::pair< QueueType *, ACE_Message_Block * > BufferType
unique_ptr< ACE_Message_Block, Message_Block_Deleter > Message_Block_Ptr

◆ remove_acked()

void OpenDDS::DCPS::SingleSendBuffer::remove_acked ( SequenceNumber  seq,
BufferVec &  removed 
)

Definition at line 90 of file TransportSendBuffer.cpp.

References ACE_GUARD, buffers_, minimum_sn_allowed_, mutex_, and remove_i().

90  {
92  BufferMap::iterator buffer_iter = buffers_.find(seq);
93  if (buffer_iter != buffers_.end()) {
94  remove_i(buffer_iter, removed);
95  }
96  minimum_sn_allowed_ = std::max(minimum_sn_allowed_, seq + 1);
97 }
#define ACE_GUARD(MUTEX, OBJ, LOCK)
void remove_i(BufferMap::iterator buffer_iter, BufferVec &removed)

◆ remove_i()

void OpenDDS::DCPS::SingleSendBuffer::remove_i ( BufferMap::iterator  buffer_iter,
BufferVec &  removed 
)
private

Definition at line 142 of file TransportSendBuffer.cpp.

References ACE_DEBUG, ACE_TEXT(), buffers_, destinations_, fragments_, LM_DEBUG, and OpenDDS::DCPS::Transport_debug_level.

Referenced by check_capacity_i(), and remove_acked().

143 {
144  BufferType& buffer(buffer_iter->second);
145  if (Transport_debug_level > 5) {
146  ACE_DEBUG((LM_DEBUG,
147  ACE_TEXT("(%P|%t) SingleSendBuffer::release() - ")
148  ACE_TEXT("releasing buffer at: (0x%@,0x%@)\n"),
149  buffer.first, buffer.second
150  ));
151  }
152 
153  if (buffer.first && buffer.second) {
154  // not a fragment
155  removed.push_back(buffer);
156  } else {
157  // data actually stored in fragments_
158  const FragmentMap::iterator fm_it = fragments_.find(buffer_iter->first);
159  if (fm_it != fragments_.end()) {
160  for (BufferMap::iterator bm_it = fm_it->second.begin();
161  bm_it != fm_it->second.end(); ++bm_it) {
162  removed.push_back(bm_it->second);
163  }
164  fragments_.erase(fm_it);
165  }
166  }
167 
168  destinations_.erase(buffer_iter->first);
169  buffers_.erase(buffer_iter);
170 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
ACE_TEXT("TCP_Factory")
std::pair< QueueType *, ACE_Message_Block * > BufferType

◆ resend()

bool OpenDDS::DCPS::SingleSendBuffer::resend ( const SequenceRange & range,
DisjointSequence * gaps = 0 
)

Definition at line 379 of file TransportSendBuffer.cpp.

References ACE_GUARD_RETURN, mutex_, resend_i(), and OpenDDS::DCPS::TransportSendBuffer::strategy_lock().

Referenced by OpenDDS::DCPS::ReliableSession::nak_received().

380 {
381  ACE_GUARD_RETURN(LockType, guard, strategy_lock(), false);
383  return resend_i(range, gaps);
384 }
#define ACE_GUARD_RETURN(MUTEX, OBJ, LOCK, RETURN)
TransportSendStrategy::LockType LockType
bool resend_i(const SequenceRange &range, DisjointSequence *gaps=0)

◆ resend_fragments_i()

void OpenDDS::DCPS::SingleSendBuffer::resend_fragments_i ( SequenceNumber  sequence,
const DisjointSequence & fragments,
size_t &  cumulative_send_count 
)
private

Definition at line 442 of file TransportSendBuffer.cpp.

References OpenDDS::DCPS::DisjointSequence::empty(), fragments_, OPENDDS_END_VERSIONED_NAMESPACE_DECL, OPENDDS_VECTOR(), and OpenDDS::DCPS::TransportSendBuffer::resend_one().

445 {
446  if (fragments_.empty() || requested_frags.empty()) {
447  return;
448  }
449  const FragmentMap::const_iterator fm_it = fragments_.find(seq);
450  if (fm_it == fragments_.end()) {
451  return;
452  }
453  const BufferMap& buffers = fm_it->second;
454  const OPENDDS_VECTOR(SequenceRange)& psr = requested_frags.present_sequence_ranges();
455 
456  BufferMap::const_iterator it = buffers.lower_bound(psr.front().first);
457  BufferMap::const_iterator end = buffers.lower_bound(psr.back().second);
458  if (end != buffers.end()) {
459  ++end;
460  }
461 
462  SequenceNumber frag_min;
463  size_t i = 0;
464 
465  // Iterate over both containers simultaneously
466  while (i < psr.size() && it != end) {
467  if (psr[i].second < frag_min) {
468  ++i;
469  } else {
470  // Once the range max is over our fragment minimum, we either
471  // expect overlap (resend fragment) or the range is too high (skip fragment)
472  // Either way, we will increment the fragment now to avoid duplicate resends
473  if (it->first >= psr[i].first) {
474  resend_one(it->second); // overlap - resend fragment buffer
475  ++cumulative_send_count;
476  }
477  frag_min = it->first + 1; // increment fragment buffer
478  ++it;
479  }
480  }
481 }
void resend_one(const BufferType &buffer)
typedef OPENDDS_VECTOR(BufferType) BufferVec
std::pair< SequenceNumber, SequenceNumber > SequenceRange

◆ resend_i() [1/2]

bool OpenDDS::DCPS::SingleSendBuffer::resend_i ( const SequenceRange & range,
DisjointSequence * gaps = 0 
)
private

Definition at line 387 of file TransportSendBuffer.cpp.

References OpenDDS::DCPS::GUID_UNKNOWN.

Referenced by resend().

388 {
389  return resend_i(range, gaps, GUID_UNKNOWN);
390 }
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
bool resend_i(const SequenceRange &range, DisjointSequence *gaps=0)

◆ resend_i() [2/2]

bool OpenDDS::DCPS::SingleSendBuffer::resend_i ( const SequenceRange & range,
DisjointSequence * gaps,
const GUID_t & destination 
)
private

Definition at line 393 of file TransportSendBuffer.cpp.

References ACE_DEBUG, ACE_TEXT(), buffers_, destinations_, fragments_, OpenDDS::DCPS::GUID_UNKNOWN, OpenDDS::DCPS::DisjointSequence::insert(), LM_DEBUG, OpenDDS::DCPS::TransportSendBuffer::resend_one(), and OpenDDS::DCPS::Transport_debug_level.

395 {
396  //Special case, nak to make sure it has all history
397  if (buffers_.empty()) throw std::exception();
398  const SequenceNumber lowForAllResent = range.first == SequenceNumber() ? buffers_.begin()->first : range.first;
399  const bool has_dest = destination != GUID_UNKNOWN;
400 
401  for (SequenceNumber sequence(range.first);
402  sequence <= range.second; ++sequence) {
403  // Re-send requested sample if still buffered; missing samples
404  // will be scored against the given DisjointSequence:
405  BufferMap::iterator it(buffers_.find(sequence));
406  DestinationMap::iterator dest_data;
407  if (has_dest) {
408  dest_data = destinations_.find(sequence);
409  }
410  if (it == buffers_.end() || (has_dest && (dest_data == destinations_.end() ||
411  dest_data->second != destination))) {
412  if (gaps) {
413  gaps->insert(sequence);
414  }
415  } else {
416  if (Transport_debug_level > 5) {
417  ACE_DEBUG((LM_DEBUG,
418  ACE_TEXT("(%P|%t) SingleSendBuffer::resend() - ")
419  ACE_TEXT("resending PDU: %q, (0x%@,0x%@)\n"),
420  sequence.getValue(),
421  it->second.first,
422  it->second.second));
423  }
424  if (it->second.first && it->second.second) {
425  resend_one(it->second);
426  } else {
427  const FragmentMap::iterator fm_it = fragments_.find(it->first);
428  if (fm_it != fragments_.end()) {
429  for (BufferMap::iterator bm_it = fm_it->second.begin();
430  bm_it != fm_it->second.end(); ++bm_it) {
431  resend_one(bm_it->second);
432  }
433  }
434  }
435  }
436  }
437  // Have we resent all requested data?
438  return lowForAllResent >= buffers_.begin()->first && range.second <= buffers_.rbegin()->first;
439 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
void resend_one(const BufferType &buffer)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
Definition: GuidUtils.h:59
ACE_TEXT("TCP_Factory")

◆ retain_all()

void OpenDDS::DCPS::SingleSendBuffer::retain_all ( const GUID_t & pub_id)
virtual

Reimplemented from OpenDDS::DCPS::TransportSendBuffer.

Definition at line 173 of file TransportSendBuffer.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_GUARD, ACE_TEXT(), buffers_, OpenDDS::DCPS::LogGuid::c_str(), fragments_, LM_DEBUG, LM_WARNING, mutex_, release_i(), OpenDDS::DCPS::REMOVE_ERROR, retain_buffer(), and OpenDDS::DCPS::Transport_debug_level.

174 {
175  if (Transport_debug_level > 5) {
176  LogGuid logger(pub_id);
177  ACE_DEBUG((LM_DEBUG,
178  ACE_TEXT("(%P|%t) SingleSendBuffer::retain_all() - ")
179  ACE_TEXT("copying out blocks for publication: %C\n"),
180  logger.c_str()
181  ));
182  }
184  for (BufferMap::iterator it(buffers_.begin());
185  it != buffers_.end();) {
186  if (it->second.first && it->second.second) {
187  if (retain_buffer(pub_id, it->second) == REMOVE_ERROR) {
188  LogGuid logger(pub_id);
189  ACE_ERROR((LM_WARNING,
190  ACE_TEXT("(%P|%t) WARNING: ")
191  ACE_TEXT("SingleSendBuffer::retain_all: ")
192  ACE_TEXT("failed to retain data from publication: %C!\n"),
193  logger.c_str()));
194  release_i(it++);
195  } else {
196  ++it;
197  }
198 
199  } else {
200  const FragmentMap::iterator fm_it = fragments_.find(it->first);
201  if (fm_it != fragments_.end()) {
202  for (BufferMap::iterator bm_it = fm_it->second.begin();
203  bm_it != fm_it->second.end();) {
204  if (retain_buffer(pub_id, bm_it->second) == REMOVE_ERROR) {
205  LogGuid logger(pub_id);
206  ACE_ERROR((LM_WARNING,
207  ACE_TEXT("(%P|%t) WARNING: ")
208  ACE_TEXT("SingleSendBuffer::retain_all: failed to ")
209  ACE_TEXT("retain fragment data from publication: %C!\n"),
210  logger.c_str()));
211  release_i(bm_it++);
212  } else {
213  ++bm_it;
214  }
215  }
216  }
217  ++it;
218  }
219  }
220 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
#define ACE_GUARD(MUTEX, OBJ, LOCK)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
ACE_TEXT("TCP_Factory")
RemoveResult retain_buffer(const GUID_t &pub_id, BufferType &buffer)
void release_i(BufferMap::iterator buffer_iter)

◆ retain_buffer()

RemoveResult OpenDDS::DCPS::SingleSendBuffer::retain_buffer ( const GUID_t & pub_id,
BufferType & buffer 
)
private

Definition at line 223 of file TransportSendBuffer.cpp.

References OpenDDS::DCPS::TransportQueueElement::clone_mb(), ACE_Message_Block::release(), OpenDDS::DCPS::REMOVE_ERROR, replaced_db_allocator_, replaced_mb_allocator_, retained_db_allocator_, retained_mb_allocator_, and OpenDDS::DCPS::PacketRemoveVisitor::status().

Referenced by retain_all().

224 {
225  TransportQueueElement::MatchOnPubId match(pub_id);
226  PacketRemoveVisitor visitor(match,
227  buffer.second,
228  buffer.second,
231 
232  buffer.first->accept_replace_visitor(visitor);
233  if (visitor.status() != REMOVE_ERROR) {
234  // Copy sample's message/data block descriptors:
235  ACE_Message_Block* data = buffer.second;
236  buffer.second = TransportQueueElement::clone_mb(data,
239  data->release();
240  }
241  return visitor.status();
242 }
virtual ACE_Message_Block * release(void)
MessageBlockAllocator replaced_mb_allocator_
MessageBlockAllocator retained_mb_allocator_
static ACE_Message_Block * clone_mb(const ACE_Message_Block *msg, MessageBlockAllocator *mb_allocator, DataBlockAllocator *db_allocator)

Member Data Documentation

◆ buffers_

BufferMap OpenDDS::DCPS::SingleSendBuffer::buffers_
private

◆ destinations_

DestinationMap OpenDDS::DCPS::SingleSendBuffer::destinations_
private

Definition at line 237 of file TransportSendBuffer.h.

Referenced by insert(), release_i(), remove_i(), and resend_i().

◆ fragments_

FragmentMap OpenDDS::DCPS::SingleSendBuffer::fragments_
private

◆ minimum_sn_allowed_

SequenceNumber OpenDDS::DCPS::SingleSendBuffer::minimum_sn_allowed_
private

Definition at line 242 of file TransportSendBuffer.h.

Referenced by insert(), insert_fragment(), release_acked(), and remove_acked().

◆ mutex_

ACE_Thread_Mutex OpenDDS::DCPS::SingleSendBuffer::mutex_
mutable private

◆ n_chunks_

size_t OpenDDS::DCPS::SingleSendBuffer::n_chunks_
private

Definition at line 224 of file TransportSendBuffer.h.

◆ pre_seq_

SequenceNumberSet OpenDDS::DCPS::SingleSendBuffer::pre_seq_
private

Definition at line 240 of file TransportSendBuffer.h.

Referenced by insert(), and insert_fragment().

◆ replaced_db_allocator_

DataBlockAllocator OpenDDS::DCPS::SingleSendBuffer::replaced_db_allocator_
private

Definition at line 229 of file TransportSendBuffer.h.

Referenced by retain_buffer().

◆ replaced_mb_allocator_

MessageBlockAllocator OpenDDS::DCPS::SingleSendBuffer::replaced_mb_allocator_
private

Definition at line 228 of file TransportSendBuffer.h.

Referenced by retain_buffer().

◆ retained_db_allocator_

DataBlockAllocator OpenDDS::DCPS::SingleSendBuffer::retained_db_allocator_
private

Definition at line 227 of file TransportSendBuffer.h.

Referenced by insert_buffer(), and retain_buffer().

◆ retained_mb_allocator_

MessageBlockAllocator OpenDDS::DCPS::SingleSendBuffer::retained_mb_allocator_
private

Definition at line 226 of file TransportSendBuffer.h.

Referenced by insert_buffer(), and retain_buffer().

◆ UNLIMITED

const size_t OpenDDS::DCPS::SingleSendBuffer::UNLIMITED = 0
static

The documentation for this class was generated from the following files: