15 #if !defined (__ACE_INLINE__) 24 template<
typename TH,
typename DSH>
26 size_t receive_buffers_count)
27 : gracefully_disconnected_(false),
28 receive_sample_remaining_(0),
29 mb_allocator_((config && config->receive_preallocated_message_blocks_) ? config->receive_preallocated_message_blocks_ : MESSAGE_BLOCKS),
30 db_allocator_((config && config->receive_preallocated_data_blocks_) ? config->receive_preallocated_data_blocks_ : DATA_BLOCKS),
31 data_allocator_((config && config->receive_preallocated_data_blocks_) ? config->receive_preallocated_data_blocks_ : receive_buffers_count * 2),
32 receive_buffers_(receive_buffers_count, 0),
38 DBG_ENTRY_LVL(
"TransportReceiveStrategy",
"TransportReceiveStrategy" ,6);
42 " Cached_Allocator_With_Overflow %@ with %B chunks\n",
43 &mb_allocator_, mb_allocator_.n_chunks()));
45 " Cached_Allocator_With_Overflow %@ with %B chunks\n",
46 &db_allocator_, db_allocator_.n_chunks()));
48 " Cached_Allocator_With_Overflow %@ with %B chunks\n",
49 &data_allocator_, data_allocator_.n_chunks()));
53 template<
typename TH,
typename DSH>
56 DBG_ENTRY_LVL(
"TransportReceiveStrategy",
"~TransportReceiveStrategy",6);
58 if (this->receive_buffers_[this->buffer_index_] != 0) {
59 size_t size = this->receive_buffers_[this->buffer_index_]->total_length();
63 ACE_TEXT(
"(%P|%t) WARNING: TransportReceiveStrategy::~TransportReceiveStrategy() - ")
64 ACE_TEXT(
"terminating with %d unprocessed bytes.\n"),
69 for (
size_t index = 0; index < receive_buffers_.size(); ++index) {
70 if (receive_buffers_[index] != 0) {
72 receive_buffers_[index],
79 template<
typename TH,
typename DSH>
86 template<
typename TH,
typename DSH>
99 template<
typename TH,
typename DSH>
103 DBG_ENTRY_LVL(
"TransportReceiveStrategy",
"handle_dds_input", 6);
173 for (index = 0; index < receive_buffers_.size(); ++index) {
174 if ((this->receive_buffers_[index] != 0)
175 && (this->receive_buffers_[index]->length() == 0)
176 && (this->receive_buffers_[index]->space() < BUFFER_LOW_WATER)) {
178 "Remove a receive_buffer_[%d] from use.\n",
184 for (
size_t ii =0; ii < receive_buffers_.size(); ii++) {
185 if ((0 != this->receive_buffers_[ii]) &&
186 (this->receive_buffers_[ii]->cont() ==
187 this->receive_buffers_[index])) {
188 this->receive_buffers_[ii]->cont(0);
196 this->receive_buffers_[index],
197 this->mb_allocator_.free,
199 this->receive_buffers_[index] = 0;
208 size_t previous = this->buffer_index_;
210 for (index = this->buffer_index_;
211 this->successor_index(previous) != this->buffer_index_;
212 index = this->successor_index(index)) {
213 if (this->receive_buffers_[index] == 0) {
215 "Allocate a Message_Block for new receive_buffer_[%d].\n",
219 this->receive_buffers_[index],
227 &this->data_allocator_,
228 &this->receive_lock_,
232 &this->db_allocator_,
242 if (previous != index) {
243 this->receive_buffers_[previous]->cont(
244 this->receive_buffers_[index]);
266 "Form the iovec from the message block\n"));
271 iovec iov[RECEIVE_BUFFERS];
272 size_t vec_index = 0;
273 size_t current = this->buffer_index_;
276 index < receive_buffers_.size();
277 ++index, current = this->successor_index(current)) {
279 if (this->receive_buffers_[current] == 0) {
281 ACE_TEXT(
"(%P|%t) ERROR: Unrecoverably corrupted ")
282 ACE_TEXT(
"receive buffer management detected.\n")),
287 #pragma warning(push) 290 #pragma warning(disable : 4267) 294 if (this->receive_buffers_[current]->space() > 0) {
295 iov[vec_index].iov_len = this->receive_buffers_[current]->space();
296 iov[vec_index].iov_base = this->receive_buffers_[current]->wr_ptr();
299 "index==%d, len==%d, base==%x\n",
300 vec_index, iov[vec_index].iov_len, iov[vec_index].iov_base));
311 "Perform the recvv() call\n"));
318 const ssize_t bytes_remaining = this->receive_bytes(iov,
319 static_cast<int>(vec_index),
328 if (bytes_remaining < 0) {
330 ACE_TEXT(
"with data link detected: %p.\n"),
343 "recvv() return %d - we call this the bytes_remaining.\n",
344 bytes_remaining), 5);
346 if (bytes_remaining == 0) {
347 if (this->gracefully_disconnected_) {
353 ACE_TEXT(
"with data link detected\n")), 1);
366 "START Adjust the message block chain pointers to account for the " 373 size_t bytes = bytes_remaining;
375 if (!this->pdu_remaining_) {
376 this->receive_transport_header_.length_ =
static_cast<ACE_UINT32
>(bytes);
379 for (index = this->buffer_index_;
381 index = this->successor_index(index)) {
383 "At top of for..loop block.\n"));
385 const size_t amount = ace_min<size_t>(bytes, this->receive_buffers_[index]->space());
388 "index == %d bytes == %d amount == %d.\n",
389 index, bytes, amount));
392 "this->receive_buffers_[index]->rd_ptr() == %u.\n",
393 this->receive_buffers_[index]->rd_ptr()));
395 "this->receive_buffers_[index]->wr_ptr() == %u.\n",
396 this->receive_buffers_[index]->wr_ptr()));
398 this->receive_buffers_[index]->wr_ptr(amount);
401 "Now, this->receive_buffers_[index]->wr_ptr() == %u.\n",
402 this->receive_buffers_[index]->wr_ptr()));
407 "Now, bytes == %d .\n", bytes));
411 if (bytes > 0 && this->successor_index(index) == this->buffer_index_) {
414 ACE_TEXT(
"(%P|%t) ERROR: Unrecoverably corrupted ")
415 ACE_TEXT(
"receive buffer management detected: ")
416 ACE_TEXT(
"read more bytes than available.\n")),
422 "DONE Adjust the message block chain pointers to account " 423 "for the new data.\n"));
440 "Start processing the data we just received.\n"));
468 if (this->pdu_remaining_ == 0) {
470 "We are expecting a transport packet header.\n"));
473 "this->buffer_index_ == %d.\n",this->buffer_index_));
476 "this->receive_buffers_[this->buffer_index_]->rd_ptr() " 478 this->receive_buffers_[this->buffer_index_]->rd_ptr()));
481 "this->receive_buffers_[this->buffer_index_]->wr_ptr() " 483 this->receive_buffers_[this->buffer_index_]->wr_ptr()));
485 if (this->receive_buffers_[this->buffer_index_]->total_length()
486 < this->receive_transport_header_.get_max_serialized_size()) {
493 "Not enough bytes read to account for a transport " 494 "packet header. We are done here - we need to " 495 "receive more bytes.\n"));
497 this->receive_transport_header_.incomplete(
498 *this->receive_buffers_[this->buffer_index_]);
504 "We have enough bytes to demarshall the transport " 505 "packet header.\n"));
511 *this->receive_buffers_[this->buffer_index_];
512 size_t xbytes = mb.
length();
514 xbytes = (std::min)(xbytes, TH::get_max_serialized_size());
519 "Hex Dump of transport header block " 520 "(%d bytes):\n%s", xbytes, xbuffer));
526 this->receive_transport_header_ =
527 *this->receive_buffers_[this->buffer_index_];
532 if (!this->receive_transport_header_.valid()) {
535 (
"(%P|%t) ERROR: TransportHeader invalid.\n")),
539 this->good_pdu_ = check_header(this->receive_transport_header_);
540 this->pdu_remaining_ = this->receive_transport_header_.length_;
543 "Amount of transport packet bytes (remaining): %d.\n",
544 this->pdu_remaining_));
549 bool last_buffer =
false;
550 update_buffer_index(last_buffer);
558 int rtn_code = skip_bad_pdus();
559 if (rtn_code <= 0)
return rtn_code;
565 while (this->pdu_remaining_ > 0) {
567 "We have a transport packet now. There are more sample " 568 "bytes to parse in order to complete the packet.\n"));
587 if (this->receive_sample_remaining_ == 0) {
589 "We are not working on some remaining sample data. " 590 "We are expecting to parse a sample header now.\n"));
593 "this->buffer_index_ == %d.\n",this->buffer_index_));
596 "this->receive_buffers_[this->buffer_index_]->rd_ptr() " 598 this->receive_buffers_[this->buffer_index_]->rd_ptr()));
601 "this->receive_buffers_[this->buffer_index_]->wr_ptr() " 603 this->receive_buffers_[this->buffer_index_]->wr_ptr()));
605 if (DSH::partial(*this->receive_buffers_[this->buffer_index_])) {
612 "Not enough bytes have been received to account " 613 "for a complete sample header. We are done for " 614 "now - we need to receive more data before we " 619 const size_t sz = (std::min)(DSH::get_max_serialized_size(), mb.
length());
622 "Partial DataSampleHeader:\n%s\n", ebuffer));
629 "We have received enough bytes for the sample " 630 "header. Demarshall the sample header now.\n"));
636 this->receive_buffers_[this->buffer_index_]->rd_ptr(),
637 this->data_sample_header_.get_max_serialized_size(),
638 ebuffer,
sizeof(ebuffer));
641 "Hex Dump:\n%s", ebuffer));
644 this->data_sample_header_.pdu_remaining(this->pdu_remaining_);
649 this->data_sample_header_ =
650 *this->receive_buffers_[this->buffer_index_];
655 this->good_pdu_ = check_header(data_sample_header_);
663 this->receive_sample_remaining_ =
664 this->data_sample_header_.message_length();
667 "The demarshalled sample header says that we " 668 "have %d bytes to read for the data portion of " 670 this->receive_sample_remaining_));
675 const size_t header_size =
676 this->data_sample_header_.get_serialized_size();
678 "this->data_sample_header_.get_serialized_size() == %d.\n",
681 this->pdu_remaining_ -= header_size;
684 "Amount of transport packet remaining: %d.\n",
685 this->pdu_remaining_));
687 const int rtn_code = skip_bad_pdus();
688 if (rtn_code <= 0)
return rtn_code;
692 bool last_buffer =
false;
693 update_buffer_index(last_buffer);
695 if (this->receive_sample_remaining_ > 0) {
709 "Determine amount of data for the next block in the chain\n"));
712 "this->buffer_index_ == %d.\n",this->buffer_index_));
715 "this->receive_buffers_[this->buffer_index_]->rd_ptr() " 717 this->receive_buffers_[this->buffer_index_]->rd_ptr()));
720 "this->receive_buffers_[this->buffer_index_]->wr_ptr() " 722 this->receive_buffers_[this->buffer_index_]->wr_ptr()));
726 const size_t amount = ace_min<size_t>(
727 this->receive_sample_remaining_,
728 this->receive_buffers_[this->buffer_index_]->length());
731 "amount of data for the next block in the chain is %d\n",
739 current_sample_block,
743 this->receive_buffers_[this->buffer_index_]
744 ->data_block()->duplicate(),
746 &this->mb_allocator_),
752 if (this->payload_ == 0) {
753 this->payload_ = current_sample_block;
758 while (block->
cont() != 0) {
759 block = block->
cont();
762 block->
cont(current_sample_block);
766 "Before adjustment of the pointers and byte counters\n"));
769 "this->payload_->rd_ptr() " 771 "this->payload_->wr_ptr() " 773 this->payload_->rd_ptr(),
774 this->payload_->wr_ptr()));
779 current_sample_block->
rd_ptr(
780 this->receive_buffers_[this->buffer_index_]->rd_ptr());
781 current_sample_block->
wr_ptr(current_sample_block->
rd_ptr() + amount);
782 this->receive_buffers_[this->buffer_index_]->rd_ptr(amount);
783 this->receive_sample_remaining_ -= amount;
784 this->pdu_remaining_ -= amount;
787 "After adjustment of the pointers and byte counters\n"));
790 "this->payload_->rd_ptr() " 792 "this->payload_->wr_ptr() " 794 this->payload_->rd_ptr(),
795 this->payload_->wr_ptr()));
798 "this->receive_buffers_[this->buffer_index_]->rd_ptr() " 800 this->receive_buffers_[this->buffer_index_]->rd_ptr()));
803 "this->receive_buffers_[this->buffer_index_]->wr_ptr() " 805 this->receive_buffers_[this->buffer_index_]->wr_ptr()));
808 "After adjustment, remaining sample bytes == %d and remaining transport packet bytes == %d\n",
809 this->receive_sample_remaining_,
810 this->pdu_remaining_));
821 if (this->receive_sample_remaining_ == 0) {
823 "Now dispatch the sample to the DataLink\n"));
830 if (this->data_sample_header_.into_received_data_sample(rds)) {
832 if (this->data_sample_header_.more_fragments()
833 || this->receive_transport_header_.last_fragment()) {
834 VDBG((
LM_DEBUG,
"(%P|%t) DBG: Attempt reassembly of fragments\n"));
836 if (this->reassemble(rds)) {
837 VDBG((
LM_DEBUG,
"(%P|%t) DBG: Reassembled complete message\n"));
838 this->deliver_sample(rds, remote_address);
844 this->deliver_sample(rds, remote_address);
850 this->receive_transport_header_.last_fragment(
false);
853 "Release the sample that we just sent.\n"));
857 update_buffer_index(last_buffer);
861 VDBG((
LM_DEBUG,
"(%P|%t) DBG: We are done - no more data.\n"));
868 "Let's try to do some more.\n"));
872 "It looks like we are done - the done loop has finished.\n"));
882 template<
typename TH,
typename DSH>
887 "WARNING: derived class must override if specific transport type uses " 888 "fragmentation and reassembly\n"));
892 template<
typename TH,
typename DSH>
896 this->receive_sample_remaining_ = 0;
899 this->good_pdu_ =
true;
900 this->pdu_remaining_ = 0;
901 for (
size_t i = 0; i < receive_buffers_.size(); ++i) {
907 template<
typename TH,
typename DSH>
912 "Adjust the buffer chain in case we crossed into the next " 913 "buffer after the last read(s).\n"));
914 const size_t initial = this->buffer_index_;
915 while (this->receive_buffers_[this->buffer_index_]->length() == 0) {
916 this->buffer_index_ = this->successor_index(this->buffer_index_);
919 "Set this->buffer_index_ = %d.\n",
920 this->buffer_index_));
922 if (initial == this->buffer_index_) {
929 template<
typename TH,
typename DSH>
933 if (this->good_pdu_)
return 1;
939 for (
size_t index = this->buffer_index_;
940 this->pdu_remaining_ > 0;
941 index = this->successor_index(index)) {
942 const size_t amount =
943 ace_min<size_t>(this->pdu_remaining_, this->receive_buffers_[index]->length());
945 this->receive_buffers_[index]->rd_ptr(amount);
946 this->pdu_remaining_ -= amount;
948 if (this->pdu_remaining_ > 0 && this->successor_index(index) == this->buffer_index_) {
951 ACE_TEXT(
"TransportReceiveStrategy::skip_bad_pdus()")
952 ACE_TEXT(
" - Unrecoverably corrupted ")
953 ACE_TEXT(
"receive buffer management detected: ")
954 ACE_TEXT(
"read more bytes than available.\n")),
959 this->receive_sample_remaining_ = 0;
962 update_buffer_index(done);
966 template<
typename TH,
typename DSH>
970 return sample.
data(&mb_allocator_);
static const ACE_Time_Value max_time
size_t length(void) const
char * rd_ptr(void) const
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
#define ACE_NEW_MALLOC_RETURN(POINTER, ALLOCATOR, CONSTRUCTOR, RET_VAL)
Holds a data sample received by the transport.
virtual ACE_Message_Block * release(void)
TransportReceiveStrategy(const TransportInst_rch &config, size_t receive_buffers_count=RECEIVE_BUFFERS)
ACE_Message_Block * cont(void) const
char * wr_ptr(void) const
size_t format_hexdump(const char *buffer, size_t size, ACE_TCHAR *obuf, size_t obuf_sz)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
static const ACE_Time_Value zero
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ACE_Message_Block * data(ACE_Allocator *mb_alloc=0) const
#define ACE_DES_FREE(POINTER, DEALLOCATOR, CLASS)
#define ACE_ERROR_RETURN(X, Y)
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
The Internal API and Implementation of OpenDDS.