00001
00002
00003
00004
00005
00006
00007
00008 #include "TransportReceiveStrategy_T.h"
00009 #include "ace/INET_Addr.h"
00010 #include "ace/Min_Max.h"
00011
00012 #if !defined (__ACE_INLINE__)
00013 #include "TransportReceiveStrategy_T.inl"
00014 #endif
00015
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017
00018 namespace OpenDDS {
00019 namespace DCPS {
00020
00021 template<typename TH, typename DSH>
00022 TransportReceiveStrategy<TH, DSH>::TransportReceiveStrategy()
00023 : gracefully_disconnected_(false),
00024 receive_sample_remaining_(0),
00025 mb_allocator_(MESSAGE_BLOCKS),
00026 db_allocator_(DATA_BLOCKS),
00027 data_allocator_(DATA_BLOCKS),
00028 buffer_index_(0),
00029 payload_(0),
00030 good_pdu_(true),
00031 pdu_remaining_(0)
00032 {
00033 DBG_ENTRY_LVL("TransportReceiveStrategy", "TransportReceiveStrategy" ,6);
00034
00035 if (Transport_debug_level >= 2) {
00036 ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-mb"
00037 " Cached_Allocator_With_Overflow %x with %d chunks\n",
00038 &mb_allocator_, MESSAGE_BLOCKS));
00039 ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-db"
00040 " Cached_Allocator_With_Overflow %x with %d chunks\n",
00041 &db_allocator_, DATA_BLOCKS));
00042 ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-data"
00043 " Cached_Allocator_With_Overflow %x with %d chunks\n",
00044 &data_allocator_, DATA_BLOCKS));
00045 }
00046
00047 ACE_OS::memset(this->receive_buffers_, 0, sizeof(this->receive_buffers_));
00048 }
00049
00050 template<typename TH, typename DSH>
00051 TransportReceiveStrategy<TH, DSH>::~TransportReceiveStrategy()
00052 {
00053 DBG_ENTRY_LVL("TransportReceiveStrategy","~TransportReceiveStrategy",6);
00054
00055 if (this->receive_buffers_[this->buffer_index_] != 0) {
00056 size_t size = this->receive_buffers_[this->buffer_index_]->total_length();
00057
00058 if (size > 0) {
00059 ACE_DEBUG((LM_WARNING,
00060 ACE_TEXT("(%P|%t) WARNING: TransportReceiveStrategy::~TransportReceiveStrategy() - ")
00061 ACE_TEXT("terminating with %d unprocessed bytes.\n"),
00062 size));
00063 }
00064 }
00065 }
00066
00067 template<typename TH, typename DSH>
00068 bool
00069 TransportReceiveStrategy<TH, DSH>::check_header(const TH& )
00070 {
00071 return true;
00072 }
00073
00074 template<typename TH, typename DSH>
00075 bool
00076 TransportReceiveStrategy<TH, DSH>::check_header(const DSH& )
00077 {
00078 return true;
00079 }
00080
00081
00082
00083
00084
00085
00086
00087 template<typename TH, typename DSH>
00088 int
00089 TransportReceiveStrategy<TH, DSH>::handle_dds_input(ACE_HANDLE fd)
00090 {
00091 DBG_ENTRY_LVL("TransportReceiveStrategy", "handle_dds_input", 6);
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159 size_t index;
00160
00161 for (index = 0; index < RECEIVE_BUFFERS; ++index) {
00162 if ((this->receive_buffers_[index] != 0)
00163 && (this->receive_buffers_[index]->length() == 0)
00164 && (this->receive_buffers_[index]->space() < BUFFER_LOW_WATER)) {
00165 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00166 "Remove a receive_buffer_[%d] from use.\n",
00167 index));
00168
00169
00170
00171
00172 for (size_t ii =0; ii < RECEIVE_BUFFERS; ii++) {
00173 if ((0 != this->receive_buffers_[ii]) &&
00174 (this->receive_buffers_[ii]->cont() ==
00175 this->receive_buffers_[index])) {
00176 this->receive_buffers_[ii]->cont(0);
00177 }
00178 }
00179
00180
00181
00182
00183 ACE_DES_FREE(
00184 this->receive_buffers_[index],
00185 this->mb_allocator_.free,
00186 ACE_Message_Block);
00187 this->receive_buffers_[index] = 0;
00188 }
00189 }
00190
00191
00192
00193
00194
00195
00196 size_t previous = this->buffer_index_;
00197
00198 for (index = this->buffer_index_;
00199 this->successor_index(previous) != this->buffer_index_;
00200 index = this->successor_index(index)) {
00201 if (this->receive_buffers_[index] == 0) {
00202 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00203 "Allocate a Message_Block for new receive_buffer_[%d].\n",
00204 index));
00205
00206 ACE_NEW_MALLOC_RETURN(
00207 this->receive_buffers_[index],
00208 (ACE_Message_Block*) this->mb_allocator_.malloc(
00209 sizeof(ACE_Message_Block)),
00210 ACE_Message_Block(
00211 RECEIVE_DATA_BUFFER_SIZE,
00212 ACE_Message_Block::MB_DATA,
00213 0,
00214 0,
00215 &this->data_allocator_,
00216 &this->receive_lock_,
00217 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00218 ACE_Time_Value::zero,
00219 ACE_Time_Value::max_time,
00220 &this->db_allocator_,
00221 &this->mb_allocator_
00222 ),
00223 -1);
00224 }
00225
00226
00227
00228
00229
00230 if (previous != index) {
00231 this->receive_buffers_[previous]->cont(
00232 this->receive_buffers_[index]);
00233 }
00234
00235 previous = index;
00236 }
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250
00251
00252
00253 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00254 "Form the iovec from the message block\n"));
00255
00256
00257
00258
00259 iovec iov[RECEIVE_BUFFERS];
00260 size_t vec_index = 0;
00261 size_t current = this->buffer_index_;
00262
00263 for (index = 0;
00264 index < RECEIVE_BUFFERS;
00265 ++index, current = this->successor_index(current)) {
00266
00267 if (this->receive_buffers_[current] == 0) {
00268 ACE_ERROR_RETURN((LM_ERROR,
00269 ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
00270 ACE_TEXT("receive buffer management detected.\n")),
00271 -1);
00272 }
00273
00274 #ifdef _MSC_VER
00275 #pragma warning(push)
00276
00277
00278 #pragma warning(disable : 4267)
00279 #endif
00280
00281
00282 if (this->receive_buffers_[current]->space() > 0) {
00283 iov[vec_index].iov_len = this->receive_buffers_[current]->space();
00284 iov[vec_index].iov_base = this->receive_buffers_[current]->wr_ptr();
00285
00286 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00287 "index==%d, len==%d, base==%x\n",
00288 vec_index, iov[vec_index].iov_len, iov[vec_index].iov_base));
00289
00290 vec_index++;
00291 }
00292 }
00293
00294 #ifdef _MSC_VER
00295 #pragma warning(pop)
00296 #endif
00297
00298 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00299 "Perform the recvv() call\n"));
00300
00301
00302
00303
00304 ACE_INET_Addr remote_address;
00305 ssize_t bytes_remaining = this->receive_bytes(iov,
00306 static_cast<int>(vec_index),
00307 remote_address,
00308 fd);
00309
00310 if (bytes_remaining < 0) {
00311 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Problem ")
00312 ACE_TEXT("with data link detected: %p.\n"),
00313 ACE_TEXT("receive_bytes")));
00314
00315
00316
00317 this->relink();
00318
00319
00320 return -1;
00321
00322 }
00323
00324 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00325 "recvv() return %d - we call this the bytes_remaining.\n",
00326 bytes_remaining), 5);
00327
00328 if (bytes_remaining == 0) {
00329 if (this->gracefully_disconnected_) {
00330 VDBG_LVL((LM_INFO,
00331 ACE_TEXT("(%P|%t) Peer has gracefully disconnected.\n"))
00332 ,1);
00333 return -1;
00334
00335 } else {
00336 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) DBG: Unrecoverable problem ")
00337 ACE_TEXT("with data link detected\n")), 1);
00338
00339
00340
00341 this->relink();
00342
00343
00344 return -1;
00345
00346 }
00347 }
00348
00349 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00350 "START Adjust the message block chain pointers to account for the "
00351 "new data.\n"), 5);
00352
00353
00354
00355
00356
00357 size_t bytes = bytes_remaining;
00358
00359 if (!this->pdu_remaining_) {
00360 this->receive_transport_header_.length_ = static_cast<ACE_UINT32>(bytes);
00361 }
00362
00363 for (index = this->buffer_index_;
00364 bytes > 0;
00365 index = this->successor_index(index)) {
00366 VDBG((LM_DEBUG,"(%P|%t) DBG: -> "
00367 "At top of for..loop block.\n"));
00368 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00369 "index == %d.\n", index));
00370 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00371 "bytes == %d.\n", bytes));
00372
00373 size_t amount
00374 = ace_min<size_t>(bytes, this->receive_buffers_[index]->space());
00375
00376 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00377 "amount == %d.\n", amount));
00378
00379 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00380 "this->receive_buffers_[index]->rd_ptr() == %u.\n",
00381 this->receive_buffers_[index]->rd_ptr()));
00382 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00383 "this->receive_buffers_[index]->wr_ptr() == %u.\n",
00384 this->receive_buffers_[index]->wr_ptr()));
00385
00386 this->receive_buffers_[index]->wr_ptr(amount);
00387
00388 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00389 "Now, this->receive_buffers_[index]->wr_ptr() == %u.\n",
00390 this->receive_buffers_[index]->wr_ptr()));
00391
00392 bytes -= amount;
00393
00394 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00395 "Now, bytes == %d .\n", bytes));
00396
00397
00398
00399 if (bytes > 0 && this->successor_index(index) == this->buffer_index_) {
00400
00401 ACE_ERROR_RETURN((LM_ERROR,
00402 ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
00403 ACE_TEXT("receive buffer management detected: ")
00404 ACE_TEXT("read more bytes than available.\n")),
00405 -1);
00406 }
00407 }
00408
00409 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00410 "DONE Adjust the message block chain pointers to account "
00411 "for the new data.\n"));
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424
00425
00426
00427 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00428 "Start processing the data we just received.\n"));
00429
00430
00431
00432
00433 while (true) {
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453
00454
00455
00456 if (this->pdu_remaining_ == 0) {
00457 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00458 "We are expecting a transport packet header.\n"));
00459
00460 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00461 "this->buffer_index_ == %d.\n",this->buffer_index_));
00462
00463 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00464 "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00465 "== %u.\n",
00466 this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00467
00468 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00469 "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00470 "== %u.\n",
00471 this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00472
00473 if (this->receive_buffers_[this->buffer_index_]->total_length()
00474 < this->receive_transport_header_.max_marshaled_size()) {
00475
00476
00477
00478
00479
00480 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00481 "Not enough bytes read to account for a transport "
00482 "packet header. We are done here - we need to "
00483 "receive more bytes.\n"));
00484
00485 this->receive_transport_header_.incomplete(
00486 *this->receive_buffers_[this->buffer_index_]);
00487
00488 return 0;
00489
00490 } else {
00491 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00492 "We have enough bytes to demarshall the transport "
00493 "packet header.\n"));
00494
00495
00496 if (Transport_debug_level > 5) {
00497 ACE_TCHAR xbuffer[4096];
00498 const ACE_Message_Block& mb =
00499 *this->receive_buffers_[this->buffer_index_];
00500 size_t xbytes = mb.length();
00501
00502 xbytes = (std::min)(xbytes, TH::max_marshaled_size());
00503
00504 ACE::format_hexdump(mb.rd_ptr(), xbytes, xbuffer, sizeof(xbuffer));
00505
00506 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00507 "Hex Dump of transport header block "
00508 "(%d bytes):\n%s\n", xbytes, xbuffer));
00509 }
00510
00511
00512
00513
00514 this->receive_transport_header_ =
00515 *this->receive_buffers_[this->buffer_index_];
00516
00517
00518
00519
00520 if (!this->receive_transport_header_.valid()) {
00521 ACE_ERROR_RETURN((LM_ERROR,
00522 ACE_TEXT
00523 ("(%P|%t) ERROR: TransportHeader invalid.\n")),
00524 -1);
00525 }
00526
00527 this->good_pdu_ = check_header(this->receive_transport_header_);
00528 this->pdu_remaining_ = this->receive_transport_header_.length_;
00529
00530 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00531 "Amount of transport packet bytes (remaining): %d.\n",
00532 this->pdu_remaining_));
00533 }
00534 }
00535
00536
00537
00538
00539
00540 {
00541 int rtn_code = skip_bad_pdus();
00542 if (rtn_code <= 0) return rtn_code;
00543 }
00544
00545
00546
00547
00548 while (this->pdu_remaining_ > 0) {
00549 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00550 "We have a transport packet now. There are more sample "
00551 "bytes to parse in order to complete the packet.\n"));
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567
00568
00569
00570 if (this->receive_sample_remaining_ == 0) {
00571 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00572 "We are not working on some remaining sample data. "
00573 "We are expecting to parse a sample header now.\n"));
00574
00575 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00576 "this->buffer_index_ == %d.\n",this->buffer_index_));
00577
00578 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00579 "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00580 "== %u.\n",
00581 this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00582
00583 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00584 "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00585 "== %u.\n",
00586 this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00587
00588 if (DSH::partial(*this->receive_buffers_[this->buffer_index_])) {
00589
00590
00591
00592
00593
00594 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00595 "Not enough bytes have been received to account "
00596 "for a complete sample header. We are done for "
00597 "now - we need to receive more data before we "
00598 "can go on.\n"));
00599 if (Transport_debug_level > 2) {
00600 ACE_TCHAR ebuffer[350];
00601 ACE_Message_Block& mb = *this->receive_buffers_[this->buffer_index_];
00602 const size_t sz = (std::min)(DSH::max_marshaled_size(), mb.length());
00603 ACE::format_hexdump(mb.rd_ptr(), sz, ebuffer, sizeof(ebuffer));
00604 ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: "
00605 "Partial DataSampleHeader:\n%s\n", ebuffer));
00606 }
00607
00608 return 0;
00609
00610 } else {
00611 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00612 "We have received enough bytes for the sample "
00613 "header. Demarshall the sample header now.\n"));
00614
00615
00616 if (Transport_debug_level > 5) {
00617 ACE_TCHAR ebuffer[4096];
00618 ACE::format_hexdump
00619 (this->receive_buffers_[this->buffer_index_]->rd_ptr(),
00620 this->data_sample_header_.max_marshaled_size(),
00621 ebuffer, sizeof(ebuffer));
00622
00623 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00624 "Hex Dump:\n%s\n", ebuffer));
00625 }
00626
00627 this->data_sample_header_.pdu_remaining(this->pdu_remaining_);
00628
00629
00630
00631
00632 this->data_sample_header_ =
00633 *this->receive_buffers_[this->buffer_index_];
00634
00635
00636
00637
00638
00639 this->good_pdu_ = check_header(data_sample_header_);
00640
00641
00642
00643
00644
00645
00646
00647 this->receive_sample_remaining_ =
00648 this->data_sample_header_.message_length();
00649
00650 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00651 "The demarshalled sample header says that we "
00652 "have %d bytes to read for the data portion of "
00653 "the sample.\n",
00654 this->receive_sample_remaining_));
00655
00656
00657
00658
00659 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00660 "this->data_sample_header_.marshaled_size() "
00661 "== %d.\n",
00662 this->data_sample_header_.marshaled_size()));
00663
00664 this->pdu_remaining_
00665 -= this->data_sample_header_.marshaled_size();
00666
00667 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00668 "Amount of transport packet remaining: %d.\n",
00669 this->pdu_remaining_));
00670
00671 int rtn_code = skip_bad_pdus();
00672 if (rtn_code <= 0) return rtn_code;
00673 }
00674 }
00675
00676 bool last_buffer = false;
00677 update_buffer_index(last_buffer);
00678
00679 if (this->receive_sample_remaining_ > 0) {
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689
00690
00691
00692 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00693 "Determine amount of data for the next block in the chain\n"));
00694
00695 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00696 "this->buffer_index_ == %d.\n",this->buffer_index_));
00697
00698 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00699 "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00700 "== %u.\n",
00701 this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00702
00703 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00704 "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00705 "== %u.\n",
00706 this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00707
00708
00709
00710 const size_t amount = ace_min<size_t>(
00711 this->receive_sample_remaining_,
00712 this->receive_buffers_[this->buffer_index_]->length());
00713
00714 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00715 "amount of data for the next block in the chain is %d\n",
00716 amount));
00717
00718
00719
00720
00721 ACE_Message_Block* current_sample_block = 0;
00722 ACE_NEW_MALLOC_RETURN(
00723 current_sample_block,
00724 (ACE_Message_Block*) this->mb_allocator_.malloc(
00725 sizeof(ACE_Message_Block)),
00726 ACE_Message_Block(
00727 this->receive_buffers_[this->buffer_index_]
00728 ->data_block()->duplicate(),
00729 0,
00730 &this->mb_allocator_),
00731 -1);
00732
00733
00734
00735
00736 if (this->payload_ == 0) {
00737 this->payload_ = current_sample_block;
00738
00739 } else {
00740 ACE_Message_Block* block = this->payload_;
00741
00742 while (block->cont() != 0) {
00743 block = block->cont();
00744 }
00745
00746 block->cont(current_sample_block);
00747 }
00748
00749 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00750 "Before adjustment of the pointers and byte counters\n"));
00751
00752 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00753 "this->payload_->rd_ptr() "
00754 "== %u.\n",
00755 this->payload_->rd_ptr()));
00756
00757 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00758 "this->payload_->wr_ptr() "
00759 "== %u.\n",
00760 this->payload_->wr_ptr()));
00761
00762
00763
00764
00765 current_sample_block->rd_ptr(
00766 this->receive_buffers_[this->buffer_index_]->rd_ptr());
00767 current_sample_block->wr_ptr(current_sample_block->rd_ptr() + amount);
00768 this->receive_buffers_[this->buffer_index_]->rd_ptr(amount);
00769 this->receive_sample_remaining_ -= amount;
00770 this->pdu_remaining_ -= amount;
00771
00772 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00773 "After adjustment of the pointers and byte counters\n"));
00774
00775 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00776 "this->payload_->rd_ptr() "
00777 "== %u.\n",
00778 this->payload_->rd_ptr()));
00779
00780 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00781 "this->payload_->wr_ptr() "
00782 "== %u.\n",
00783 this->payload_->wr_ptr()));
00784
00785 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00786 "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00787 "== %u.\n",
00788 this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00789
00790 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00791 "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00792 "== %u.\n",
00793 this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00794
00795 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00796 "After adjustment, remaining sample bytes == %d\n",
00797 this->receive_sample_remaining_));
00798 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00799 "After adjustment, remaining transport packet bytes == %d\n",
00800 this->pdu_remaining_));
00801 }
00802
00803
00804
00805
00806
00807
00808
00809
00810
00811 if (this->receive_sample_remaining_ == 0) {
00812 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00813 "Now dispatch the sample to the DataLink\n"));
00814
00815 ReceivedDataSample rds(this->payload_);
00816 this->payload_ = 0;
00817 if (this->data_sample_header_.into_received_data_sample(rds)) {
00818
00819 if (this->data_sample_header_.more_fragments()
00820 || this->receive_transport_header_.last_fragment()) {
00821 VDBG((LM_DEBUG,"(%P|%t) DBG: Attempt reassembly of fragments\n"));
00822
00823 if (this->reassemble(rds)) {
00824 VDBG((LM_DEBUG,"(%P|%t) DBG: Reassembled complete message\n"));
00825 this->deliver_sample(rds, remote_address);
00826 }
00827
00828
00829
00830 } else {
00831 this->deliver_sample(rds, remote_address);
00832 }
00833 }
00834
00835
00836
00837 this->receive_transport_header_.last_fragment(false);
00838
00839 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00840 "Release the sample that we just sent.\n"));
00841
00842 }
00843
00844 update_buffer_index(last_buffer);
00845
00846 if (last_buffer) {
00847
00848 VDBG((LM_DEBUG,"(%P|%t) DBG: We are done - no more data.\n"));
00849 return 0;
00850 }
00851
00852 }
00853
00854 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00855 "Let's try to do some more.\n"));
00856 }
00857
00858 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00859 "It looks like we are done - the done loop has finished.\n"));
00860
00861
00862
00863
00864
00865
00866 return 0;
00867 }
00868
00869 template<typename TH, typename DSH>
00870 bool
00871 TransportReceiveStrategy<TH, DSH>::reassemble(ReceivedDataSample&)
00872 {
00873 ACE_DEBUG((LM_WARNING, "(%P|%t) TransportReceiveStrategy::reassemble() "
00874 "WARNING: derived class must override if specific transport type uses "
00875 "fragmentation and reassembly\n"));
00876 return false;
00877 }
00878
00879 template<typename TH, typename DSH>
00880 void
00881 TransportReceiveStrategy<TH, DSH>::reset()
00882 {
00883 this->receive_sample_remaining_ = 0;
00884 ACE_Message_Block::release(this->payload_);
00885 this->payload_ = 0;
00886 this->good_pdu_ = true;
00887 this->pdu_remaining_ = 0;
00888 for (int i = 0; i < RECEIVE_BUFFERS; ++i) {
00889 ACE_Message_Block& rb = *this->receive_buffers_[i];
00890 rb.rd_ptr(rb.wr_ptr());
00891 }
00892 }
00893
00894 template<typename TH, typename DSH>
00895 void
00896 TransportReceiveStrategy<TH, DSH>::update_buffer_index(bool& done)
00897 {
00898 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00899 "Adjust the buffer chain in case we crossed into the next "
00900 "buffer after the last read(s).\n"));
00901 const size_t initial = this->buffer_index_;
00902 while (this->receive_buffers_[this->buffer_index_]->length() == 0) {
00903 this->buffer_index_ = this->successor_index(this->buffer_index_);
00904
00905 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00906 "Set this->buffer_index_ = %d.\n",
00907 this->buffer_index_));
00908
00909 if (initial == this->buffer_index_) {
00910 done = true;
00911 return;
00912 }
00913 }
00914 }
00915
00916 template<typename TH, typename DSH>
00917 int
00918 TransportReceiveStrategy<TH, DSH>::skip_bad_pdus()
00919 {
00920 if (this->good_pdu_) return 1;
00921
00922
00923
00924
00925
00926 for (size_t index = this->buffer_index_;
00927 this->pdu_remaining_ > 0;
00928 index = this->successor_index(index)) {
00929 size_t amount =
00930 ace_min<size_t>(this->pdu_remaining_, this->receive_buffers_[index]->length());
00931
00932 this->receive_buffers_[index]->rd_ptr(amount);
00933 this->pdu_remaining_ -= amount;
00934
00935 if (this->pdu_remaining_ > 0 && this->successor_index(index) == this->buffer_index_) {
00936 ACE_ERROR_RETURN((LM_ERROR,
00937 ACE_TEXT("(%P|%t) ERROR: ")
00938 ACE_TEXT("TransportReceiveStrategy::skip_bad_pdus()")
00939 ACE_TEXT(" - Unrecoverably corrupted ")
00940 ACE_TEXT("receive buffer management detected: ")
00941 ACE_TEXT("read more bytes than available.\n")),
00942 -1);
00943 }
00944 }
00945
00946 this->receive_sample_remaining_ = 0;
00947
00948 this->receive_sample_remaining_ = 0;
00949
00950 bool done = false;
00951 update_buffer_index(done);
00952 return done ? 0 : 1;
00953 }
00954
00955 }
00956 }
00957
00958 OPENDDS_END_VERSIONED_NAMESPACE_DECL