00001
00002
00003
00004
00005
00006
00007
00008 #include "TransportReceiveStrategy_T.h"
00009 #include "ace/INET_Addr.h"
00010
00011 #if !defined (__ACE_INLINE__)
00012 #include "TransportReceiveStrategy_T.inl"
00013 #endif
00014
00015 namespace OpenDDS {
00016 namespace DCPS {
00017
00018 template<typename TH, typename DSH>
00019 TransportReceiveStrategy<TH, DSH>::TransportReceiveStrategy()
00020 : gracefully_disconnected_(false),
00021 receive_sample_remaining_(0),
00022 mb_allocator_(MESSAGE_BLOCKS),
00023 db_allocator_(DATA_BLOCKS),
00024 data_allocator_(DATA_BLOCKS),
00025 buffer_index_(0),
00026 payload_(0),
00027 good_pdu_(true),
00028 pdu_remaining_(0)
00029 {
00030 DBG_ENTRY_LVL("TransportReceiveStrategy", "TransportReceiveStrategy" ,6);
00031
00032 if (Transport_debug_level >= 2) {
00033 ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-mb"
00034 " Cached_Allocator_With_Overflow %x with %d chunks\n",
00035 &mb_allocator_, MESSAGE_BLOCKS));
00036 ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-db"
00037 " Cached_Allocator_With_Overflow %x with %d chunks\n",
00038 &db_allocator_, DATA_BLOCKS));
00039 ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-data"
00040 " Cached_Allocator_With_Overflow %x with %d chunks\n",
00041 &data_allocator_, DATA_BLOCKS));
00042 }
00043
00044 ACE_OS::memset(this->receive_buffers_, 0, sizeof(this->receive_buffers_));
00045 }
00046
00047 template<typename TH, typename DSH>
00048 TransportReceiveStrategy<TH, DSH>::~TransportReceiveStrategy()
00049 {
00050 DBG_ENTRY_LVL("TransportReceiveStrategy","~TransportReceiveStrategy",6);
00051
00052 if (this->receive_buffers_[this->buffer_index_] != 0) {
00053 size_t size = this->receive_buffers_[this->buffer_index_]->total_length();
00054
00055 if (size > 0) {
00056 ACE_DEBUG((LM_WARNING,
00057 ACE_TEXT("(%P|%t) WARNING: TransportReceiveStrategy::~TransportReceiveStrategy() - ")
00058 ACE_TEXT("terminating with %d unprocessed bytes.\n"),
00059 size));
00060 }
00061 }
00062 }
00063
00064 template<typename TH, typename DSH>
00065 bool
00066 TransportReceiveStrategy<TH, DSH>::check_header(const TH& )
00067 {
00068 return true;
00069 }
00070
00071 template<typename TH, typename DSH>
00072 bool
00073 TransportReceiveStrategy<TH, DSH>::check_header(const DSH& )
00074 {
00075 return true;
00076 }
00077
00078
00079
00080
00081
00082
00083
00084 template<typename TH, typename DSH>
00085 int
00086 TransportReceiveStrategy<TH, DSH>::handle_dds_input(ACE_HANDLE fd)
00087 {
00088 DBG_ENTRY_LVL("TransportReceiveStrategy", "handle_dds_input", 6);
00089
00090
00091
00092
00093
00094
00095
00096
00097
00098
00099
00100
00101
00102
00103
00104
00105
00106
00107
00108
00109
00110
00111
00112
00113
00114
00115
00116
00117
00118
00119
00120
00121
00122
00123
00124
00125
00126
00127
00128
00129
00130
00131
00132
00133
00134
00135
00136
00137
00138
00139
00140
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156 size_t index;
00157
00158 for (index = 0; index < RECEIVE_BUFFERS; ++index) {
00159 if ((this->receive_buffers_[index] != 0)
00160 && (this->receive_buffers_[index]->length() == 0)
00161 && (this->receive_buffers_[index]->space() < BUFFER_LOW_WATER)) {
00162 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00163 "Remove a receive_buffer_[%d] from use.\n",
00164 index));
00165
00166
00167
00168
00169 for (size_t ii =0; ii < RECEIVE_BUFFERS; ii++) {
00170 if ((0 != this->receive_buffers_[ii]) &&
00171 (this->receive_buffers_[ii]->cont() ==
00172 this->receive_buffers_[index])) {
00173 this->receive_buffers_[ii]->cont(0);
00174 }
00175 }
00176
00177
00178
00179
00180 ACE_DES_FREE(
00181 this->receive_buffers_[index],
00182 this->mb_allocator_.free,
00183 ACE_Message_Block);
00184 this->receive_buffers_[index] = 0;
00185 }
00186 }
00187
00188
00189
00190
00191
00192
00193 size_t previous = this->buffer_index_;
00194
00195 for (index = this->buffer_index_;
00196 this->successor_index(previous) != this->buffer_index_;
00197 index = this->successor_index(index)) {
00198 if (this->receive_buffers_[index] == 0) {
00199 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00200 "Allocate a Message_Block for new receive_buffer_[%d].\n",
00201 index));
00202
00203 ACE_NEW_MALLOC_RETURN(
00204 this->receive_buffers_[index],
00205 (ACE_Message_Block*) this->mb_allocator_.malloc(
00206 sizeof(ACE_Message_Block)),
00207 ACE_Message_Block(
00208 RECEIVE_DATA_BUFFER_SIZE,
00209 ACE_Message_Block::MB_DATA,
00210 0,
00211 0,
00212 &this->data_allocator_,
00213 &this->receive_lock_,
00214 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00215 ACE_Time_Value::zero,
00216 ACE_Time_Value::max_time,
00217 &this->db_allocator_,
00218 &this->mb_allocator_
00219 ),
00220 -1);
00221 }
00222
00223
00224
00225
00226
00227 if (previous != index) {
00228 this->receive_buffers_[previous]->cont(
00229 this->receive_buffers_[index]);
00230 }
00231
00232 previous = index;
00233 }
00234
00235
00236
00237
00238
00239
00240
00241
00242
00243
00244
00245
00246
00247
00248
00249
00250 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00251 "Form the iovec from the message block\n"));
00252
00253
00254
00255
00256 iovec iov[RECEIVE_BUFFERS];
00257 size_t vec_index = 0;
00258 size_t current = this->buffer_index_;
00259
00260 for (index = 0;
00261 index < RECEIVE_BUFFERS;
00262 ++index, current = this->successor_index(current)) {
00263
00264 if (this->receive_buffers_[current] == 0) {
00265 ACE_ERROR_RETURN((LM_ERROR,
00266 ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
00267 ACE_TEXT("receive buffer management detected.\n")),
00268 -1);
00269 }
00270
00271 #ifdef _MSC_VER
00272 #pragma warning(push)
00273
00274
00275 #pragma warning(disable : 4267)
00276 #endif
00277
00278
00279 if (this->receive_buffers_[current]->space() > 0) {
00280 iov[vec_index].iov_len = this->receive_buffers_[current]->space();
00281 iov[vec_index].iov_base = this->receive_buffers_[current]->wr_ptr();
00282
00283 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00284 "index==%d, len==%d, base==%x\n",
00285 vec_index, iov[vec_index].iov_len, iov[vec_index].iov_base));
00286
00287 vec_index++;
00288 }
00289 }
00290
00291 #ifdef _MSC_VER
00292 #pragma warning(pop)
00293 #endif
00294
00295 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00296 "Perform the recvv() call\n"));
00297
00298
00299
00300
00301 ACE_INET_Addr remote_address;
00302 ssize_t bytes_remaining = this->receive_bytes(iov,
00303 static_cast<int>(vec_index),
00304 remote_address,
00305 fd);
00306
00307 if (bytes_remaining < 0) {
00308 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Problem ")
00309 ACE_TEXT("with data link detected: %p.\n"),
00310 ACE_TEXT("receive_bytes")));
00311
00312
00313
00314 this->relink();
00315
00316
00317 return -1;
00318
00319 }
00320
00321 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00322 "recvv() return %d - we call this the bytes_remaining.\n",
00323 bytes_remaining), 5);
00324
00325 if (bytes_remaining == 0) {
00326 if (this->gracefully_disconnected_) {
00327 VDBG_LVL((LM_INFO,
00328 ACE_TEXT("(%P|%t) Peer has gracefully disconnected.\n"))
00329 ,1);
00330 return -1;
00331
00332 } else {
00333 VDBG_LVL((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Unrecoverable problem ")
00334 ACE_TEXT("with data link detected\n")), 1);
00335
00336
00337
00338 this->relink();
00339
00340
00341 return -1;
00342
00343 }
00344 }
00345
00346 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
00347 "START Adjust the message block chain pointers to account for the "
00348 "new data.\n"), 5);
00349
00350
00351
00352
00353
00354 size_t bytes = bytes_remaining;
00355
00356 if (!this->pdu_remaining_) {
00357 this->receive_transport_header_.length_ = static_cast<ACE_UINT32>(bytes);
00358 }
00359
00360 for (index = this->buffer_index_;
00361 bytes > 0;
00362 index = this->successor_index(index)) {
00363 VDBG((LM_DEBUG,"(%P|%t) DBG: -> "
00364 "At top of for..loop block.\n"));
00365 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00366 "index == %d.\n", index));
00367 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00368 "bytes == %d.\n", bytes));
00369
00370 size_t amount
00371 = ace_min<size_t>(bytes, this->receive_buffers_[index]->space());
00372
00373 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00374 "amount == %d.\n", amount));
00375
00376 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00377 "this->receive_buffers_[index]->rd_ptr() == %u.\n",
00378 this->receive_buffers_[index]->rd_ptr()));
00379 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00380 "this->receive_buffers_[index]->wr_ptr() == %u.\n",
00381 this->receive_buffers_[index]->wr_ptr()));
00382
00383 this->receive_buffers_[index]->wr_ptr(amount);
00384
00385 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00386 "Now, this->receive_buffers_[index]->wr_ptr() == %u.\n",
00387 this->receive_buffers_[index]->wr_ptr()));
00388
00389 bytes -= amount;
00390
00391 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00392 "Now, bytes == %d .\n", bytes));
00393
00394
00395
00396 if (bytes > 0 && this->successor_index(index) == this->buffer_index_) {
00397
00398 ACE_ERROR_RETURN((LM_ERROR,
00399 ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
00400 ACE_TEXT("receive buffer management detected: ")
00401 ACE_TEXT("read more bytes than available.\n")),
00402 -1);
00403 }
00404 }
00405
00406 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00407 "DONE Adjust the message block chain pointers to account "
00408 "for the new data.\n"));
00409
00410
00411
00412
00413
00414
00415
00416
00417
00418
00419
00420
00421
00422
00423
00424 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00425 "Start processing the data we just received.\n"));
00426
00427
00428
00429
00430 while (true) {
00431
00432
00433
00434
00435
00436
00437
00438
00439
00440
00441
00442
00443
00444
00445
00446
00447
00448
00449
00450
00451
00452
00453 if (this->pdu_remaining_ == 0) {
00454 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00455 "We are expecting a transport packet header.\n"));
00456
00457 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00458 "this->buffer_index_ == %d.\n",this->buffer_index_));
00459
00460 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00461 "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00462 "== %u.\n",
00463 this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00464
00465 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00466 "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00467 "== %u.\n",
00468 this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00469
00470 if (this->receive_buffers_[this->buffer_index_]->total_length()
00471 < this->receive_transport_header_.max_marshaled_size()) {
00472
00473
00474
00475
00476
00477 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00478 "Not enough bytes read to account for a transport "
00479 "packet header. We are done here - we need to "
00480 "receive more bytes.\n"));
00481
00482 this->receive_transport_header_.incomplete(
00483 *this->receive_buffers_[this->buffer_index_]);
00484
00485 return 0;
00486
00487 } else {
00488 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00489 "We have enough bytes to demarshall the transport "
00490 "packet header.\n"));
00491
00492
00493 if (Transport_debug_level > 5) {
00494 ACE_TCHAR xbuffer[4096];
00495 const ACE_Message_Block& mb =
00496 *this->receive_buffers_[this->buffer_index_];
00497 size_t xbytes = mb.length();
00498
00499 xbytes = (std::min)(xbytes, TH::max_marshaled_size());
00500
00501 ACE::format_hexdump(mb.rd_ptr(), xbytes, xbuffer, sizeof(xbuffer));
00502
00503 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00504 "Hex Dump of transport header block "
00505 "(%d bytes):\n%s\n", xbytes, xbuffer));
00506 }
00507
00508
00509
00510
00511 this->receive_transport_header_ =
00512 *this->receive_buffers_[this->buffer_index_];
00513
00514
00515
00516
00517 if (!this->receive_transport_header_.valid()) {
00518 ACE_ERROR_RETURN((LM_ERROR,
00519 ACE_TEXT
00520 ("(%P|%t) ERROR: TransportHeader invalid.\n")),
00521 -1);
00522 }
00523
00524 this->good_pdu_ = check_header(this->receive_transport_header_);
00525 this->pdu_remaining_ = this->receive_transport_header_.length_;
00526
00527 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00528 "Amount of transport packet bytes (remaining): %d.\n",
00529 this->pdu_remaining_));
00530 }
00531 }
00532
00533
00534
00535
00536
00537 {
00538 int rtn_code = skip_bad_pdus();
00539 if (rtn_code <= 0) return rtn_code;
00540 }
00541
00542
00543
00544
00545 while (this->pdu_remaining_ > 0) {
00546 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00547 "We have a transport packet now. There are more sample "
00548 "bytes to parse in order to complete the packet.\n"));
00549
00550
00551
00552
00553
00554
00555
00556
00557
00558
00559
00560
00561
00562
00563
00564
00565
00566
00567 if (this->receive_sample_remaining_ == 0) {
00568 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00569 "We are not working on some remaining sample data. "
00570 "We are expecting to parse a sample header now.\n"));
00571
00572 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00573 "this->buffer_index_ == %d.\n",this->buffer_index_));
00574
00575 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00576 "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00577 "== %u.\n",
00578 this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00579
00580 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00581 "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00582 "== %u.\n",
00583 this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00584
00585 if (DSH::partial(*this->receive_buffers_[this->buffer_index_])) {
00586
00587
00588
00589
00590
00591 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00592 "Not enough bytes have been received to account "
00593 "for a complete sample header. We are done for "
00594 "now - we need to receive more data before we "
00595 "can go on.\n"));
00596 if (Transport_debug_level > 2) {
00597 ACE_TCHAR ebuffer[350];
00598 ACE_Message_Block& mb = *this->receive_buffers_[this->buffer_index_];
00599 const size_t sz = (std::min)(DSH::max_marshaled_size(), mb.length());
00600 ACE::format_hexdump(mb.rd_ptr(), sz, ebuffer, sizeof(ebuffer));
00601 ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: "
00602 "Partial DataSampleHeader:\n%s\n", ebuffer));
00603 }
00604
00605 return 0;
00606
00607 } else {
00608 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00609 "We have received enough bytes for the sample "
00610 "header. Demarshall the sample header now.\n"));
00611
00612
00613 if (Transport_debug_level > 5) {
00614 ACE_TCHAR ebuffer[4096];
00615 ACE::format_hexdump
00616 (this->receive_buffers_[this->buffer_index_]->rd_ptr(),
00617 this->data_sample_header_.max_marshaled_size(),
00618 ebuffer, sizeof(ebuffer));
00619
00620 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00621 "Hex Dump:\n%s\n", ebuffer));
00622 }
00623
00624 this->data_sample_header_.pdu_remaining(this->pdu_remaining_);
00625
00626
00627
00628
00629 this->data_sample_header_ =
00630 *this->receive_buffers_[this->buffer_index_];
00631
00632
00633
00634
00635
00636 this->good_pdu_ = check_header(data_sample_header_);
00637
00638
00639
00640
00641
00642
00643
00644 this->receive_sample_remaining_ =
00645 this->data_sample_header_.message_length();
00646
00647 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00648 "The demarshalled sample header says that we "
00649 "have %d bytes to read for the data portion of "
00650 "the sample.\n",
00651 this->receive_sample_remaining_));
00652
00653
00654
00655
00656 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00657 "this->data_sample_header_.marshaled_size() "
00658 "== %d.\n",
00659 this->data_sample_header_.marshaled_size()));
00660
00661 this->pdu_remaining_
00662 -= this->data_sample_header_.marshaled_size();
00663
00664 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00665 "Amount of transport packet remaining: %d.\n",
00666 this->pdu_remaining_));
00667
00668 int rtn_code = skip_bad_pdus();
00669 if (rtn_code <= 0) return rtn_code;
00670 }
00671 }
00672
00673 bool last_buffer = false;
00674 update_buffer_index(last_buffer);
00675
00676 if (this->receive_sample_remaining_ > 0) {
00677
00678
00679
00680
00681
00682
00683
00684
00685
00686
00687
00688
00689 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00690 "Determine amount of data for the next block in the chain\n"));
00691
00692 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00693 "this->buffer_index_ == %d.\n",this->buffer_index_));
00694
00695 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00696 "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00697 "== %u.\n",
00698 this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00699
00700 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00701 "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00702 "== %u.\n",
00703 this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00704
00705
00706
00707 const size_t amount = ace_min<size_t>(
00708 this->receive_sample_remaining_,
00709 this->receive_buffers_[this->buffer_index_]->length());
00710
00711 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00712 "amount of data for the next block in the chain is %d\n",
00713 amount));
00714
00715
00716
00717
00718 ACE_Message_Block* current_sample_block = 0;
00719 ACE_NEW_MALLOC_RETURN(
00720 current_sample_block,
00721 (ACE_Message_Block*) this->mb_allocator_.malloc(
00722 sizeof(ACE_Message_Block)),
00723 ACE_Message_Block(
00724 this->receive_buffers_[this->buffer_index_]
00725 ->data_block()->duplicate(),
00726 0,
00727 &this->mb_allocator_),
00728 -1);
00729
00730
00731
00732
00733 if (this->payload_ == 0) {
00734 this->payload_ = current_sample_block;
00735
00736 } else {
00737 ACE_Message_Block* block = this->payload_;
00738
00739 while (block->cont() != 0) {
00740 block = block->cont();
00741 }
00742
00743 block->cont(current_sample_block);
00744 }
00745
00746 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00747 "Before adjustment of the pointers and byte counters\n"));
00748
00749 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00750 "this->payload_->rd_ptr() "
00751 "== %u.\n",
00752 this->payload_->rd_ptr()));
00753
00754 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00755 "this->payload_->wr_ptr() "
00756 "== %u.\n",
00757 this->payload_->wr_ptr()));
00758
00759
00760
00761
00762 current_sample_block->rd_ptr(
00763 this->receive_buffers_[this->buffer_index_]->rd_ptr());
00764 current_sample_block->wr_ptr(current_sample_block->rd_ptr() + amount);
00765 this->receive_buffers_[this->buffer_index_]->rd_ptr(amount);
00766 this->receive_sample_remaining_ -= amount;
00767 this->pdu_remaining_ -= amount;
00768
00769 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00770 "After adjustment of the pointers and byte counters\n"));
00771
00772 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00773 "this->payload_->rd_ptr() "
00774 "== %u.\n",
00775 this->payload_->rd_ptr()));
00776
00777 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00778 "this->payload_->wr_ptr() "
00779 "== %u.\n",
00780 this->payload_->wr_ptr()));
00781
00782 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00783 "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00784 "== %u.\n",
00785 this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00786
00787 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00788 "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00789 "== %u.\n",
00790 this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00791
00792 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00793 "After adjustment, remaining sample bytes == %d\n",
00794 this->receive_sample_remaining_));
00795 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00796 "After adjustment, remaining transport packet bytes == %d\n",
00797 this->pdu_remaining_));
00798 }
00799
00800
00801
00802
00803
00804
00805
00806
00807
00808 if (this->receive_sample_remaining_ == 0) {
00809 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00810 "Now dispatch the sample to the DataLink\n"));
00811
00812 ReceivedDataSample rds(this->payload_);
00813 this->payload_ = 0;
00814 if (this->data_sample_header_.into_received_data_sample(rds)) {
00815
00816 if (this->data_sample_header_.more_fragments()
00817 || this->receive_transport_header_.last_fragment()) {
00818 VDBG((LM_DEBUG,"(%P|%t) DBG: Attempt reassembly of fragments\n"));
00819
00820 if (this->reassemble(rds)) {
00821 VDBG((LM_DEBUG,"(%P|%t) DBG: Reassembled complete message\n"));
00822 this->deliver_sample(rds, remote_address);
00823 }
00824
00825
00826
00827 } else {
00828 this->deliver_sample(rds, remote_address);
00829 }
00830 }
00831
00832
00833
00834 this->receive_transport_header_.last_fragment(false);
00835
00836 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00837 "Release the sample that we just sent.\n"));
00838
00839 }
00840
00841 update_buffer_index(last_buffer);
00842
00843 if (last_buffer) {
00844
00845 VDBG((LM_DEBUG,"(%P|%t) DBG: We are done - no more data.\n"));
00846 return 0;
00847 }
00848
00849 }
00850
00851 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00852 "Let's try to do some more.\n"));
00853 }
00854
00855 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00856 "It looks like we are done - the done loop has finished.\n"));
00857
00858
00859
00860
00861
00862
00863 return 0;
00864 }
00865
00866 template<typename TH, typename DSH>
00867 bool
00868 TransportReceiveStrategy<TH, DSH>::reassemble(ReceivedDataSample&)
00869 {
00870 ACE_DEBUG((LM_WARNING, "(%P|%t) TransportReceiveStrategy::reassemble() "
00871 "WARNING: derived class must override if specific transport type uses "
00872 "fragmentation and reassembly\n"));
00873 return false;
00874 }
00875
00876 template<typename TH, typename DSH>
00877 void
00878 TransportReceiveStrategy<TH, DSH>::reset()
00879 {
00880 this->receive_sample_remaining_ = 0;
00881 ACE_Message_Block::release(this->payload_);
00882 this->payload_ = 0;
00883 this->good_pdu_ = true;
00884 this->pdu_remaining_ = 0;
00885 for (int i = 0; i < RECEIVE_BUFFERS; ++i) {
00886 ACE_Message_Block& rb = *this->receive_buffers_[i];
00887 rb.rd_ptr(rb.wr_ptr());
00888 }
00889 }
00890
00891 template<typename TH, typename DSH>
00892 void
00893 TransportReceiveStrategy<TH, DSH>::update_buffer_index(bool& done)
00894 {
00895 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00896 "Adjust the buffer chain in case we crossed into the next "
00897 "buffer after the last read(s).\n"));
00898 const size_t initial = this->buffer_index_;
00899 while (this->receive_buffers_[this->buffer_index_]->length() == 0) {
00900 this->buffer_index_ = this->successor_index(this->buffer_index_);
00901
00902 VDBG((LM_DEBUG,"(%P|%t) DBG: "
00903 "Set this->buffer_index_ = %d.\n",
00904 this->buffer_index_));
00905
00906 if (initial == this->buffer_index_) {
00907 done = true;
00908 return;
00909 }
00910 }
00911 }
00912
00913 template<typename TH, typename DSH>
00914 int
00915 TransportReceiveStrategy<TH, DSH>::skip_bad_pdus()
00916 {
00917 if (this->good_pdu_) return 1;
00918
00919
00920
00921
00922
00923 for (size_t index = this->buffer_index_;
00924 this->pdu_remaining_ > 0;
00925 index = this->successor_index(index)) {
00926 size_t amount =
00927 ace_min<size_t>(this->pdu_remaining_, this->receive_buffers_[index]->length());
00928
00929 this->receive_buffers_[index]->rd_ptr(amount);
00930 this->pdu_remaining_ -= amount;
00931
00932 if (this->pdu_remaining_ > 0 && this->successor_index(index) == this->buffer_index_) {
00933 ACE_ERROR_RETURN((LM_ERROR,
00934 ACE_TEXT("(%P|%t) ERROR: ")
00935 ACE_TEXT("TransportReceiveStrategy::skip_bad_pdus()")
00936 ACE_TEXT(" - Unrecoverably corrupted ")
00937 ACE_TEXT("receive buffer management detected: ")
00938 ACE_TEXT("read more bytes than available.\n")),
00939 -1);
00940 }
00941 }
00942
00943 this->receive_sample_remaining_ = 0;
00944
00945 this->receive_sample_remaining_ = 0;
00946
00947 bool done = false;
00948 update_buffer_index(done);
00949 return done ? 0 : 1;
00950 }
00951
00952 }
00953 }