OpenDDS  Snapshot(2023/04/07-19:43)
Classes | Public Member Functions | Protected Member Functions | Protected Attributes | List of all members
OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH > Class Template Referenceabstract

#include <TransportReceiveStrategy_T.h>

Inheritance diagram for OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >:
Collaboration graph
[legend]

Classes

class  ScopedHeaderProcessing
 

Public Member Functions

virtual ~TransportReceiveStrategy ()
 
int start ()
 
void stop ()
 
int handle_dds_input (ACE_HANDLE fd)
 
virtual void relink (bool do_suspend=true)
 
const TH & received_header () const
 
TH & received_header ()
 
const DSH & received_sample_header () const
 
DSH & received_sample_header ()
 
ACE_Message_Blockto_msgblock (const ReceivedDataSample &sample)
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportStrategy
virtual ~TransportStrategy ()
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject_get_weak_object () const
 

Protected Member Functions

 TransportReceiveStrategy (const TransportInst_rch &config, size_t receive_buffers_count=RECEIVE_BUFFERS)
 
virtual ssize_t receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)=0
 Only our subclass knows how to do this. More...
 
virtual bool check_header (const TH &header)
 Check the transport header for suitability. More...
 
virtual bool check_header (const DSH &header)
 Check the data sample header for suitability. More...
 
virtual void begin_transport_header_processing ()
 Begin Current Transport Header Processing. More...
 
virtual void end_transport_header_processing ()
 End Current Transport Header Processing. More...
 
virtual void deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)=0
 Called when there is a ReceivedDataSample to be delivered. More...
 
virtual void finish_message ()
 
virtual int start_i ()=0
 Let the subclass start. More...
 
virtual void stop_i ()=0
 Let the subclass stop. More...
 
int skip_bad_pdus ()
 Ignore bad PDUs by skipping over them. More...
 
void reset ()
 
size_t pdu_remaining () const
 
size_t successor_index (size_t index) const
 Manage an index into the receive buffer array. More...
 
void update_buffer_index (bool &done)
 
virtual bool reassemble (ReceivedDataSample &data)
 
 OPENDDS_VECTOR (ACE_Message_Block *) receive_buffers_
 Set of receive buffers in use. More...
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Protected Attributes

bool gracefully_disconnected_
 Flag indicates if the GRACEFUL_DISCONNECT message is received. More...
 
size_t receive_sample_remaining_
 Bytes remaining in the current DataSample. More...
 
TH receive_transport_header_
 Current receive TransportHeader. More...
 
TransportMessageBlockAllocator mb_allocator_
 
TransportDataBlockAllocator db_allocator_
 
TransportDataAllocator data_allocator_
 
ACE_Lock_Adapter< ACE_SYNCH_MUTEXreceive_lock_
 Locking strategy for the allocators. More...
 
size_t buffer_index_
 Current receive buffer index in use. More...
 
DSH data_sample_header_
 Current data sample header. More...
 
ACE_Message_Blockpayload_
 
bool good_pdu_
 
size_t pdu_remaining_
 Amount of the current PDU that has not been processed yet. More...
 

Additional Inherited Members

- Static Public Attributes inherited from OpenDDS::DCPS::TransportReceiveConstants
static const size_t RECEIVE_BUFFERS = DEFAULT_TRANSPORT_RECEIVE_BUFFERS
 
static const size_t BUFFER_LOW_WATER = 4096
 
static const size_t MESSAGE_BLOCKS = 1000
 
static const size_t DATA_BLOCKS = 100
 

Detailed Description

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
class OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >

This class provides buffer for data received by transports, de-assemble the data to individual samples and deliver them.

Definition at line 50 of file TransportReceiveStrategy_T.h.

Constructor & Destructor Documentation

◆ ~TransportReceiveStrategy()

template<typename TH , typename DSH >
OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::~TransportReceiveStrategy ( )
virtual

Definition at line 54 of file TransportReceiveStrategy_T.cpp.

55 {
56  DBG_ENTRY_LVL("TransportReceiveStrategy","~TransportReceiveStrategy",6);
57 
58  if (this->receive_buffers_[this->buffer_index_] != 0) {
59  size_t size = this->receive_buffers_[this->buffer_index_]->total_length();
60 
61  if (size > 0) {
62  ACE_DEBUG((LM_WARNING,
63  ACE_TEXT("(%P|%t) WARNING: TransportReceiveStrategy::~TransportReceiveStrategy() - ")
64  ACE_TEXT("terminating with %d unprocessed bytes.\n"),
65  size));
66  }
67  }
68 
69  for (size_t index = 0; index < receive_buffers_.size(); ++index) {
70  if (receive_buffers_[index] != 0) {
72  receive_buffers_[index],
75  }
76  }
77 }
#define ACE_DEBUG(X)
void free(void *ptr)
Return a chunk of memory back to free list cache.
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
size_t buffer_index_
Current receive buffer index in use.

◆ TransportReceiveStrategy()

template<typename TH , typename DSH >
OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy ( const TransportInst_rch config,
size_t  receive_buffers_count = RECEIVE_BUFFERS 
)
explicitprotected

Definition at line 25 of file TransportReceiveStrategy_T.cpp.

27  : gracefully_disconnected_(false),
29  mb_allocator_((config && config->receive_preallocated_message_blocks_) ? config->receive_preallocated_message_blocks_ : MESSAGE_BLOCKS),
30  db_allocator_((config && config->receive_preallocated_data_blocks_) ? config->receive_preallocated_data_blocks_ : DATA_BLOCKS),
31  data_allocator_((config && config->receive_preallocated_data_blocks_) ? config->receive_preallocated_data_blocks_ : receive_buffers_count * 2),
32  receive_buffers_(receive_buffers_count, 0),
33  buffer_index_(0),
34  payload_(0),
35  good_pdu_(true),
37 {
38  DBG_ENTRY_LVL("TransportReceiveStrategy", "TransportReceiveStrategy" ,6);
39 
40  if (Transport_debug_level >= 2) {
41  ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-mb"
42  " Cached_Allocator_With_Overflow %@ with %B chunks\n",
44  ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-db"
45  " Cached_Allocator_With_Overflow %@ with %B chunks\n",
47  ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-data"
48  " Cached_Allocator_With_Overflow %@ with %B chunks\n",
50  }
51 }
#define ACE_DEBUG(X)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
bool gracefully_disconnected_
Flag indicates if the GRACEFUL_DISCONNECT message is received.
size_t receive_sample_remaining_
Bytes remaining in the current DataSample.
size_t pdu_remaining_
Amount of the current PDU that has not been processed yet.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
size_t buffer_index_
Current receive buffer index in use.

Member Function Documentation

◆ begin_transport_header_processing()

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::begin_transport_header_processing ( )
inlineprotectedvirtual

Begin Current Transport Header Processing.

Reimplemented in OpenDDS::DCPS::RtpsUdpReceiveStrategy.

Definition at line 98 of file TransportReceiveStrategy_T.h.

98 {}

◆ check_header() [1/2]

template<typename TH, typename DSH >
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header ( const TH &  header)
protectedvirtual

Check the transport header for suitability.

Reimplemented in OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::UdpReceiveStrategy, and OpenDDS::DCPS::MulticastReceiveStrategy.

Definition at line 81 of file TransportReceiveStrategy_T.cpp.

Referenced by OpenDDS::DCPS::UdpReceiveStrategy::check_header().

82 {
83  return true;
84 }

◆ check_header() [2/2]

template<typename TH, typename DSH>
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header ( const DSH &  header)
protectedvirtual

Check the data sample header for suitability.

Reimplemented in OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::UdpReceiveStrategy, and OpenDDS::DCPS::MulticastReceiveStrategy.

Definition at line 88 of file TransportReceiveStrategy_T.cpp.

89 {
90  return true;
91 }

◆ deliver_sample()

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::deliver_sample ( ReceivedDataSample sample,
const ACE_INET_Addr remote_address 
)
protectedpure virtual

◆ end_transport_header_processing()

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::end_transport_header_processing ( )
inlineprotectedvirtual

End Current Transport Header Processing.

Reimplemented in OpenDDS::DCPS::RtpsUdpReceiveStrategy.

Definition at line 101 of file TransportReceiveStrategy_T.h.

101 {}

◆ finish_message()

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::finish_message ( )
inlineprotectedvirtual

Definition at line 115 of file TransportReceiveStrategy_T.h.

115 {}

◆ handle_dds_input()

template<typename TH , typename DSH >
int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input ( ACE_HANDLE  fd)

Note that this is just an initial implementation. We may take some shortcuts (we will) that will need to be dealt with later once a more robust implementation can be put in place.

Our handle_dds_input() method is called by the reactor when there is data to be pulled from our peer() ACE_SOCK_Stream.

Definition at line 101 of file TransportReceiveStrategy_T.cpp.

102 {
103  DBG_ENTRY_LVL("TransportReceiveStrategy", "handle_dds_input", 6);
104 
105  //
106  // What we will be doing here:
107  //
108  // 1) Read as much data as possible from the handle.
109  // 2) Process each Transport layer packet
110  // A) Parse the Transport header
111  // B) Process each DataSample in the packet
112  // a) Parse the DataSampleHeader
113  // b) Parse the remainder of the sample
114  // c) call data_received for complete samples
115  // d) store any partial sample or header data
116  // 3) return
117  //
118  // The state that we might need to maintain includes:
119  // I) Current Transport layer header
120  // II) Current DataSample header
121  // III) Current DataSample data
122  //
123  // For each of these elements, they can be:
124  // i) empty
125  // ii) partial (not able to parse yet).
126  // iii) complete (parsed values available).
127  //
128  // The read buffers will be a series of data buffers of a fixed size
129  // that are each managed by an ACE_Data_Block, each of which is
130  // managed by an ACE_Message_Block containing the read and write
131  // pointers into the data.
132  //
133  // As messages are parsed, new ACE_Message_Blocks will be formed with
134  // read and write pointers adjusted to reference the individual
135  // DataSample buffer contents.
136  //
137  // The Underlying buffers, ACE_Data_Blocks, and ACE_Message_Blocks
138  // will be acquired from a Cached_Allocater_with_overflow. The size
139  // of the individual data buffers will be set initially to the
140  // ethernet MTU to allow for full TCP messages to be received into
141  // individual blocks.
142  //
143  // Reading will be performed with a two member struct iov, to allow
144  // for the remainder of the current buffer to be used along with at
145  // least one completely empty buffer. There will be a low water
146  // parameter used to stop the use of the current block when the
147  // available space in it is reduced.
148  //
149 
150  //
151  // Establish the current receive buffer.
152  //
153  // This involves checking for any remainder from the previous trip
154  // through the code. If there is no current buffer, or the buffer
155  // has less than the low water mark space() left, then promote the
156  // next receive buffer to the current. If none is present, create a
157  // new one and use it.
158  //
159  //
160  // Establish the next receive buffer.
161  //
162  // This involves checking for any previous complete buffers that
163  // were not promoted in the previous step. If none are present,
164  // create a new one and use that.
165  //
166 
167  //
168  // Remove any buffers that have been completely read and have less
169  // than the low water amount of space left.
170  //
171  size_t index;
172 
173  for (index = 0; index < receive_buffers_.size(); ++index) {
174  if ((this->receive_buffers_[index] != 0)
175  && (this->receive_buffers_[index]->length() == 0)
176  && (this->receive_buffers_[index]->space() < BUFFER_LOW_WATER)) {
177  VDBG((LM_DEBUG,"(%P|%t) DBG: "
178  "Remove a receive_buffer_[%d] from use.\n",
179  index));
180 
181  // unlink any Message_Block that continues to this one
182  // being removed.
183  // This avoids a possible infinite ->cont() loop.
184  for (size_t ii =0; ii < receive_buffers_.size(); ii++) {
185  if ((0 != this->receive_buffers_[ii]) &&
186  (this->receive_buffers_[ii]->cont() ==
187  this->receive_buffers_[index])) {
188  this->receive_buffers_[ii]->cont(0);
189  }
190  }
191 
192  //
193  // Remove the receive buffer from use.
194  //
195  ACE_DES_FREE(
196  this->receive_buffers_[index],
197  this->mb_allocator_.free,
199  this->receive_buffers_[index] = 0;
200  }
201  }
202 
203  //
204  // Allocate buffers for any empty slots. We may have emptied one just
205  // here, but others may have been emptied by a large read during the
206  // last trip through the code as well.
207  //
208  size_t previous = this->buffer_index_;
209 
210  for (index = this->buffer_index_;
211  this->successor_index(previous) != this->buffer_index_;
212  index = this->successor_index(index)) {
213  if (this->receive_buffers_[index] == 0) {
214  VDBG((LM_DEBUG,"(%P|%t) DBG: "
215  "Allocate a Message_Block for new receive_buffer_[%d].\n",
216  index));
217 
219  this->receive_buffers_[index],
221  sizeof(ACE_Message_Block)),
223  RECEIVE_DATA_BUFFER_SIZE, // Buffer size
224  ACE_Message_Block::MB_DATA, // Default
225  0, // Start with no continuation
226  0, // Let the constructor allocate
227  &this->data_allocator_, // Our buffer cache
228  &this->receive_lock_, // Our locking strategy
229  ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, // Default
230  ACE_Time_Value::zero, // Default
231  ACE_Time_Value::max_time, // Default
232  &this->db_allocator_, // Our data block cache
233  &this->mb_allocator_ // Our message block cache
234  ),
235  -1);
236  }
237 
238  //
239  // Chain the buffers. This allows us to have portions of parsed
240  // data cross buffer boundaries without a problem.
241  //
242  if (previous != index) {
243  this->receive_buffers_[previous]->cont(
244  this->receive_buffers_[index]);
245  }
246 
247  previous = index;
248  }
249 
250  //
251  // Read from handle.
252  //
253  // This involves a non-blocking recvv_n(). Any remaining space in
254  // the buffers should be saved for the next pass through the code.
255  // If more than one receive buffer has data, chain them. Promote
256  // all receive buffers so that the next pass through, the first
257  // buffer with space remaining will appear as the current receive
258  // buffer.
259  //
260  // This is probably what should remain in the specific
261  // implementation, with the rest of this factored back up into the
262  // framework.
263  //
264 
265  VDBG((LM_DEBUG,"(%P|%t) DBG: "
266  "Form the iovec from the message block\n"));
267 
268  //
269  // Form the iovec from the message block chain of receive buffers.
270  //
271  iovec iov[RECEIVE_BUFFERS];
272  size_t vec_index = 0;
273  size_t current = this->buffer_index_;
274 
275  for (index = 0;
276  index < receive_buffers_.size();
277  ++index, current = this->successor_index(current)) {
278  // Invariant. ASSERT?
279  if (this->receive_buffers_[current] == 0) {
280  ACE_ERROR_RETURN((LM_ERROR,
281  ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
282  ACE_TEXT("receive buffer management detected.\n")),
283  -1);
284  }
285 
286 #ifdef _MSC_VER
287 #pragma warning(push)
288 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
289 // since on other platforms iov_len is 64-bit
290 #pragma warning(disable : 4267)
291 #endif
292  // This check covers the case where we have unread data in
293  // the first buffer, but no space to write any more data.
294  if (this->receive_buffers_[current]->space() > 0) {
295  iov[vec_index].iov_len = this->receive_buffers_[current]->space();
296  iov[vec_index].iov_base = this->receive_buffers_[current]->wr_ptr();
297 
298  VDBG((LM_DEBUG,"(%P|%t) DBG: "
299  "index==%d, len==%d, base==%x\n",
300  vec_index, iov[vec_index].iov_len, iov[vec_index].iov_base));
301 
302  vec_index++;
303  }
304  }
305 
306 #ifdef _MSC_VER
307 #pragma warning(pop)
308 #endif
309 
310  VDBG((LM_DEBUG,"(%P|%t) DBG: "
311  "Perform the recvv() call\n"));
312 
313  //
314  // Read into the buffers.
315  //
316  ACE_INET_Addr remote_address;
317  bool stop = false;
318  const ssize_t bytes_remaining = this->receive_bytes(iov,
319  static_cast<int>(vec_index),
320  remote_address,
321  fd,
322  stop);
323 
324  if (stop) {
325  return 0;
326  }
327 
328  if (bytes_remaining < 0) {
329  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Problem ")
330  ACE_TEXT("with data link detected: %p.\n"),
331  ACE_TEXT("receive_bytes")));
332 
333  // The relink() will handle the connection to the ReconnectTask to do
334  // the reconnect so this reactor thread will not be block.
335  this->relink();
336 
337  // Close connection anyway.
338  return -1;
339  // Returning -1 takes the handle out of the reactor read mask.
340  }
341 
342  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
343  "recvv() return %d - we call this the bytes_remaining.\n",
344  bytes_remaining), 5);
345 
346  if (bytes_remaining == 0) {
347  if (this->gracefully_disconnected_) {
348  VDBG_LVL((LM_DEBUG, "(%P|%t) Peer has gracefully disconnected.\n"), 1);
349  return -1;
350 
351  } else {
352  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) DBG: Unrecoverable problem ")
353  ACE_TEXT("with data link detected\n")), 1);
354 
355  // The relink() will handle the connection to the ReconnectTask to do
356  // the reconnect so this reactor thread will not be block.
357  this->relink();
358 
359  // Close connection anyway.
360  return -1;
361  // Returning -1 takes the handle out of the reactor read mask.
362  }
363  }
364 
365  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
366  "START Adjust the message block chain pointers to account for the "
367  "new data.\n"), 5);
368 
369  //
370  // Adjust the message block chain pointers to account for the new
371  // data.
372  //
373  size_t bytes = bytes_remaining;
374 
375  if (!this->pdu_remaining_) {
376  this->receive_transport_header_.length_ = static_cast<ACE_UINT32>(bytes);
377  }
378 
379  for (index = this->buffer_index_;
380  bytes > 0;
381  index = this->successor_index(index)) {
382  VDBG((LM_DEBUG,"(%P|%t) DBG: -> "
383  "At top of for..loop block.\n"));
384 
385  const size_t amount = ace_min<size_t>(bytes, this->receive_buffers_[index]->space());
386 
387  VDBG((LM_DEBUG,"(%P|%t) DBG: "
388  "index == %d bytes == %d amount == %d.\n",
389  index, bytes, amount));
390 
391  VDBG((LM_DEBUG,"(%P|%t) DBG: "
392  "this->receive_buffers_[index]->rd_ptr() == %u.\n",
393  this->receive_buffers_[index]->rd_ptr()));
394  VDBG((LM_DEBUG,"(%P|%t) DBG: "
395  "this->receive_buffers_[index]->wr_ptr() == %u.\n",
396  this->receive_buffers_[index]->wr_ptr()));
397 
398  this->receive_buffers_[index]->wr_ptr(amount);
399 
400  VDBG((LM_DEBUG,"(%P|%t) DBG: "
401  "Now, this->receive_buffers_[index]->wr_ptr() == %u.\n",
402  this->receive_buffers_[index]->wr_ptr()));
403 
404  bytes -= amount;
405 
406  VDBG((LM_DEBUG,"(%P|%t) DBG: "
407  "Now, bytes == %d .\n", bytes));
408 
409  // This is yukky to do here, but there is a fine line between
410  // where things are moved and where they are checked.
411  if (bytes > 0 && this->successor_index(index) == this->buffer_index_) {
412  // Here we have read more data than we passed in buffer. Bad.
413  ACE_ERROR_RETURN((LM_ERROR,
414  ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
415  ACE_TEXT("receive buffer management detected: ")
416  ACE_TEXT("read more bytes than available.\n")),
417  -1);
418  }
419  }
420 
421  VDBG((LM_DEBUG,"(%P|%t) DBG: "
422  "DONE Adjust the message block chain pointers to account "
423  "for the new data.\n"));
424 
425  //
426  // While the receive buffer is not empty:
427  //
428  // This is the receive buffer(s) that was just read. The receive
429  // buffer message block is used to manage our parsing through this
430  // data. When we have parsed out all the data, then it will be
431  // considered empty. This message block read pointer indicates
432  // where we are in the parsing process. Other message blocks refer
433  // to the parsed data and retain the data block references to
434  // prevent the data from being released until we have completed all
435  // uses for the data in the receive buffer. This will normally be
436  // after the sample data is demarshaled in the DataReader
437  // components.
438  //
439  VDBG((LM_DEBUG,"(%P|%t) DBG: "
440  "Start processing the data we just received.\n"));
441 
442  //
443  // Operate while we have more data to process.
444  //
445  while (true) {
446 
447  //
448  // Manage the current transport header.
449  //
450  // This involves checking to see if we have remaining data in the
451  // current packet. If not, or if we have not read a complete
452  // header, then we read a transport header and check it for
453  // validity. As we have a valid transport header with data
454  // remaining to read in the packet that it represents, we move on.
455  //
456  // As new transport headers are to be read, they are read into a
457  // message buffer member variable and demarshaled directly. The
458  // values are retained for the lifetime of the processing of the
459  // instant transport packet. The member message buffer allows us
460  // to retain partially read transport headers until we can read
461  // more data.
462  //
463 
464  //
465  // Check the remaining transport packet length to see if we are
466  // expecting a new one.
467  //
468  if (this->pdu_remaining_ == 0) {
469  VDBG((LM_DEBUG,"(%P|%t) DBG: "
470  "We are expecting a transport packet header.\n"));
471 
472  VDBG((LM_DEBUG,"(%P|%t) DBG: "
473  "this->buffer_index_ == %d.\n",this->buffer_index_));
474 
475  VDBG((LM_DEBUG,"(%P|%t) DBG: "
476  "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
477  "== %u.\n",
478  this->receive_buffers_[this->buffer_index_]->rd_ptr()));
479 
480  VDBG((LM_DEBUG,"(%P|%t) DBG: "
481  "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
482  "== %u.\n",
483  this->receive_buffers_[this->buffer_index_]->wr_ptr()));
484 
485  if (this->receive_buffers_[this->buffer_index_]->total_length()
486  < this->receive_transport_header_.get_max_serialized_size()) {
487  //
488  // Not enough room in the buffer for the entire Transport
489  // header that we need to read, so relinquish control until
490  // we get more data.
491  //
492  VDBG((LM_DEBUG,"(%P|%t) DBG: "
493  "Not enough bytes read to account for a transport "
494  "packet header. We are done here - we need to "
495  "receive more bytes.\n"));
496 
497  this->receive_transport_header_.incomplete(
498  *this->receive_buffers_[this->buffer_index_]);
499 
500  return 0;
501 
502  } else {
503  VDBG((LM_DEBUG,"(%P|%t) DBG: "
504  "We have enough bytes to demarshall the transport "
505  "packet header.\n"));
506 
507  // only do the hexdump if it will be printed - to not impact performance.
508  if (Transport_debug_level > 5) {
509  ACE_TCHAR xbuffer[4096];
510  const ACE_Message_Block& mb =
511  *this->receive_buffers_[this->buffer_index_];
512  size_t xbytes = mb.length();
513 
514  xbytes = (std::min)(xbytes, TH::get_max_serialized_size());
515 
516  ACE::format_hexdump(mb.rd_ptr(), xbytes, xbuffer, sizeof(xbuffer));
517 
518  VDBG((LM_DEBUG,"(%P|%t) DBG: "
519  "Hex Dump of transport header block "
520  "(%d bytes):\n%s", xbytes, xbuffer));
521  }
522 
523  //
524  // Demarshal the transport header.
525  //
527  *this->receive_buffers_[this->buffer_index_];
528 
529  //
530  // Check the TransportHeader.
531  //
532  if (!this->receive_transport_header_.valid()) {
533  ACE_ERROR_RETURN((LM_ERROR,
534  ACE_TEXT
535  ("(%P|%t) ERROR: TransportHeader invalid.\n")),
536  -1);
537  }
538 
540  this->pdu_remaining_ = this->receive_transport_header_.length_;
541 
542  VDBG((LM_DEBUG,"(%P|%t) DBG: "
543  "Amount of transport packet bytes (remaining): %d.\n",
544  this->pdu_remaining_));
545  }
546 
547  // The receive_transport_header_ assignment advanced the read index
548  // that can be across the buffers. Advance the buffer index.
549  bool last_buffer = false;
550  update_buffer_index(last_buffer);
551  }
552 
553  //
554  // Ignore bad PDUs by skipping over them. We do this out here
555  // in case we did not skip over the entire bad PDU last time.
556  //
557  {
558  int rtn_code = skip_bad_pdus();
559  if (rtn_code <= 0) return rtn_code;
560  }
561 
562  //
563  // Keep processing samples while we have data to read.
564  //
565  while (this->pdu_remaining_ > 0) {
566  VDBG((LM_DEBUG,"(%P|%t) DBG: "
567  "We have a transport packet now. There are more sample "
568  "bytes to parse in order to complete the packet.\n"));
569 
570  //
571  // Manage the current sample header.
572  //
573  // This involves checking to see if we have remaining data in the
574  // current sample. If not, or if we have not read a complete
575  // header, then we read a sample header and check it for validity,
576  // which currently involves checking the message type only. As we
577  // have a valid sample header with data remaining to read in the
578  // sample that it represents, we move on.
579  //
580  // As new sample headers are read, the are read into a message
581  // buffer member variable and demarshaled directly. The values are
582  // retained for the lifetime of the sample and are passed as part
583  // of the receive data sample itself. The member message buffer
584  // allows us to retain partially read sample headers until we can
585  // read more data.
586  //
587  if (this->receive_sample_remaining_ == 0) {
588  VDBG((LM_DEBUG,"(%P|%t) DBG: "
589  "We are not working on some remaining sample data. "
590  "We are expecting to parse a sample header now.\n"));
591 
592  VDBG((LM_DEBUG,"(%P|%t) DBG: "
593  "this->buffer_index_ == %d.\n",this->buffer_index_));
594 
595  VDBG((LM_DEBUG,"(%P|%t) DBG: "
596  "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
597  "== %u.\n",
598  this->receive_buffers_[this->buffer_index_]->rd_ptr()));
599 
600  VDBG((LM_DEBUG,"(%P|%t) DBG: "
601  "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
602  "== %u.\n",
603  this->receive_buffers_[this->buffer_index_]->wr_ptr()));
604 
605  if (DSH::partial(*this->receive_buffers_[this->buffer_index_])) {
606  //
607  // Not enough room in the buffer for the entire Sample
608  // header that we need to read, so relinquish control until
609  // we get more data.
610  //
611  VDBG((LM_DEBUG,"(%P|%t) DBG: "
612  "Not enough bytes have been received to account "
613  "for a complete sample header. We are done for "
614  "now - we need to receive more data before we "
615  "can go on.\n"));
616  if (Transport_debug_level > 2) {
617  ACE_TCHAR ebuffer[350];
618  ACE_Message_Block& mb = *this->receive_buffers_[this->buffer_index_];
619  const size_t sz = (std::min)(DSH::get_max_serialized_size(), mb.length());
620  ACE::format_hexdump(mb.rd_ptr(), sz, ebuffer, sizeof(ebuffer));
621  ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: "
622  "Partial DataSampleHeader:\n%s\n", ebuffer));
623  }
624 
625  return 0;
626 
627  } else {
628  VDBG((LM_DEBUG,"(%P|%t) DBG: "
629  "We have received enough bytes for the sample "
630  "header. Demarshall the sample header now.\n"));
631 
632  // only do the hexdump if it will be printed - to not impact performance.
633  if (Transport_debug_level > 5) {
634  ACE_TCHAR ebuffer[4096];
636  this->receive_buffers_[this->buffer_index_]->rd_ptr(),
637  this->data_sample_header_.get_max_serialized_size(),
638  ebuffer, sizeof(ebuffer));
639 
640  VDBG((LM_DEBUG,"(%P|%t) DBG: "
641  "Hex Dump:\n%s", ebuffer));
642  }
643 
644  this->data_sample_header_.pdu_remaining(this->pdu_remaining_);
645 
646  //
647  // Demarshal the sample header.
648  //
649  this->data_sample_header_ =
650  *this->receive_buffers_[this->buffer_index_];
651 
652  //
653  // Check the DataSampleHeader.
654  //
656 
657  //
658  // Set the amount to parse into the message buffer. We
659  // can't just use the header value to keep track of
660  // where we are since downstream processing will expect
661  // the value to be correct (unadjusted by us).
662  //
664  this->data_sample_header_.message_length();
665 
666  VDBG((LM_DEBUG,"(%P|%t) DBG: "
667  "The demarshalled sample header says that we "
668  "have %d bytes to read for the data portion of "
669  "the sample.\n",
671 
672  //
673  // Decrement packet size.
674  //
675  const size_t header_size =
676  this->data_sample_header_.get_serialized_size();
677  VDBG((LM_DEBUG,"(%P|%t) DBG: "
678  "this->data_sample_header_.get_serialized_size() == %d.\n",
679  header_size));
680 
681  this->pdu_remaining_ -= header_size;
682 
683  VDBG((LM_DEBUG,"(%P|%t) DBG: "
684  "Amount of transport packet remaining: %d.\n",
685  this->pdu_remaining_));
686 
687  const int rtn_code = skip_bad_pdus();
688  if (rtn_code <= 0) return rtn_code;
689  }
690  }
691 
692  bool last_buffer = false;
693  update_buffer_index(last_buffer);
694 
695  if (this->receive_sample_remaining_ > 0) {
696  //
697  // Manage the current sample data.
698  //
699  // This involves reading data to complete the current sample. As
700  // samples are completed, they are dispatched via the
701  // data_received() mechanism. This data is read into message
702  // blocks that are obtained from the pool of message blocks since
703  // the lifetime of this data will last until the DataReader
704  // components demarshal the sample data. A reference to the
705  // current sample being built is retained as a member to allow us
706  // to hold partially read samples until they are completed.
707  //
708  VDBG((LM_DEBUG,"(%P|%t) DBG: "
709  "Determine amount of data for the next block in the chain\n"));
710 
711  VDBG((LM_DEBUG,"(%P|%t) DBG: "
712  "this->buffer_index_ == %d.\n",this->buffer_index_));
713 
714  VDBG((LM_DEBUG,"(%P|%t) DBG: "
715  "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
716  "== %u.\n",
717  this->receive_buffers_[this->buffer_index_]->rd_ptr()));
718 
719  VDBG((LM_DEBUG,"(%P|%t) DBG: "
720  "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
721  "== %u.\n",
722  this->receive_buffers_[this->buffer_index_]->wr_ptr()));
723  //
724  // Determine the amount of data for the next block in the chain.
725  //
726  const size_t amount = ace_min<size_t>(
728  this->receive_buffers_[this->buffer_index_]->length());
729 
730  VDBG((LM_DEBUG,"(%P|%t) DBG: "
731  "amount of data for the next block in the chain is %d\n",
732  amount));
733  //
734  // Now create a message block for the data in the current buffer
735  // and chain it if we are starting a new sample.
736  //
737  ACE_Message_Block* current_sample_block = 0;
739  current_sample_block,
741  sizeof(ACE_Message_Block)),
743  this->receive_buffers_[this->buffer_index_]
744  ->data_block()->duplicate(),
745  0,
746  &this->mb_allocator_),
747  -1);
748 
749  //
750  // Chain it to the end of the current sample.
751  //
752  if (this->payload_ == 0) {
753  this->payload_ = current_sample_block;
754 
755  } else {
756  ACE_Message_Block* block = this->payload_;
757 
758  while (block->cont() != 0) {
759  block = block->cont();
760  }
761 
762  block->cont(current_sample_block);
763  }
764 
765  VDBG((LM_DEBUG,"(%P|%t) DBG: "
766  "Before adjustment of the pointers and byte counters\n"));
767 
768  VDBG((LM_DEBUG,"(%P|%t) DBG: "
769  "this->payload_->rd_ptr() "
770  "== %u "
771  "this->payload_->wr_ptr() "
772  "== %u.\n",
773  this->payload_->rd_ptr(),
774  this->payload_->wr_ptr()));
775 
776  //
777  // Adjust the pointers and byte counters.
778  //
779  current_sample_block->rd_ptr(
780  this->receive_buffers_[this->buffer_index_]->rd_ptr());
781  current_sample_block->wr_ptr(current_sample_block->rd_ptr() + amount);
782  this->receive_buffers_[this->buffer_index_]->rd_ptr(amount);
783  this->receive_sample_remaining_ -= amount;
784  this->pdu_remaining_ -= amount;
785 
786  VDBG((LM_DEBUG,"(%P|%t) DBG: "
787  "After adjustment of the pointers and byte counters\n"));
788 
789  VDBG((LM_DEBUG,"(%P|%t) DBG: "
790  "this->payload_->rd_ptr() "
791  "== %u "
792  "this->payload_->wr_ptr() "
793  "== %u.\n",
794  this->payload_->rd_ptr(),
795  this->payload_->wr_ptr()));
796 
797  VDBG((LM_DEBUG,"(%P|%t) DBG: "
798  "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
799  "== %u.\n",
800  this->receive_buffers_[this->buffer_index_]->rd_ptr()));
801 
802  VDBG((LM_DEBUG,"(%P|%t) DBG: "
803  "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
804  "== %u.\n",
805  this->receive_buffers_[this->buffer_index_]->wr_ptr()));
806 
807  VDBG((LM_DEBUG,"(%P|%t) DBG: "
808  "After adjustment, remaining sample bytes == %d and remaining transport packet bytes == %d\n",
809  this->receive_sample_remaining_,
810  this->pdu_remaining_));
811  }
812 
813  //
814  // Dispatch the received message if we have received it all.
815  //
816  // NB: Since we are doing this synchronously and without passing
817  // ownership of the sample, we can use NULL mutex lock for
818  // the allocators. Since only one thread can be in this
819  // routine at a time, that is.
820  //
821  if (this->receive_sample_remaining_ == 0) {
822  VDBG((LM_DEBUG,"(%P|%t) DBG: "
823  "Now dispatch the sample to the DataLink\n"));
824 
825  ReceivedDataSample rds = payload_ ? ReceivedDataSample(*payload_) : ReceivedDataSample();
826  if (payload_) {
827  payload_->release();
828  payload_ = 0;
829  }
830  if (this->data_sample_header_.into_received_data_sample(rds)) {
831 
832  if (this->data_sample_header_.more_fragments()
833  || this->receive_transport_header_.last_fragment()) {
834  VDBG((LM_DEBUG,"(%P|%t) DBG: Attempt reassembly of fragments\n"));
835 
836  if (this->reassemble(rds)) {
837  VDBG((LM_DEBUG,"(%P|%t) DBG: Reassembled complete message\n"));
838  this->deliver_sample(rds, remote_address);
839  }
840  // If reassemble() returned false, it takes ownership of the data
841  // just like deliver_sample() does.
842 
843  } else {
844  this->deliver_sample(rds, remote_address);
845  }
846  }
847 
848  // For the reassembly algorithm, the 'last_fragment_' header bit only
849  // applies to the first DataSampleHeader in the TransportHeader
850  this->receive_transport_header_.last_fragment(false);
851 
852  VDBG((LM_DEBUG,"(%P|%t) DBG: "
853  "Release the sample that we just sent.\n"));
854  // ~ReceivedDataSample() releases the payload_ message block
855  }
856 
857  update_buffer_index(last_buffer);
858 
859  if (last_buffer) {
860  // Relinquish control if there is no more data to process.
861  VDBG((LM_DEBUG,"(%P|%t) DBG: We are done - no more data.\n"));
862  return 0;
863  }
864 
865  } // End of while (this->pdu_remaining_ > 0)
866 
867  VDBG((LM_DEBUG,"(%P|%t) DBG: "
868  "Let's try to do some more.\n"));
869  } // End of while (true)
870 
871  VDBG((LM_DEBUG,"(%P|%t) DBG: "
872  "It looks like we are done - the done loop has finished.\n"));
873  //
874  // Relinquish control.
875  //
876  // This involves ensuring that when we reenter this method, we will
877  // pick up from where we left off correctly.
878  //
879  return 0;
880 }
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
int skip_bad_pdus()
Ignore bad PDUs by skipping over them.
static const ACE_Time_Value max_time
TH receive_transport_header_
Current receive TransportHeader.
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
size_t length(void) const
virtual bool check_header(const TH &header)
Check the transport header for suitability.
int ssize_t
char * rd_ptr(void) const
bool gracefully_disconnected_
Flag indicates if the GRACEFUL_DISCONNECT message is received.
size_t receive_sample_remaining_
Bytes remaining in the current DataSample.
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
DSH data_sample_header_
Current data sample header.
size_t successor_index(size_t index) const
Manage an index into the receive buffer array.
virtual void deliver_sample(ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)=0
Called when there is a ReceivedDataSample to be delivered.
#define VDBG(DBG_ARGS)
char ACE_TCHAR
virtual ACE_Message_Block * release(void)
ACE_Message_Block * cont(void) const
char * wr_ptr(void) const
virtual ssize_t receive_bytes(iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)=0
Only our subclass knows how to do this.
size_t pdu_remaining_
Amount of the current PDU that has not been processed yet.
void free(void *ptr)
Return a chunk of memory back to free list cache.
ACE_TEXT("TCP_Factory")
size_t format_hexdump(const char *buffer, size_t size, ACE_TCHAR *obuf, size_t obuf_sz)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
static const ACE_Time_Value zero
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
#define ACE_ERROR_RETURN(X, Y)
virtual bool reassemble(ReceivedDataSample &data)
ACE_Lock_Adapter< ACE_SYNCH_MUTEX > receive_lock_
Locking strategy for the allocators.
size_t buffer_index_
Current receive buffer index in use.

◆ OPENDDS_VECTOR()

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::OPENDDS_VECTOR ( ACE_Message_Block )
protected

Set of receive buffers in use.

◆ pdu_remaining()

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining ( ) const
inlineprotected

Definition at line 130 of file TransportReceiveStrategy_T.h.

130 { return this->pdu_remaining_; }
size_t pdu_remaining_
Amount of the current PDU that has not been processed yet.

◆ reassemble()

template<typename TH , typename DSH >
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reassemble ( ReceivedDataSample data)
protectedvirtual

Reimplemented in OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::UdpReceiveStrategy, and OpenDDS::DCPS::MulticastReceiveStrategy.

Definition at line 884 of file TransportReceiveStrategy_T.cpp.

885 {
886  ACE_DEBUG((LM_WARNING, "(%P|%t) TransportReceiveStrategy::reassemble() "
887  "WARNING: derived class must override if specific transport type uses "
888  "fragmentation and reassembly\n"));
889  return false;
890 }
#define ACE_DEBUG(X)

◆ receive_bytes()

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual ssize_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_bytes ( iovec  iov[],
int  n,
ACE_INET_Addr remote_address,
ACE_HANDLE  fd,
bool &  stop 
)
protectedpure virtual

◆ received_header() [1/2]

template<typename TH , typename DSH >
ACE_INLINE const TH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header ( ) const

◆ received_header() [2/2]

template<typename TH , typename DSH >
ACE_INLINE TH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header ( )

Definition at line 36 of file TransportReceiveStrategy_T.inl.

37 {
38  return this->receive_transport_header_;
39 }
TH receive_transport_header_
Current receive TransportHeader.

◆ received_sample_header() [1/2]

template<typename TH , typename DSH >
ACE_INLINE const DSH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header ( ) const

Provides access to the received sample header for subclasses.

Definition at line 43 of file TransportReceiveStrategy_T.inl.

44 {
45  return this->data_sample_header_;
46 }
DSH data_sample_header_
Current data sample header.

◆ received_sample_header() [2/2]

template<typename TH , typename DSH >
ACE_INLINE DSH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header ( )

Definition at line 50 of file TransportReceiveStrategy_T.inl.

51 {
52  return this->data_sample_header_;
53 }
DSH data_sample_header_
Current data sample header.

◆ relink()

template<typename TH , typename DSH >
ACE_INLINE void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::relink ( bool  do_suspend = true)
virtual

The subclass needs to provide the implementation for re-establishing the datalink. This is called when recv returns an error.

Reimplemented in OpenDDS::DCPS::TcpReceiveStrategy.

Definition at line 64 of file TransportReceiveStrategy_T.inl.

65 {
66  // The subclass needs implement this function for re-establishing
67  // the link upon recv failure.
68 }

◆ reset()

template<typename TH , typename DSH >
void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset ( void  )
protected

For datagram-based derived classes, reset() can be called to clear any state that may be remaining from parsing the previous datagram.

Definition at line 894 of file TransportReceiveStrategy_T.cpp.

895 {
896  this->receive_sample_remaining_ = 0;
898  this->payload_ = 0;
899  this->good_pdu_ = true;
900  this->pdu_remaining_ = 0;
901  for (size_t i = 0; i < receive_buffers_.size(); ++i) {
902  ACE_Message_Block& rb = *this->receive_buffers_[i];
903  rb.rd_ptr(rb.wr_ptr());
904  }
905 }
char * rd_ptr(void) const
size_t receive_sample_remaining_
Bytes remaining in the current DataSample.
virtual ACE_Message_Block * release(void)
char * wr_ptr(void) const
size_t pdu_remaining_
Amount of the current PDU that has not been processed yet.

◆ skip_bad_pdus()

template<typename TH , typename DSH >
int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus ( )
protected

Ignore bad PDUs by skipping over them.

Definition at line 931 of file TransportReceiveStrategy_T.cpp.

932 {
933  if (this->good_pdu_) return 1;
934 
935  //
936  // Adjust the message block chain pointers to account for the
937  // skipped data.
938  //
939  for (size_t index = this->buffer_index_;
940  this->pdu_remaining_ > 0;
941  index = this->successor_index(index)) {
942  const size_t amount =
943  ace_min<size_t>(this->pdu_remaining_, this->receive_buffers_[index]->length());
944 
945  this->receive_buffers_[index]->rd_ptr(amount);
946  this->pdu_remaining_ -= amount;
947 
948  if (this->pdu_remaining_ > 0 && this->successor_index(index) == this->buffer_index_) {
949  ACE_ERROR_RETURN((LM_ERROR,
950  ACE_TEXT("(%P|%t) ERROR: ")
951  ACE_TEXT("TransportReceiveStrategy::skip_bad_pdus()")
952  ACE_TEXT(" - Unrecoverably corrupted ")
953  ACE_TEXT("receive buffer management detected: ")
954  ACE_TEXT("read more bytes than available.\n")),
955  -1);
956  }
957  }
958 
959  this->receive_sample_remaining_ = 0;
960 
961  bool done = false;
962  update_buffer_index(done);
963  return done ? 0 : 1;
964 }
size_t receive_sample_remaining_
Bytes remaining in the current DataSample.
size_t successor_index(size_t index) const
Manage an index into the receive buffer array.
size_t pdu_remaining_
Amount of the current PDU that has not been processed yet.
ACE_TEXT("TCP_Factory")
#define ACE_ERROR_RETURN(X, Y)
size_t buffer_index_
Current receive buffer index in use.

◆ start()

template<typename TH , typename DSH >
ACE_INLINE int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start ( void  )
virtual

Implements OpenDDS::DCPS::TransportStrategy.

Definition at line 13 of file TransportReceiveStrategy_T.inl.

14 {
15  DBG_ENTRY_LVL("TransportReceiveStrategy","start",6);
16  return this->start_i();
17 }
virtual int start_i()=0
Let the subclass start.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ start_i()

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start_i ( )
protectedpure virtual

◆ stop()

template<typename TH , typename DSH >
ACE_INLINE void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop ( void  )
virtual

Implements OpenDDS::DCPS::TransportStrategy.

Definition at line 21 of file TransportReceiveStrategy_T.inl.

22 {
23  DBG_ENTRY_LVL("TransportReceiveStrategy","stop",6);
24  this->stop_i();
25 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
virtual void stop_i()=0
Let the subclass stop.

◆ stop_i()

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop_i ( )
protectedpure virtual

◆ successor_index()

template<typename TH , typename DSH >
ACE_INLINE size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index ( size_t  index) const
protected

Manage an index into the receive buffer array.

Definition at line 57 of file TransportReceiveStrategy_T.inl.

58 {
59  return ++index % RECEIVE_BUFFERS;
60 }

◆ to_msgblock()

template<typename TH , typename DSH >
ACE_Message_Block * OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::to_msgblock ( const ReceivedDataSample sample)

Use the receive strategy's Message Block Allocator to convert the ReceivedDataSample's payload to an ACE_Message_Block chain

Definition at line 968 of file TransportReceiveStrategy_T.cpp.

Referenced by OpenDDS::DCPS::ShmemDataLink::request_ack_received().

969 {
970  return sample.data(&mb_allocator_);
971 }

◆ update_buffer_index()

template<typename TH , typename DSH >
void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index ( bool &  done)
protected

Definition at line 909 of file TransportReceiveStrategy_T.cpp.

910 {
911  VDBG((LM_DEBUG,"(%P|%t) DBG: "
912  "Adjust the buffer chain in case we crossed into the next "
913  "buffer after the last read(s).\n"));
914  const size_t initial = this->buffer_index_;
915  while (this->receive_buffers_[this->buffer_index_]->length() == 0) {
916  this->buffer_index_ = this->successor_index(this->buffer_index_);
917 
918  VDBG((LM_DEBUG,"(%P|%t) DBG: "
919  "Set this->buffer_index_ = %d.\n",
920  this->buffer_index_));
921 
922  if (initial == this->buffer_index_) {
923  done = true; // no other buffers in receive_buffers_ have data
924  return;
925  }
926  }
927 }
size_t successor_index(size_t index) const
Manage an index into the receive buffer array.
#define VDBG(DBG_ARGS)
size_t buffer_index_
Current receive buffer index in use.

Member Data Documentation

◆ buffer_index_

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_
protected

Current receive buffer index in use.

Definition at line 161 of file TransportReceiveStrategy_T.h.

◆ data_allocator_

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
TransportDataAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_allocator_
protected

Definition at line 152 of file TransportReceiveStrategy_T.h.

◆ data_sample_header_

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
DSH OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_
protected

Current data sample header.

Definition at line 164 of file TransportReceiveStrategy_T.h.

◆ db_allocator_

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
TransportDataBlockAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::db_allocator_
protected

Definition at line 151 of file TransportReceiveStrategy_T.h.

◆ good_pdu_

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_
protected

Flag indicating that the currently resident PDU is a good one (i.e. has not been received and processed previously). This is included in case we receive PDUs that were resent for reliability reasons and we receive one even if we have already processed it. This is a use case from multicast transports.

Definition at line 174 of file TransportReceiveStrategy_T.h.

◆ gracefully_disconnected_

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::gracefully_disconnected_
protected

Flag indicates if the GRACEFUL_DISCONNECT message is received.

Definition at line 133 of file TransportReceiveStrategy_T.h.

◆ mb_allocator_

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
TransportMessageBlockAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_
protected

Definition at line 150 of file TransportReceiveStrategy_T.h.

◆ payload_

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
ACE_Message_Block* OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::payload_
protected

Definition at line 166 of file TransportReceiveStrategy_T.h.

◆ pdu_remaining_

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_
protected

Amount of the current PDU that has not been processed yet.

Definition at line 177 of file TransportReceiveStrategy_T.h.

◆ receive_lock_

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
ACE_Lock_Adapter<ACE_SYNCH_MUTEX> OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_lock_
protected

Locking strategy for the allocators.

Definition at line 155 of file TransportReceiveStrategy_T.h.

◆ receive_sample_remaining_

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_
protected

Bytes remaining in the current DataSample.

Definition at line 143 of file TransportReceiveStrategy_T.h.

◆ receive_transport_header_

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
TH OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_
protected

Current receive TransportHeader.

Definition at line 146 of file TransportReceiveStrategy_T.h.


The documentation for this class was generated from the following files: