OpenDDS  Snapshot(2023/04/28-20:55)
TransportReceiveStrategy_T.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
9 
10 #include "TransportInst.h"
11 
12 #include "ace/INET_Addr.h"
13 #include "ace/Min_Max.h"
14 
15 #if !defined (__ACE_INLINE__)
17 #endif /* __ACE_INLINE__ */
18 
20 
21 namespace OpenDDS {
22 namespace DCPS {
23 
/// Constructor.
/// NOTE: this listing omits source line 25, which carries the start of the
/// signature; only the trailing receive_buffers_count parameter is visible.
///
/// Each allocator is sized from the matching config field when config is
/// non-null and that field is non-zero; otherwise a built-in default is used
/// (MESSAGE_BLOCKS, DATA_BLOCKS, and twice receive_buffers_count respectively).
/// receive_buffers_ starts out with receive_buffers_count null slots.
24 template<typename TH, typename DSH>
26  size_t receive_buffers_count)
27  : gracefully_disconnected_(false),
28  receive_sample_remaining_(0),
29  mb_allocator_((config && config->receive_preallocated_message_blocks_) ? config->receive_preallocated_message_blocks_ : MESSAGE_BLOCKS),
30  db_allocator_((config && config->receive_preallocated_data_blocks_) ? config->receive_preallocated_data_blocks_ : DATA_BLOCKS),
  // NOTE(review): data_allocator_ is sized from receive_preallocated_data_blocks_,
  // the same field used for db_allocator_ above - confirm this is intended.
31  data_allocator_((config && config->receive_preallocated_data_blocks_) ? config->receive_preallocated_data_blocks_ : receive_buffers_count * 2),
32  receive_buffers_(receive_buffers_count, 0),
33  buffer_index_(0),
34  payload_(0),
35  good_pdu_(true),
36  pdu_remaining_(0)
37 {
38  DBG_ENTRY_LVL("TransportReceiveStrategy", "TransportReceiveStrategy" ,6);
39 
  // At transport debug level 2 and above, report the address and chunk count
  // of each of the three allocators.
40  if (Transport_debug_level >= 2) {
41  ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-mb"
42  " Cached_Allocator_With_Overflow %@ with %B chunks\n",
43  &mb_allocator_, mb_allocator_.n_chunks()));
44  ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-db"
45  " Cached_Allocator_With_Overflow %@ with %B chunks\n",
46  &db_allocator_, db_allocator_.n_chunks()));
47  ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-data"
48  " Cached_Allocator_With_Overflow %@ with %B chunks\n",
49  &data_allocator_, data_allocator_.n_chunks()));
50  }
51 }
52 
/// Destructor (name taken from the DBG_ENTRY_LVL string below; the signature
/// on source line 54 is omitted from this listing).
///
/// Warns if the current receive buffer still holds unprocessed bytes, then
/// hands every non-null receive buffer back through mb_allocator_.
53 template<typename TH, typename DSH>
55 {
56  DBG_ENTRY_LVL("TransportReceiveStrategy","~TransportReceiveStrategy",6);
57 
  // Only the buffer at buffer_index_ (and whatever it is chained to, since
  // total_length() is used) is inspected for leftover data.
58  if (this->receive_buffers_[this->buffer_index_] != 0) {
59  size_t size = this->receive_buffers_[this->buffer_index_]->total_length();
60 
61  if (size > 0) {
  // NOTE: the logging macro opener (source line 62) is omitted from this listing.
63  ACE_TEXT("(%P|%t) WARNING: TransportReceiveStrategy::~TransportReceiveStrategy() - ")
64  ACE_TEXT("terminating with %d unprocessed bytes.\n"),
65  size));
66  }
67  }
68 
  // Free every allocated receive buffer slot.
69  for (size_t index = 0; index < receive_buffers_.size(); ++index) {
70  if (receive_buffers_[index] != 0) {
  // NOTE: source lines 71 and 74 are omitted here; presumably an ACE_DES_FREE
  // call like the one in handle_dds_input() - confirm against the real file.
72  receive_buffers_[index],
73  mb_allocator_.free,
75  }
76  }
77 }
78 
/// Default header check: accepts every header by returning true.
/// NOTE: the signature (source line 81) is omitted from this listing. This is
/// presumably one of the two check_header() overloads that handle_dds_input()
/// calls with the transport header and the sample header - confirm which one.
79 template<typename TH, typename DSH>
80 bool
82 {
83  return true;
84 }
85 
/// Default header check: accepts every header by returning true.
/// NOTE: the signature (source line 88) is omitted from this listing. This is
/// presumably the other check_header() overload used by handle_dds_input();
/// the result of that call is stored in good_pdu_.
86 template<typename TH, typename DSH>
87 bool
89 {
90  return true;
91 }
92 
93 /// Note that this is just an initial implementation. We may take
94 /// some shortcuts (we will) that will need to be dealt with later once
95 /// a more robust implementation can be put in place.
96 ///
97 /// Our handle_dds_input() method is called by the reactor when there is
98 /// data to be pulled from our peer() ACE_SOCK_Stream.
///
/// Returns 0 when this call can make no further progress: receive_bytes() set
/// the stop flag, a transport or sample header is still incomplete, or all
/// buffered data has been consumed. Returns -1 after a receive error or a
/// zero-byte read (relink() is called first unless the peer disconnected
/// gracefully). The other error paths also pass -1 to their (elided) macros.
///
/// NOTE: the signature (source line 101) and several macro-invocation lines
/// are omitted from this listing; `fd` below is the function's parameter.
99 template<typename TH, typename DSH>
100 int
102 {
103  DBG_ENTRY_LVL("TransportReceiveStrategy", "handle_dds_input", 6);
104 
105  //
106  // What we will be doing here:
107  //
108  // 1) Read as much data as possible from the handle.
109  // 2) Process each Transport layer packet
110  // A) Parse the Transport header
111  // B) Process each DataSample in the packet
112  // a) Parse the DataSampleHeader
113  // b) Parse the remainder of the sample
114  // c) call data_received for complete samples
115  // d) store any partial sample or header data
116  // 3) return
117  //
118  // The state that we might need to maintain includes:
119  // I) Current Transport layer header
120  // II) Current DataSample header
121  // III) Current DataSample data
122  //
123  // For each of these elements, they can be:
124  // i) empty
125  // ii) partial (not able to parse yet).
126  // iii) complete (parsed values available).
127  //
128  // The read buffers will be a series of data buffers of a fixed size
129  // that are each managed by an ACE_Data_Block, each of which is
130  // managed by an ACE_Message_Block containing the read and write
131  // pointers into the data.
132  //
133  // As messages are parsed, new ACE_Message_Blocks will be formed with
134  // read and write pointers adjusted to reference the individual
135  // DataSample buffer contents.
136  //
137  // The underlying buffers, ACE_Data_Blocks, and ACE_Message_Blocks
138  // will be acquired from a Cached_Allocator_With_Overflow. The size
139  // of the individual data buffers will be set initially to the
140  // ethernet MTU to allow for full TCP messages to be received into
141  // individual blocks.
142  //
143  // Reading will be performed with a two member struct iov, to allow
144  // for the remainder of the current buffer to be used along with at
145  // least one completely empty buffer. There will be a low water
146  // parameter used to stop the use of the current block when the
147  // available space in it is reduced.
148  //
149 
150  //
151  // Establish the current receive buffer.
152  //
153  // This involves checking for any remainder from the previous trip
154  // through the code. If there is no current buffer, or the buffer
155  // has less than the low water mark space() left, then promote the
156  // next receive buffer to the current. If none is present, create a
157  // new one and use it.
158  //
159  //
160  // Establish the next receive buffer.
161  //
162  // This involves checking for any previous complete buffers that
163  // were not promoted in the previous step. If none are present,
164  // create a new one and use that.
165  //
166 
167  //
168  // Remove any buffers that have been completely read and have less
169  // than the low water amount of space left.
170  //
171  size_t index;
172 
173  for (index = 0; index < receive_buffers_.size(); ++index) {
174  if ((this->receive_buffers_[index] != 0)
175  && (this->receive_buffers_[index]->length() == 0)
176  && (this->receive_buffers_[index]->space() < BUFFER_LOW_WATER)) {
177  VDBG((LM_DEBUG,"(%P|%t) DBG: "
178  "Remove a receive_buffer_[%d] from use.\n",
179  index));
180 
181  // unlink any Message_Block that continues to this one
182  // being removed.
183  // This avoids a possible infinite ->cont() loop.
184  for (size_t ii =0; ii < receive_buffers_.size(); ii++) {
185  if ((0 != this->receive_buffers_[ii]) &&
186  (this->receive_buffers_[ii]->cont() ==
187  this->receive_buffers_[index])) {
188  this->receive_buffers_[ii]->cont(0);
189  }
190  }
191 
192  //
193  // Remove the receive buffer from use.
194  //
195  ACE_DES_FREE(
196  this->receive_buffers_[index],
197  this->mb_allocator_.free,
199  this->receive_buffers_[index] = 0;
200  }
201  }
202 
203  //
204  // Allocate buffers for any empty slots. We may have emptied one just
205  // here, but others may have been emptied by a large read during the
206  // last trip through the code as well.
207  //
208  size_t previous = this->buffer_index_;
209 
210  for (index = this->buffer_index_;
211  this->successor_index(previous) != this->buffer_index_;
212  index = this->successor_index(index)) {
213  if (this->receive_buffers_[index] == 0) {
214  VDBG((LM_DEBUG,"(%P|%t) DBG: "
215  "Allocate a Message_Block for new receive_buffer_[%d].\n",
216  index));
217 
219  this->receive_buffers_[index],
220  (ACE_Message_Block*) this->mb_allocator_.malloc(
221  sizeof(ACE_Message_Block)),
223  RECEIVE_DATA_BUFFER_SIZE, // Buffer size
224  ACE_Message_Block::MB_DATA, // Default
225  0, // Start with no continuation
226  0, // Let the constructor allocate
227  &this->data_allocator_, // Our buffer cache
228  &this->receive_lock_, // Our locking strategy
230  ACE_Time_Value::zero, // Default
231  ACE_Time_Value::max_time, // Default
232  &this->db_allocator_, // Our data block cache
233  &this->mb_allocator_ // Our message block cache
234  ),
235  -1);
236  }
237 
238  //
239  // Chain the buffers. This allows us to have portions of parsed
240  // data cross buffer boundaries without a problem.
241  //
242  if (previous != index) {
243  this->receive_buffers_[previous]->cont(
244  this->receive_buffers_[index]);
245  }
246 
247  previous = index;
248  }
249 
250  //
251  // Read from handle.
252  //
253  // This involves a non-blocking recvv_n(). Any remaining space in
254  // the buffers should be saved for the next pass through the code.
255  // If more than one receive buffer has data, chain them. Promote
256  // all receive buffers so that the next pass through, the first
257  // buffer with space remaining will appear as the current receive
258  // buffer.
259  //
260  // This is probably what should remain in the specific
261  // implementation, with the rest of this factored back up into the
262  // framework.
263  //
264 
265  VDBG((LM_DEBUG,"(%P|%t) DBG: "
266  "Form the iovec from the message block\n"));
267 
268  //
269  // Form the iovec from the message block chain of receive buffers.
270  //
  // NOTE(review): iov has RECEIVE_BUFFERS entries, but the loop below may fill
  // up to receive_buffers_.size() entries, which comes from the constructor's
  // receive_buffers_count argument - confirm it can never exceed RECEIVE_BUFFERS.
271  iovec iov[RECEIVE_BUFFERS];
272  size_t vec_index = 0;
273  size_t current = this->buffer_index_;
274 
275  for (index = 0;
276  index < receive_buffers_.size();
277  ++index, current = this->successor_index(current)) {
278  // Invariant. ASSERT?
279  if (this->receive_buffers_[current] == 0) {
281  ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
282  ACE_TEXT("receive buffer management detected.\n")),
283  -1);
284  }
285 
286 #ifdef _MSC_VER
287 #pragma warning(push)
288 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
289 // since on other platforms iov_len is 64-bit
290 #pragma warning(disable : 4267)
291 #endif
292  // This check covers the case where we have unread data in
293  // the first buffer, but no space to write any more data.
294  if (this->receive_buffers_[current]->space() > 0) {
295  iov[vec_index].iov_len = this->receive_buffers_[current]->space();
296  iov[vec_index].iov_base = this->receive_buffers_[current]->wr_ptr();
297 
298  VDBG((LM_DEBUG,"(%P|%t) DBG: "
299  "index==%d, len==%d, base==%x\n",
300  vec_index, iov[vec_index].iov_len, iov[vec_index].iov_base));
301 
302  vec_index++;
303  }
304  }
305 
306 #ifdef _MSC_VER
307 #pragma warning(pop)
308 #endif
309 
310  VDBG((LM_DEBUG,"(%P|%t) DBG: "
311  "Perform the recvv() call\n"));
312 
313  //
314  // Read into the buffers.
315  //
316  ACE_INET_Addr remote_address;
317  bool stop = false;
318  const ssize_t bytes_remaining = this->receive_bytes(iov,
319  static_cast<int>(vec_index),
320  remote_address,
321  fd,
322  stop);
323 
324  if (stop) {
325  return 0;
326  }
327 
328  if (bytes_remaining < 0) {
329  ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Problem ")
330  ACE_TEXT("with data link detected: %p.\n"),
331  ACE_TEXT("receive_bytes")));
332 
333  // The relink() will handle the connection to the ReconnectTask to do
334  // the reconnect so this reactor thread will not be blocked.
335  this->relink();
336 
337  // Close connection anyway.
338  return -1;
339  // Returning -1 takes the handle out of the reactor read mask.
340  }
341 
342  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
343  "recvv() return %d - we call this the bytes_remaining.\n",
344  bytes_remaining), 5);
345 
346  if (bytes_remaining == 0) {
347  if (this->gracefully_disconnected_) {
348  VDBG_LVL((LM_DEBUG, "(%P|%t) Peer has gracefully disconnected.\n"), 1);
349  return -1;
350 
351  } else {
352  VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) DBG: Unrecoverable problem ")
353  ACE_TEXT("with data link detected\n")), 1);
354 
355  // The relink() will handle the connection to the ReconnectTask to do
356  // the reconnect so this reactor thread will not be blocked.
357  this->relink();
358 
359  // Close connection anyway.
360  return -1;
361  // Returning -1 takes the handle out of the reactor read mask.
362  }
363  }
364 
365  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
366  "START Adjust the message block chain pointers to account for the "
367  "new data.\n"), 5);
368 
369  //
370  // Adjust the message block chain pointers to account for the new
371  // data.
372  //
373  size_t bytes = bytes_remaining;
374 
  // When no transport packet is in progress, record the size of this read in
  // the transport header's length_ field.
375  if (!this->pdu_remaining_) {
376  this->receive_transport_header_.length_ = static_cast<ACE_UINT32>(bytes);
377  }
378 
379  for (index = this->buffer_index_;
380  bytes > 0;
381  index = this->successor_index(index)) {
382  VDBG((LM_DEBUG,"(%P|%t) DBG: -> "
383  "At top of for..loop block.\n"));
384 
385  const size_t amount = ace_min<size_t>(bytes, this->receive_buffers_[index]->space());
386 
387  VDBG((LM_DEBUG,"(%P|%t) DBG: "
388  "index == %d bytes == %d amount == %d.\n",
389  index, bytes, amount));
390 
391  VDBG((LM_DEBUG,"(%P|%t) DBG: "
392  "this->receive_buffers_[index]->rd_ptr() == %u.\n",
393  this->receive_buffers_[index]->rd_ptr()));
394  VDBG((LM_DEBUG,"(%P|%t) DBG: "
395  "this->receive_buffers_[index]->wr_ptr() == %u.\n",
396  this->receive_buffers_[index]->wr_ptr()));
397 
398  this->receive_buffers_[index]->wr_ptr(amount);
399 
400  VDBG((LM_DEBUG,"(%P|%t) DBG: "
401  "Now, this->receive_buffers_[index]->wr_ptr() == %u.\n",
402  this->receive_buffers_[index]->wr_ptr()));
403 
404  bytes -= amount;
405 
406  VDBG((LM_DEBUG,"(%P|%t) DBG: "
407  "Now, bytes == %d .\n", bytes));
408 
409  // This is yukky to do here, but there is a fine line between
410  // where things are moved and where they are checked.
411  if (bytes > 0 && this->successor_index(index) == this->buffer_index_) {
412  // Here we have read more data than we passed in buffer. Bad.
414  ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
415  ACE_TEXT("receive buffer management detected: ")
416  ACE_TEXT("read more bytes than available.\n")),
417  -1);
418  }
419  }
420 
421  VDBG((LM_DEBUG,"(%P|%t) DBG: "
422  "DONE Adjust the message block chain pointers to account "
423  "for the new data.\n"));
424 
425  //
426  // While the receive buffer is not empty:
427  //
428  // This is the receive buffer(s) that was just read. The receive
429  // buffer message block is used to manage our parsing through this
430  // data. When we have parsed out all the data, then it will be
431  // considered empty. This message block read pointer indicates
432  // where we are in the parsing process. Other message blocks refer
433  // to the parsed data and retain the data block references to
434  // prevent the data from being released until we have completed all
435  // uses for the data in the receive buffer. This will normally be
436  // after the sample data is demarshaled in the DataReader
437  // components.
438  //
439  VDBG((LM_DEBUG,"(%P|%t) DBG: "
440  "Start processing the data we just received.\n"));
441 
442  //
443  // Operate while we have more data to process.
444  //
445  while (true) {
446 
447  //
448  // Manage the current transport header.
449  //
450  // This involves checking to see if we have remaining data in the
451  // current packet. If not, or if we have not read a complete
452  // header, then we read a transport header and check it for
453  // validity. As we have a valid transport header with data
454  // remaining to read in the packet that it represents, we move on.
455  //
456  // As new transport headers are to be read, they are read into a
457  // message buffer member variable and demarshaled directly. The
458  // values are retained for the lifetime of the processing of the
459  // instant transport packet. The member message buffer allows us
460  // to retain partially read transport headers until we can read
461  // more data.
462  //
463 
464  //
465  // Check the remaining transport packet length to see if we are
466  // expecting a new one.
467  //
468  if (this->pdu_remaining_ == 0) {
469  VDBG((LM_DEBUG,"(%P|%t) DBG: "
470  "We are expecting a transport packet header.\n"));
471 
472  VDBG((LM_DEBUG,"(%P|%t) DBG: "
473  "this->buffer_index_ == %d.\n",this->buffer_index_));
474 
475  VDBG((LM_DEBUG,"(%P|%t) DBG: "
476  "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
477  "== %u.\n",
478  this->receive_buffers_[this->buffer_index_]->rd_ptr()));
479 
480  VDBG((LM_DEBUG,"(%P|%t) DBG: "
481  "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
482  "== %u.\n",
483  this->receive_buffers_[this->buffer_index_]->wr_ptr()));
484 
485  if (this->receive_buffers_[this->buffer_index_]->total_length()
486  < this->receive_transport_header_.get_max_serialized_size()) {
487  //
488  // Not enough data in the buffer chain for the entire Transport
489  // header that we need to read, so relinquish control until
490  // we get more data.
491  //
492  VDBG((LM_DEBUG,"(%P|%t) DBG: "
493  "Not enough bytes read to account for a transport "
494  "packet header. We are done here - we need to "
495  "receive more bytes.\n"));
496 
497  this->receive_transport_header_.incomplete(
498  *this->receive_buffers_[this->buffer_index_]);
499 
500  return 0;
501 
502  } else {
503  VDBG((LM_DEBUG,"(%P|%t) DBG: "
504  "We have enough bytes to demarshall the transport "
505  "packet header.\n"));
506 
507  // only do the hexdump if it will be printed - to not impact performance.
508  if (Transport_debug_level > 5) {
509  ACE_TCHAR xbuffer[4096];
510  const ACE_Message_Block& mb =
511  *this->receive_buffers_[this->buffer_index_];
512  size_t xbytes = mb.length();
513 
514  xbytes = (std::min)(xbytes, TH::get_max_serialized_size());
515 
516  ACE::format_hexdump(mb.rd_ptr(), xbytes, xbuffer, sizeof(xbuffer));
517 
518  VDBG((LM_DEBUG,"(%P|%t) DBG: "
519  "Hex Dump of transport header block "
520  "(%d bytes):\n%s", xbytes, xbuffer));
521  }
522 
523  //
524  // Demarshal the transport header.
525  //
526  this->receive_transport_header_ =
527  *this->receive_buffers_[this->buffer_index_];
528 
529  //
530  // Check the TransportHeader.
531  //
532  if (!this->receive_transport_header_.valid()) {
534  ACE_TEXT
535  ("(%P|%t) ERROR: TransportHeader invalid.\n")),
536  -1);
537  }
538 
539  this->good_pdu_ = check_header(this->receive_transport_header_);
540  this->pdu_remaining_ = this->receive_transport_header_.length_;
541 
542  VDBG((LM_DEBUG,"(%P|%t) DBG: "
543  "Amount of transport packet bytes (remaining): %d.\n",
544  this->pdu_remaining_));
545  }
546 
547  // The receive_transport_header_ assignment advanced the read index
548  // that can be across the buffers. Advance the buffer index.
549  bool last_buffer = false;
550  update_buffer_index(last_buffer);
551  }
552 
553  //
554  // Ignore bad PDUs by skipping over them. We do this out here
555  // in case we did not skip over the entire bad PDU last time.
556  //
557  {
558  int rtn_code = skip_bad_pdus();
559  if (rtn_code <= 0) return rtn_code;
560  }
561 
562  //
563  // Keep processing samples while we have data to read.
564  //
565  while (this->pdu_remaining_ > 0) {
566  VDBG((LM_DEBUG,"(%P|%t) DBG: "
567  "We have a transport packet now. There are more sample "
568  "bytes to parse in order to complete the packet.\n"));
569 
570  //
571  // Manage the current sample header.
572  //
573  // This involves checking to see if we have remaining data in the
574  // current sample. If not, or if we have not read a complete
575  // header, then we read a sample header and check it for validity,
576  // which currently involves checking the message type only. As we
577  // have a valid sample header with data remaining to read in the
578  // sample that it represents, we move on.
579  //
580  // As new sample headers are read, they are read into a message
581  // buffer member variable and demarshaled directly. The values are
582  // retained for the lifetime of the sample and are passed as part
583  // of the receive data sample itself. The member message buffer
584  // allows us to retain partially read sample headers until we can
585  // read more data.
586  //
587  if (this->receive_sample_remaining_ == 0) {
588  VDBG((LM_DEBUG,"(%P|%t) DBG: "
589  "We are not working on some remaining sample data. "
590  "We are expecting to parse a sample header now.\n"));
591 
592  VDBG((LM_DEBUG,"(%P|%t) DBG: "
593  "this->buffer_index_ == %d.\n",this->buffer_index_));
594 
595  VDBG((LM_DEBUG,"(%P|%t) DBG: "
596  "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
597  "== %u.\n",
598  this->receive_buffers_[this->buffer_index_]->rd_ptr()));
599 
600  VDBG((LM_DEBUG,"(%P|%t) DBG: "
601  "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
602  "== %u.\n",
603  this->receive_buffers_[this->buffer_index_]->wr_ptr()));
604 
605  if (DSH::partial(*this->receive_buffers_[this->buffer_index_])) {
606  //
607  // Not enough data in the buffer for the entire Sample
608  // header that we need to read, so relinquish control until
609  // we get more data.
610  //
611  VDBG((LM_DEBUG,"(%P|%t) DBG: "
612  "Not enough bytes have been received to account "
613  "for a complete sample header. We are done for "
614  "now - we need to receive more data before we "
615  "can go on.\n"));
616  if (Transport_debug_level > 2) {
617  ACE_TCHAR ebuffer[350];
618  ACE_Message_Block& mb = *this->receive_buffers_[this->buffer_index_];
619  const size_t sz = (std::min)(DSH::get_max_serialized_size(), mb.length());
620  ACE::format_hexdump(mb.rd_ptr(), sz, ebuffer, sizeof(ebuffer));
621  ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: "
622  "Partial DataSampleHeader:\n%s\n", ebuffer));
623  }
624 
625  return 0;
626 
627  } else {
628  VDBG((LM_DEBUG,"(%P|%t) DBG: "
629  "We have received enough bytes for the sample "
630  "header. Demarshall the sample header now.\n"));
631 
632  // only do the hexdump if it will be printed - to not impact performance.
633  if (Transport_debug_level > 5) {
634  ACE_TCHAR ebuffer[4096];
636  this->receive_buffers_[this->buffer_index_]->rd_ptr(),
637  this->data_sample_header_.get_max_serialized_size(),
638  ebuffer, sizeof(ebuffer));
639 
640  VDBG((LM_DEBUG,"(%P|%t) DBG: "
641  "Hex Dump:\n%s", ebuffer));
642  }
643 
644  this->data_sample_header_.pdu_remaining(this->pdu_remaining_);
645 
646  //
647  // Demarshal the sample header.
648  //
649  this->data_sample_header_ =
650  *this->receive_buffers_[this->buffer_index_];
651 
652  //
653  // Check the DataSampleHeader.
654  //
655  this->good_pdu_ = check_header(data_sample_header_);
656 
657  //
658  // Set the amount to parse into the message buffer. We
659  // can't just use the header value to keep track of
660  // where we are since downstream processing will expect
661  // the value to be correct (unadjusted by us).
662  //
663  this->receive_sample_remaining_ =
664  this->data_sample_header_.message_length();
665 
666  VDBG((LM_DEBUG,"(%P|%t) DBG: "
667  "The demarshalled sample header says that we "
668  "have %d bytes to read for the data portion of "
669  "the sample.\n",
670  this->receive_sample_remaining_));
671 
672  //
673  // Decrement packet size.
674  //
675  const size_t header_size =
676  this->data_sample_header_.get_serialized_size();
677  VDBG((LM_DEBUG,"(%P|%t) DBG: "
678  "this->data_sample_header_.get_serialized_size() == %d.\n",
679  header_size));
680 
681  this->pdu_remaining_ -= header_size;
682 
683  VDBG((LM_DEBUG,"(%P|%t) DBG: "
684  "Amount of transport packet remaining: %d.\n",
685  this->pdu_remaining_));
686 
687  const int rtn_code = skip_bad_pdus();
688  if (rtn_code <= 0) return rtn_code;
689  }
690  }
691 
692  bool last_buffer = false;
693  update_buffer_index(last_buffer);
694 
695  if (this->receive_sample_remaining_ > 0) {
696  //
697  // Manage the current sample data.
698  //
699  // This involves reading data to complete the current sample. As
700  // samples are completed, they are dispatched via the
701  // data_received() mechanism. This data is read into message
702  // blocks that are obtained from the pool of message blocks since
703  // the lifetime of this data will last until the DataReader
704  // components demarshal the sample data. A reference to the
705  // current sample being built is retained as a member to allow us
706  // to hold partially read samples until they are completed.
707  //
708  VDBG((LM_DEBUG,"(%P|%t) DBG: "
709  "Determine amount of data for the next block in the chain\n"));
710 
711  VDBG((LM_DEBUG,"(%P|%t) DBG: "
712  "this->buffer_index_ == %d.\n",this->buffer_index_));
713 
714  VDBG((LM_DEBUG,"(%P|%t) DBG: "
715  "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
716  "== %u.\n",
717  this->receive_buffers_[this->buffer_index_]->rd_ptr()));
718 
719  VDBG((LM_DEBUG,"(%P|%t) DBG: "
720  "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
721  "== %u.\n",
722  this->receive_buffers_[this->buffer_index_]->wr_ptr()));
723  //
724  // Determine the amount of data for the next block in the chain.
725  //
726  const size_t amount = ace_min<size_t>(
727  this->receive_sample_remaining_,
728  this->receive_buffers_[this->buffer_index_]->length());
729 
730  VDBG((LM_DEBUG,"(%P|%t) DBG: "
731  "amount of data for the next block in the chain is %d\n",
732  amount));
733  //
734  // Now create a message block for the data in the current buffer
735  // and chain it if we are starting a new sample.
736  //
737  ACE_Message_Block* current_sample_block = 0;
739  current_sample_block,
740  (ACE_Message_Block*) this->mb_allocator_.malloc(
741  sizeof(ACE_Message_Block)),
743  this->receive_buffers_[this->buffer_index_]
744  ->data_block()->duplicate(),
745  0,
746  &this->mb_allocator_),
747  -1);
748 
749  //
750  // Chain it to the end of the current sample.
751  //
752  if (this->payload_ == 0) {
753  this->payload_ = current_sample_block;
754 
755  } else {
756  ACE_Message_Block* block = this->payload_;
757 
758  while (block->cont() != 0) {
759  block = block->cont();
760  }
761 
762  block->cont(current_sample_block);
763  }
764 
765  VDBG((LM_DEBUG,"(%P|%t) DBG: "
766  "Before adjustment of the pointers and byte counters\n"));
767 
768  VDBG((LM_DEBUG,"(%P|%t) DBG: "
769  "this->payload_->rd_ptr() "
770  "== %u "
771  "this->payload_->wr_ptr() "
772  "== %u.\n",
773  this->payload_->rd_ptr(),
774  this->payload_->wr_ptr()));
775 
776  //
777  // Adjust the pointers and byte counters.
778  //
779  current_sample_block->rd_ptr(
780  this->receive_buffers_[this->buffer_index_]->rd_ptr());
781  current_sample_block->wr_ptr(current_sample_block->rd_ptr() + amount);
782  this->receive_buffers_[this->buffer_index_]->rd_ptr(amount);
783  this->receive_sample_remaining_ -= amount;
784  this->pdu_remaining_ -= amount;
785 
786  VDBG((LM_DEBUG,"(%P|%t) DBG: "
787  "After adjustment of the pointers and byte counters\n"));
788 
789  VDBG((LM_DEBUG,"(%P|%t) DBG: "
790  "this->payload_->rd_ptr() "
791  "== %u "
792  "this->payload_->wr_ptr() "
793  "== %u.\n",
794  this->payload_->rd_ptr(),
795  this->payload_->wr_ptr()));
796 
797  VDBG((LM_DEBUG,"(%P|%t) DBG: "
798  "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
799  "== %u.\n",
800  this->receive_buffers_[this->buffer_index_]->rd_ptr()));
801 
802  VDBG((LM_DEBUG,"(%P|%t) DBG: "
803  "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
804  "== %u.\n",
805  this->receive_buffers_[this->buffer_index_]->wr_ptr()));
806 
807  VDBG((LM_DEBUG,"(%P|%t) DBG: "
808  "After adjustment, remaining sample bytes == %d and remaining transport packet bytes == %d\n",
809  this->receive_sample_remaining_,
810  this->pdu_remaining_));
811  }
812 
813  //
814  // Dispatch the received message if we have received it all.
815  //
816  // NB: Since we are doing this synchronously and without passing
817  // ownership of the sample, we can use NULL mutex lock for
818  // the allocators. Since only one thread can be in this
819  // routine at a time, that is.
820  //
821  if (this->receive_sample_remaining_ == 0) {
822  VDBG((LM_DEBUG,"(%P|%t) DBG: "
823  "Now dispatch the sample to the DataLink\n"));
824 
825  ReceivedDataSample rds = payload_ ? ReceivedDataSample(*payload_) : ReceivedDataSample();
826  if (payload_) {
827  payload_->release();
828  payload_ = 0;
829  }
830  if (this->data_sample_header_.into_received_data_sample(rds)) {
831 
832  if (this->data_sample_header_.more_fragments()
833  || this->receive_transport_header_.last_fragment()) {
834  VDBG((LM_DEBUG,"(%P|%t) DBG: Attempt reassembly of fragments\n"));
835 
836  if (this->reassemble(rds)) {
837  VDBG((LM_DEBUG,"(%P|%t) DBG: Reassembled complete message\n"));
838  this->deliver_sample(rds, remote_address);
839  }
840  // If reassemble() returned false, it takes ownership of the data
841  // just like deliver_sample() does.
842 
843  } else {
844  this->deliver_sample(rds, remote_address);
845  }
846  }
847 
848  // For the reassembly algorithm, the 'last_fragment_' header bit only
849  // applies to the first DataSampleHeader in the TransportHeader
850  this->receive_transport_header_.last_fragment(false);
851 
852  VDBG((LM_DEBUG,"(%P|%t) DBG: "
853  "Release the sample that we just sent.\n"));
854  // ~ReceivedDataSample() releases the payload_ message block
855  }
856 
857  update_buffer_index(last_buffer);
858 
859  if (last_buffer) {
860  // Relinquish control if there is no more data to process.
861  VDBG((LM_DEBUG,"(%P|%t) DBG: We are done - no more data.\n"));
862  return 0;
863  }
864 
865  } // End of while (this->pdu_remaining_ > 0)
866 
867  VDBG((LM_DEBUG,"(%P|%t) DBG: "
868  "Let's try to do some more.\n"));
869  } // End of while (true)
870 
871  VDBG((LM_DEBUG,"(%P|%t) DBG: "
872  "It looks like we are done - the done loop has finished.\n"));
873  //
874  // Relinquish control.
875  //
876  // This involves ensuring that when we reenter this method, we will
877  // pick up from where we left off correctly.
878  //
879  return 0;
880 }
881 
/// Default reassemble() (name taken from the log message below; the signature
/// on source line 884 is omitted from this listing).
///
/// Logs a warning that a derived class must override this when its transport
/// uses fragmentation, then returns false. handle_dds_input() only delivers a
/// fragmented sample when this call returns true.
882 template<typename TH, typename DSH>
883 bool
885 {
886  ACE_DEBUG((LM_WARNING, "(%P|%t) TransportReceiveStrategy::reassemble() "
887  "WARNING: derived class must override if specific transport type uses "
888  "fragmentation and reassembly\n"));
889  return false;
890 }
891 
/// Discards all in-progress receive state.
/// NOTE: the signature (source line 894) is omitted from this listing, so the
/// function name is not visible here - presumably reset(); confirm.
///
/// Zeroes the sample and packet byte counters, releases any partially built
/// payload_, marks the PDU as good, and moves every receive buffer's read
/// pointer up to its write pointer so that no buffered bytes remain unread.
892 template<typename TH, typename DSH>
893 void
895 {
896  this->receive_sample_remaining_ = 0;
897  ACE_Message_Block::release(this->payload_);
898  this->payload_ = 0;
899  this->good_pdu_ = true;
900  this->pdu_remaining_ = 0;
901  for (size_t i = 0; i < receive_buffers_.size(); ++i) {
  // NOTE(review): each slot is dereferenced without a null check, although the
  // constructor fills receive_buffers_ with null entries and other functions
  // test slots against 0 - confirm this cannot run before buffers are allocated.
902  ACE_Message_Block& rb = *this->receive_buffers_[i];
903  rb.rd_ptr(rb.wr_ptr());
904  }
905 }
906 
/// Advances buffer_index_ to the next receive buffer that still holds unread
/// data (length() != 0). The signature (source line 909) is omitted from this
/// listing; callers pass a bool that is referred to as `done` here.
///
/// `done` is set to true only when every buffer in the ring turned out to be
/// empty and the index has wrapped back to where it started. It is never set
/// to false here, so callers initialise it to false before calling.
907 template<typename TH, typename DSH>
908 void
910 {
911  VDBG((LM_DEBUG,"(%P|%t) DBG: "
912  "Adjust the buffer chain in case we crossed into the next "
913  "buffer after the last read(s).\n"));
914  const size_t initial = this->buffer_index_;
915  while (this->receive_buffers_[this->buffer_index_]->length() == 0) {
916  this->buffer_index_ = this->successor_index(this->buffer_index_);
917 
918  VDBG((LM_DEBUG,"(%P|%t) DBG: "
919  "Set this->buffer_index_ = %d.\n",
920  this->buffer_index_));
921 
  // Back at the starting slot: a full lap found no buffered data.
922  if (initial == this->buffer_index_) {
923  done = true; // no other buffers in receive_buffers_ have data
924  return;
925  }
926  }
927 }
928 
/// Skips over the rest of a PDU that failed its header check (name taken from
/// the log message below; the signature on source line 931 is omitted).
///
/// Returns 1 when processing can continue: either the PDU is good, or the bad
/// PDU was skipped and buffered data remains. Returns 0 when the bad PDU was
/// skipped and no buffered data remains. The error branch passes -1 to its
/// (elided) macro when the PDU claims more bytes than the buffers hold.
/// Callers in handle_dds_input() return immediately on any result <= 0.
929 template<typename TH, typename DSH>
930 int
932 {
933  if (this->good_pdu_) return 1;
934 
935  //
936  // Adjust the message block chain pointers to account for the
937  // skipped data.
938  //
939  for (size_t index = this->buffer_index_;
940  this->pdu_remaining_ > 0;
941  index = this->successor_index(index)) {
942  const size_t amount =
943  ace_min<size_t>(this->pdu_remaining_, this->receive_buffers_[index]->length());
944 
945  this->receive_buffers_[index]->rd_ptr(amount);
946  this->pdu_remaining_ -= amount;
947 
  // Bytes are still owed but the next slot is the starting one again, so the
  // whole ring has been visited. (The macro opener, source line 949, is elided.)
948  if (this->pdu_remaining_ > 0 && this->successor_index(index) == this->buffer_index_) {
950  ACE_TEXT("(%P|%t) ERROR: ")
951  ACE_TEXT("TransportReceiveStrategy::skip_bad_pdus()")
952  ACE_TEXT(" - Unrecoverably corrupted ")
953  ACE_TEXT("receive buffer management detected: ")
954  ACE_TEXT("read more bytes than available.\n")),
955  -1);
956  }
957  }
958 
  // The skipped PDU also ends any sample that was in progress.
959  this->receive_sample_remaining_ = 0;
960 
961  bool done = false;
962  update_buffer_index(done);
963  return done ? 0 : 1;
964 }
965 
/// Returns the result of sample.data() called with this strategy's message
/// block allocator.
/// NOTE: the return type and signature (source lines 967-968) are omitted from
/// this listing, so the function's name and the exact type of `sample` are not
/// visible here - confirm against the header.
966 template<typename TH, typename DSH>
969 {
970  return sample.data(&mb_allocator_);
971 }
972 
973 }
974 }
975 
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
static const ACE_Time_Value max_time
size_t length(void) const
int ssize_t
char * rd_ptr(void) const
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
LM_DEBUG
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
#define VDBG(DBG_ARGS)
char ACE_TCHAR
Holds a data sample received by the transport.
virtual ACE_Message_Block * release(void)
TransportReceiveStrategy(const TransportInst_rch &config, size_t receive_buffers_count=RECEIVE_BUFFERS)
ACE_Message_Block * cont(void) const
char * wr_ptr(void) const
LM_WARNING
ACE_TEXT("TCP_Factory")
size_t format_hexdump(const char *buffer, size_t size, ACE_TCHAR *obuf, size_t obuf_sz)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
static const ACE_Time_Value zero
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
#define ACE_ERROR_RETURN(X, Y)
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28