TransportReceiveStrategy_T.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "TransportReceiveStrategy_T.h"
00009 #include "ace/INET_Addr.h"
00010 
00011 #if !defined (__ACE_INLINE__)
00012 #include "TransportReceiveStrategy_T.inl"
00013 #endif /* __ACE_INLINE__ */
00014 
00015 namespace OpenDDS {
00016 namespace DCPS {
00017 
00018 template<typename TH, typename DSH>
00019 TransportReceiveStrategy<TH, DSH>::TransportReceiveStrategy()
00020   : gracefully_disconnected_(false),
00021     receive_sample_remaining_(0),
00022     mb_allocator_(MESSAGE_BLOCKS),
00023     db_allocator_(DATA_BLOCKS),
00024     data_allocator_(DATA_BLOCKS),
00025     buffer_index_(0),
00026     payload_(0),
00027     good_pdu_(true),
00028     pdu_remaining_(0)
00029 {
00030   DBG_ENTRY_LVL("TransportReceiveStrategy", "TransportReceiveStrategy" ,6);
00031 
00032   if (Transport_debug_level >= 2) {
00033     ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-mb"
00034                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00035                &mb_allocator_, MESSAGE_BLOCKS));
00036     ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-db"
00037                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00038                &db_allocator_, DATA_BLOCKS));
00039     ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-data"
00040                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00041                &data_allocator_, DATA_BLOCKS));
00042   }
00043 
00044   ACE_OS::memset(this->receive_buffers_, 0, sizeof(this->receive_buffers_));
00045 }
00046 
00047 template<typename TH, typename DSH>
00048 TransportReceiveStrategy<TH, DSH>::~TransportReceiveStrategy()
00049 {
00050   DBG_ENTRY_LVL("TransportReceiveStrategy","~TransportReceiveStrategy",6);
00051 
00052   if (this->receive_buffers_[this->buffer_index_] != 0) {
00053     size_t size = this->receive_buffers_[this->buffer_index_]->total_length();
00054 
00055     if (size > 0) {
00056       ACE_DEBUG((LM_WARNING,
00057                  ACE_TEXT("(%P|%t) WARNING: TransportReceiveStrategy::~TransportReceiveStrategy() - ")
00058                  ACE_TEXT("terminating with %d unprocessed bytes.\n"),
00059                  size));
00060     }
00061   }
00062 }
00063 
00064 template<typename TH, typename DSH>
00065 bool
00066 TransportReceiveStrategy<TH, DSH>::check_header(const TH& /*header*/)
00067 {
00068   return true;
00069 }
00070 
00071 template<typename TH, typename DSH>
00072 bool
00073 TransportReceiveStrategy<TH, DSH>::check_header(const DSH& /*header*/)
00074 {
00075   return true;
00076 }
00077 
00078 /// Note that this is just an initial implementation.  We may take
00079 /// some shortcuts (we will) that will need to be dealt with later once
00080 /// a more robust implementation can be put in place.
00081 ///
00082 /// Our handle_dds_input() method is called by the reactor when there is
00083 /// data to be pulled from our peer() ACE_SOCK_Stream.
00084 template<typename TH, typename DSH>
00085 int
00086 TransportReceiveStrategy<TH, DSH>::handle_dds_input(ACE_HANDLE fd)
00087 {
00088   DBG_ENTRY_LVL("TransportReceiveStrategy", "handle_dds_input", 6);
00089 
00090   //
00091   // What we will be doing here:
00092   //
00093   //   1) Read as much data as possible from the handle.
00094   //   2) Process each Transport layer packet
00095   //      A) Parse the Transport header
00096   //      B) Process each DataSample in the packet
00097   //         a) Parse the DataSampleHeader
00098   //         b) Parse the remainder of the sample
00099   //         c) call data_received for complete samples
00100   //         d) store any partial sample or header data
00101   //   3) return
00102   //
00103   // The state that we might need to maintain includes:
00104   //     I) Current Transport layer header
00105   //    II) Current DataSample header
00106   //   III) Current DataSample data
00107   //
00108   // For each of these elements, they can be:
00109   //     i) empty
00110   //    ii) partial (not able to parse yet).
00111   //   iii) complete (parsed values available).
00112   //
00113   // The read buffers will be a series of data buffers of a fixed size
00114   // that are each managed by an ACE_Data_Block, each of which is
00115   // managed by an ACE_Message_Block containing the read and write
00116   // pointers into the data.
00117   //
00118   // As messages are parsed, new ACE_Message_Blocks will be formed with
00119   // read and write pointers adjusted to reference the individual
00120   // DataSample buffer contents.
00121   //
00122   // The Underlying buffers, ACE_Data_Blocks, and ACE_Message_Blocks
00123   // will be acquired from a Cached_Allocater_with_overflow.  The size
00124   // of the individual data buffers will be set initially to the
00125   // ethernet MTU to allow for full TCP messages to be received into
00126   // individual blocks.
00127   //
00128   // Reading will be performed with a two member struct iov, to allow
00129   // for the remainder of the current buffer to be used along with at
00130   // least one completely empty buffer.  There will be a low water
00131   // parameter used to stop the use of the current block when the
00132   // available space in it is reduced.
00133   //
00134 
00135   //
00136   // Establish the current receive buffer.
00137   //
00138   //   This involves checking for any remainder from the previous trip
00139   //   through the code.  If there is no current buffer, or the buffer
00140   //   has less than the low water mark space() left, then promote the
00141   //   next receive buffer to the current.  If none is present, create a
00142   //   new one and use it.
00143   //
00144   //
00145   // Establish the next receive buffer.
00146   //
00147   //   This involves checking for any previous complete buffers that
00148   //   were not promoted in the previous step.  If none are present,
00149   //   create a new one and use that.
00150   //
00151 
00152   //
00153   // Remove any buffers that have been completely read and have less
00154   // than the low water amount of space left.
00155   //
00156   size_t index;
00157 
00158   for (index = 0; index < RECEIVE_BUFFERS; ++index) {
00159     if ((this->receive_buffers_[index] != 0)
00160         && (this->receive_buffers_[index]->length() == 0)
00161         && (this->receive_buffers_[index]->space() < BUFFER_LOW_WATER)) {
00162       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00163             "Remove a receive_buffer_[%d] from use.\n",
00164             index));
00165 
00166       // unlink any Message_Block that continues to this one
00167       // being removed.
00168       // This avoids a possible infinite ->cont() loop.
00169       for (size_t ii =0; ii < RECEIVE_BUFFERS; ii++) {
00170         if ((0 != this->receive_buffers_[ii]) &&
00171             (this->receive_buffers_[ii]->cont() ==
00172              this->receive_buffers_[index])) {
00173           this->receive_buffers_[ii]->cont(0);
00174         }
00175       }
00176 
00177       //
00178       // Remove the receive buffer from use.
00179       //
00180       ACE_DES_FREE(
00181         this->receive_buffers_[index],
00182         this->mb_allocator_.free,
00183         ACE_Message_Block);
00184       this->receive_buffers_[index] = 0;
00185     }
00186   }
00187 
00188   //
00189   // Allocate buffers for any empty slots.  We may have emptied one just
00190   // here, but others may have been emptied by a large read during the
00191   // last trip through the code as well.
00192   //
00193   size_t previous = this->buffer_index_;
00194 
00195   for (index = this->buffer_index_;
00196        this->successor_index(previous) != this->buffer_index_;
00197        index = this->successor_index(index)) {
00198     if (this->receive_buffers_[index] == 0) {
00199       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00200             "Allocate a Message_Block for new receive_buffer_[%d].\n",
00201             index));
00202 
00203       ACE_NEW_MALLOC_RETURN(
00204         this->receive_buffers_[index],
00205         (ACE_Message_Block*) this->mb_allocator_.malloc(
00206           sizeof(ACE_Message_Block)),
00207         ACE_Message_Block(
00208           RECEIVE_DATA_BUFFER_SIZE,     // Buffer size
00209           ACE_Message_Block::MB_DATA,   // Default
00210           0,                            // Start with no continuation
00211           0,                            // Let the constructor allocate
00212           &this->data_allocator_,       // Our buffer cache
00213           &this->receive_lock_,         // Our locking strategy
00214           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, // Default
00215           ACE_Time_Value::zero,         // Default
00216           ACE_Time_Value::max_time,     // Default
00217           &this->db_allocator_,         // Our data block cache
00218           &this->mb_allocator_          // Our message block cache
00219         ),
00220         -1);
00221     }
00222 
00223     //
00224     // Chain the buffers.  This allows us to have portions of parsed
00225     // data cross buffer boundaries without a problem.
00226     //
00227     if (previous != index) {
00228       this->receive_buffers_[previous]->cont(
00229         this->receive_buffers_[index]);
00230     }
00231 
00232     previous = index;
00233   }
00234 
00235   //
00236   // Read from handle.
00237   //
00238   //   This involves a non-blocking recvv_n().  Any remaining space in
00239   //   the buffers should be saved for the next pass through the code.
00240   //   If more than one receive buffer has data, chain them.  Promote
00241   //   all receive buffers so that the next pass through, the first
00242   //   buffer with space remaining will appear as the current receive
00243   //   buffer.
00244   //
00245   //   This is probably what should remain in the specific
00246   //   implementation, with the rest of this factored back up into the
00247   //   framework.
00248   //
00249 
00250   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00251         "Form the iovec from the message block\n"));
00252 
00253   //
00254   // Form the iovec from the message block chain of receive buffers.
00255   //
00256   iovec iov[RECEIVE_BUFFERS];
00257   size_t vec_index = 0;
00258   size_t current = this->buffer_index_;
00259 
00260   for (index = 0;
00261        index < RECEIVE_BUFFERS;
00262        ++index, current = this->successor_index(current)) {
00263     // Invariant.  ASSERT?
00264     if (this->receive_buffers_[current] == 0) {
00265       ACE_ERROR_RETURN((LM_ERROR,
00266                         ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
00267                         ACE_TEXT("receive buffer management detected.\n")),
00268                        -1);
00269     }
00270 
00271 #ifdef _MSC_VER
00272 #pragma warning(push)
00273 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
00274 // since on other platforms iov_len is 64-bit
00275 #pragma warning(disable : 4267)
00276 #endif
00277     // This check covers the case where we have unread data in
00278     // the first buffer, but no space to write any more data.
00279     if (this->receive_buffers_[current]->space() > 0) {
00280       iov[vec_index].iov_len  = this->receive_buffers_[current]->space();
00281       iov[vec_index].iov_base = this->receive_buffers_[current]->wr_ptr();
00282 
00283       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00284             "index==%d, len==%d, base==%x\n",
00285             vec_index, iov[vec_index].iov_len, iov[vec_index].iov_base));
00286 
00287       vec_index++;
00288     }
00289   }
00290 
00291 #ifdef _MSC_VER
00292 #pragma warning(pop)
00293 #endif
00294 
00295   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00296         "Perform the recvv() call\n"));
00297 
00298   //
00299   // Read into the buffers.
00300   //
00301   ACE_INET_Addr remote_address;
00302   ssize_t bytes_remaining = this->receive_bytes(iov,
00303                                                 static_cast<int>(vec_index),
00304                                                 remote_address,
00305                                                 fd);
00306 
00307   if (bytes_remaining < 0) {
00308     ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Problem ")
00309               ACE_TEXT("with data link detected: %p.\n"),
00310               ACE_TEXT("receive_bytes")));
00311 
00312     // The relink() will handle the connection to the ReconnectTask to do
00313     // the reconnect so this reactor thread will not be block.
00314     this->relink();
00315 
00316     // Close connection anyway.
00317     return -1;
00318     // Returning -1 takes the handle out of the reactor read mask.
00319   }
00320 
00321   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00322             "recvv() return %d - we call this the bytes_remaining.\n",
00323             bytes_remaining), 5);
00324 
00325   if (bytes_remaining == 0) {
00326     if (this->gracefully_disconnected_) {
00327       VDBG_LVL((LM_INFO,
00328                 ACE_TEXT("(%P|%t) Peer has gracefully disconnected.\n"))
00329                ,1);
00330       return -1;
00331 
00332     } else {
00333       VDBG_LVL((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Unrecoverable problem ")
00334                 ACE_TEXT("with data link detected\n")), 1);
00335 
00336       // The relink() will handle the connection to the ReconnectTask to do
00337       // the reconnect so this reactor thread will not be block.
00338       this->relink();
00339 
00340       // Close connection anyway.
00341       return -1;
00342       // Returning -1 takes the handle out of the reactor read mask.
00343     }
00344   }
00345 
00346   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00347             "START Adjust the message block chain pointers to account for the "
00348             "new data.\n"), 5);
00349 
00350   //
00351   // Adjust the message block chain pointers to account for the new
00352   // data.
00353   //
00354   size_t bytes = bytes_remaining;
00355 
00356   if (!this->pdu_remaining_) {
00357     this->receive_transport_header_.length_ = static_cast<ACE_UINT32>(bytes);
00358   }
00359 
00360   for (index = this->buffer_index_;
00361        bytes > 0;
00362        index = this->successor_index(index)) {
00363     VDBG((LM_DEBUG,"(%P|%t) DBG:    -> "
00364           "At top of for..loop block.\n"));
00365     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00366           "index == %d.\n", index));
00367     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00368           "bytes == %d.\n", bytes));
00369 
00370     size_t amount
00371     = ace_min<size_t>(bytes, this->receive_buffers_[index]->space());
00372 
00373     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00374           "amount == %d.\n", amount));
00375 
00376     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00377           "this->receive_buffers_[index]->rd_ptr() ==  %u.\n",
00378           this->receive_buffers_[index]->rd_ptr()));
00379     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00380           "this->receive_buffers_[index]->wr_ptr() ==  %u.\n",
00381           this->receive_buffers_[index]->wr_ptr()));
00382 
00383     this->receive_buffers_[index]->wr_ptr(amount);
00384 
00385     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00386           "Now, this->receive_buffers_[index]->wr_ptr() ==  %u.\n",
00387           this->receive_buffers_[index]->wr_ptr()));
00388 
00389     bytes -= amount;
00390 
00391     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00392           "Now, bytes == %d .\n", bytes));
00393 
00394     // This is yukky to do here, but there is a fine line between
00395     // where things are moved and where they are checked.
00396     if (bytes > 0 && this->successor_index(index) == this->buffer_index_) {
00397       // Here we have read more data than we passed in buffer.  Bad.
00398       ACE_ERROR_RETURN((LM_ERROR,
00399                         ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
00400                         ACE_TEXT("receive buffer management detected: ")
00401                         ACE_TEXT("read more bytes than available.\n")),
00402                        -1);
00403     }
00404   }
00405 
00406   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00407         "DONE Adjust the message block chain pointers to account "
00408         "for the new data.\n"));
00409 
00410   //
00411   // While the receive buffer is not empty:
00412   //
00413   //   This is the receive buffer(s) that was just read.  The receive
00414   //   buffer message block is used to manage our parsing through this
00415   //   data.  When we have parsed out all the data, then it will be
00416   //   considered empty.  This message block read pointer indicates
00417   //   where we are in the parsing process.  Other message blocks refer
00418   //   to the parsed data and retain the data block references to
00419   //   prevent the data from being released until we have completed all
00420   //   uses for the data in the receive buffer.  This will normally be
00421   //   after the sample data is demarshaled in the DataReader
00422   //   components.
00423   //
00424   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00425         "Start processing the data we just received.\n"));
00426 
00427   //
00428   // Operate while we have more data to process.
00429   //
00430   while (true) {
00431 
00432     //
00433     // Manage the current transport header.
00434     //
00435     //   This involves checking to see if we have remaining data in the
00436     //   current packet.  If not, or if we have not read a complete
00437     //   header, then we read a transport header and check it for
00438     //   validity.  As we have a valid transport header with data
00439     //   remaining to read in the packet that it represents, we move on.
00440     //
00441     //   As new transport headers are to be read, they are read into a
00442     //   message buffer member variable and demarshaled directly.  The
00443     //   values are retained for the lifetime of the processing of the
00444     //   instant transport packet.  The member message buffer allows us
00445     //   to retain partially read transport headers until we can read
00446     //   more data.
00447     //
00448 
00449     //
00450     // Check the remaining transport packet length to see if we are
00451     // expecting a new one.
00452     //
00453     if (this->pdu_remaining_ == 0) {
00454       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00455             "We are expecting a transport packet header.\n"));
00456 
00457       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00458             "this->buffer_index_ == %d.\n",this->buffer_index_));
00459 
00460       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00461             "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00462             "== %u.\n",
00463             this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00464 
00465       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00466             "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00467             "== %u.\n",
00468             this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00469 
00470       if (this->receive_buffers_[this->buffer_index_]->total_length()
00471           < this->receive_transport_header_.max_marshaled_size()) {
00472         //
00473         // Not enough room in the buffer for the entire Transport
00474         // header that we need to read, so relinquish control until
00475         // we get more data.
00476         //
00477         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00478               "Not enough bytes read to account for a transport "
00479               "packet header.  We are done here - we need to "
00480               "receive more bytes.\n"));
00481 
00482         this->receive_transport_header_.incomplete(
00483           *this->receive_buffers_[this->buffer_index_]);
00484 
00485         return 0;
00486 
00487       } else {
00488         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00489               "We have enough bytes to demarshall the transport "
00490               "packet header.\n"));
00491 
00492         // only do the hexdump if it will be printed - to not impact performance.
00493         if (Transport_debug_level > 5) {
00494           ACE_TCHAR xbuffer[4096];
00495           const ACE_Message_Block& mb =
00496             *this->receive_buffers_[this->buffer_index_];
00497           size_t xbytes = mb.length();
00498 
00499           xbytes = (std::min)(xbytes, TH::max_marshaled_size());
00500 
00501           ACE::format_hexdump(mb.rd_ptr(), xbytes, xbuffer, sizeof(xbuffer));
00502 
00503           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00504                 "Hex Dump of transport header block "
00505                 "(%d bytes):\n%s\n", xbytes, xbuffer));
00506         }
00507 
00508         //
00509         // Demarshal the transport header.
00510         //
00511         this->receive_transport_header_ =
00512           *this->receive_buffers_[this->buffer_index_];
00513 
00514         //
00515         // Check the TransportHeader.
00516         //
00517         if (!this->receive_transport_header_.valid()) {
00518           ACE_ERROR_RETURN((LM_ERROR,
00519                             ACE_TEXT
00520                             ("(%P|%t) ERROR: TransportHeader invalid.\n")),
00521                            -1);
00522         }
00523 
00524         this->good_pdu_ = check_header(this->receive_transport_header_);
00525         this->pdu_remaining_ = this->receive_transport_header_.length_;
00526 
00527         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00528               "Amount of transport packet bytes (remaining): %d.\n",
00529               this->pdu_remaining_));
00530       }
00531     }
00532 
00533     //
00534     // Ignore bad PDUs by skipping over them.  We do this out here
00535     // in case we did not skip over the entire bad PDU last time.
00536     //
00537     {
00538       int rtn_code = skip_bad_pdus();
00539       if (rtn_code <= 0) return rtn_code;
00540     }
00541 
00542     //
00543     // Keep processing samples while we have data to read.
00544     //
00545     while (this->pdu_remaining_ > 0) {
00546       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00547             "We have a transport packet now.  There are more sample "
00548             "bytes to parse in order to complete the packet.\n"));
00549 
00550       //
00551       // Manage the current sample header.
00552       //
00553       //  This involves checking to see if we have remaining data in the
00554       //  current sample.  If not, or if we have not read a complete
00555       //  header, then we read a sample header and check it for validity,
00556       //  which currently involves checking the message type only.  As we
00557       //  have a valid sample header with data remaining to read in the
00558       //  sample that it represents, we move on.
00559       //
00560       //  As new sample headers are read, the are read into a message
00561       //  buffer member variable and demarshaled directly.  The values are
00562       //  retained for the lifetime of the sample and are passed as part
00563       //  of the recieve data sample itself.  The member message buffer
00564       //  allows us to retain partially read sample headers until we can
00565       //  read more data.
00566       //
00567       if (this->receive_sample_remaining_ == 0) {
00568         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00569               "We are not working on some remaining sample data.  "
00570               "We are expecting to parse a sample header now.\n"));
00571 
00572         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00573               "this->buffer_index_ == %d.\n",this->buffer_index_));
00574 
00575         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00576               "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00577               "== %u.\n",
00578               this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00579 
00580         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00581               "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00582               "== %u.\n",
00583               this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00584 
00585         if (DSH::partial(*this->receive_buffers_[this->buffer_index_])) {
00586           //
00587           // Not enough room in the buffer for the entire Sample
00588           // header that we need to read, so relinquish control until
00589           // we get more data.
00590           //
00591           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00592                 "Not enough bytes have been received to account "
00593                 "for a complete sample header.  We are done for "
00594                 "now - we need to receive more data before we "
00595                 "can go on.\n"));
00596           if (Transport_debug_level > 2) {
00597             ACE_TCHAR ebuffer[350];
00598             ACE_Message_Block& mb = *this->receive_buffers_[this->buffer_index_];
00599             const size_t sz = (std::min)(DSH::max_marshaled_size(), mb.length());
00600             ACE::format_hexdump(mb.rd_ptr(), sz, ebuffer, sizeof(ebuffer));
00601             ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG:   "
00602               "Partial DataSampleHeader:\n%s\n", ebuffer));
00603           }
00604 
00605           return 0;
00606 
00607         } else {
00608           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00609                 "We have received enough bytes for the sample "
00610                 "header.  Demarshall the sample header now.\n"));
00611 
00612           // only do the hexdump if it will be printed - to not impact performance.
00613           if (Transport_debug_level > 5) {
00614             ACE_TCHAR ebuffer[4096];
00615             ACE::format_hexdump
00616             (this->receive_buffers_[this->buffer_index_]->rd_ptr(),
00617              this->data_sample_header_.max_marshaled_size(),
00618              ebuffer, sizeof(ebuffer));
00619 
00620             VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00621                   "Hex Dump:\n%s\n", ebuffer));
00622           }
00623 
00624           this->data_sample_header_.pdu_remaining(this->pdu_remaining_);
00625 
00626           //
00627           // Demarshal the sample header.
00628           //
00629           this->data_sample_header_ =
00630             *this->receive_buffers_[this->buffer_index_];
00631 
00632           //
00633           // Check the DataSampleHeader.
00634           //
00635 
00636           this->good_pdu_ = check_header(data_sample_header_);
00637 
00638           //
00639           // Set the amount to parse into the message buffer.  We
00640           // can't just use the header value to keep track of
00641           // where we are since downstream processing will expect
00642           // the value to be correct (unadjusted by us).
00643           //
00644           this->receive_sample_remaining_ =
00645             this->data_sample_header_.message_length();
00646 
00647           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00648                 "The demarshalled sample header says that we "
00649                 "have %d bytes to read for the data portion of "
00650                 "the sample.\n",
00651                 this->receive_sample_remaining_));
00652 
00653           //
00654           // Decrement packet size.
00655           //
00656           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00657                 "this->data_sample_header_.marshaled_size() "
00658                 "== %d.\n",
00659                 this->data_sample_header_.marshaled_size()));
00660 
00661           this->pdu_remaining_
00662           -= this->data_sample_header_.marshaled_size();
00663 
00664           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00665                 "Amount of transport packet remaining: %d.\n",
00666                 this->pdu_remaining_));
00667 
00668           int rtn_code = skip_bad_pdus();
00669           if (rtn_code <= 0) return rtn_code;
00670         }
00671       }
00672 
00673       bool last_buffer = false;
00674       update_buffer_index(last_buffer);
00675 
00676       if (this->receive_sample_remaining_ > 0) {
00677         //
00678         // Manage the current sample data.
00679         //
00680         //   This involves reading data to complete the current sample.  As
00681         //   samples are completed, they are dispatched via the
00682         //   data_received() mechanism.  This data is read into message
00683         //   blocks that are obtained from the pool of message blocks since
00684         //   the lifetime of this data will last until the DataReader
00685         //   components demarshal the sample data.  A reference to the
00686         //   current sample being built is retained as a member to allow us
00687         //   to hold partialy read samples until they are completed.
00688         //
00689         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00690               "Determine amount of data for the next block in the chain\n"));
00691 
00692         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00693               "this->buffer_index_ == %d.\n",this->buffer_index_));
00694 
00695         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00696               "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00697               "== %u.\n",
00698               this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00699 
00700         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00701               "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00702               "== %u.\n",
00703               this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00704         //
00705         // Determine the amount of data for the next block in the chain.
00706         //
00707         const size_t amount = ace_min<size_t>(
00708           this->receive_sample_remaining_,
00709           this->receive_buffers_[this->buffer_index_]->length());
00710 
00711         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00712               "amount of data for the next block in the chain is %d\n",
00713               amount));
00714         //
00715         // Now create a message block for the data in the current buffer
00716         // and chain it if we are starting a new sample.
00717         //
00718         ACE_Message_Block* current_sample_block = 0;
00719         ACE_NEW_MALLOC_RETURN(
00720           current_sample_block,
00721           (ACE_Message_Block*) this->mb_allocator_.malloc(
00722             sizeof(ACE_Message_Block)),
00723           ACE_Message_Block(
00724             this->receive_buffers_[this->buffer_index_]
00725             ->data_block()->duplicate(),
00726             0,
00727             &this->mb_allocator_),
00728           -1);
00729 
00730         //
00731         // Chain it to the end of the current sample.
00732         //
00733         if (this->payload_ == 0) {
00734           this->payload_ = current_sample_block;
00735 
00736         } else {
00737           ACE_Message_Block* block = this->payload_;
00738 
00739           while (block->cont() != 0) {
00740             block = block->cont();
00741           }
00742 
00743           block->cont(current_sample_block);
00744         }
00745 
00746         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00747               "Before adjustment of the pointers and byte counters\n"));
00748 
00749         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00750               "this->payload_->rd_ptr() "
00751               "== %u.\n",
00752               this->payload_->rd_ptr()));
00753 
00754         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00755               "this->payload_->wr_ptr() "
00756               "== %u.\n",
00757               this->payload_->wr_ptr()));
00758 
00759         //
00760         // Adjust the pointers and byte counters.
00761         //
00762         current_sample_block->rd_ptr(
00763           this->receive_buffers_[this->buffer_index_]->rd_ptr());
00764         current_sample_block->wr_ptr(current_sample_block->rd_ptr() + amount);
00765         this->receive_buffers_[this->buffer_index_]->rd_ptr(amount);
00766         this->receive_sample_remaining_ -= amount;
00767         this->pdu_remaining_            -= amount;
00768 
00769         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00770               "After adjustment of the pointers and byte counters\n"));
00771 
00772         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00773               "this->payload_->rd_ptr() "
00774               "== %u.\n",
00775               this->payload_->rd_ptr()));
00776 
00777         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00778               "this->payload_->wr_ptr() "
00779               "== %u.\n",
00780               this->payload_->wr_ptr()));
00781 
00782         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00783               "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00784               "== %u.\n",
00785               this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00786 
00787         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00788               "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00789               "== %u.\n",
00790               this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00791 
00792         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00793               "After adjustment, remaining sample bytes == %d\n",
00794               this->receive_sample_remaining_));
00795         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00796               "After adjustment, remaining transport packet bytes == %d\n",
00797               this->pdu_remaining_));
00798       }
00799 
00800       //
00801       // Dispatch the received message if we have received it all.
00802       //
00803       // NB: Since we are doing this synchronously and without passing
00804       //     ownership of the sample, we can use NULL mutex lock for
00805       //     the allocators.  Since only one thread can be in this
00806       //     routine at a time, that is.
00807       //
00808       if (this->receive_sample_remaining_ == 0) {
00809         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00810               "Now dispatch the sample to the DataLink\n"));
00811 
00812         ReceivedDataSample rds(this->payload_);
00813         this->payload_ = 0;  // rds takes ownership of payload_
00814         if (this->data_sample_header_.into_received_data_sample(rds)) {
00815 
00816           if (this->data_sample_header_.more_fragments()
00817               || this->receive_transport_header_.last_fragment()) {
00818             VDBG((LM_DEBUG,"(%P|%t) DBG:   Attempt reassembly of fragments\n"));
00819 
00820             if (this->reassemble(rds)) {
00821               VDBG((LM_DEBUG,"(%P|%t) DBG:   Reassembled complete message\n"));
00822               this->deliver_sample(rds, remote_address);
00823             }
00824             // If reassemble() returned false, it takes ownership of the data
00825             // just like deliver_sample() does.
00826 
00827           } else {
00828             this->deliver_sample(rds, remote_address);
00829           }
00830         }
00831 
00832         // For the reassembly algorithm, the 'last_fragment_' header bit only
00833         // applies to the first DataSampleHeader in the TransportHeader
00834         this->receive_transport_header_.last_fragment(false);
00835 
00836         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00837               "Release the sample that we just sent.\n"));
00838         // ~ReceivedDataSample() releases the payload_ message block
00839       }
00840 
00841       update_buffer_index(last_buffer);
00842 
00843       if (last_buffer) {
00844         // Relinquish control if there is no more data to process.
00845         VDBG((LM_DEBUG,"(%P|%t) DBG:   We are done - no more data.\n"));
00846         return 0;
00847       }
00848 
00849     } // End of while (this->pdu_remaining_ > 0)
00850 
00851     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00852           "Let's try to do some more.\n"));
00853   } // End of while (true)
00854 
00855   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00856         "It looks like we are done - the done loop has finished.\n"));
00857   //
00858   // Relinquish control.
00859   //
00860   //   This involves ensuring that when we reenter this method, we will
00861   //   pick up from where we left off correctly.
00862   //
00863   return 0;
00864 }
00865 
00866 template<typename TH, typename DSH>
00867 bool
00868 TransportReceiveStrategy<TH, DSH>::reassemble(ReceivedDataSample&)
00869 {
00870   ACE_DEBUG((LM_WARNING, "(%P|%t) TransportReceiveStrategy::reassemble() "
00871     "WARNING: derived class must override if specific transport type uses "
00872     "fragmentation and reassembly\n"));
00873   return false;
00874 }
00875 
00876 template<typename TH, typename DSH>
00877 void
00878 TransportReceiveStrategy<TH, DSH>::reset()
00879 {
00880   this->receive_sample_remaining_ = 0;
00881   ACE_Message_Block::release(this->payload_);
00882   this->payload_ = 0;
00883   this->good_pdu_ = true;
00884   this->pdu_remaining_ = 0;
00885   for (int i = 0; i < RECEIVE_BUFFERS; ++i) {
00886     ACE_Message_Block& rb = *this->receive_buffers_[i];
00887     rb.rd_ptr(rb.wr_ptr());
00888   }
00889 }
00890 
00891 template<typename TH, typename DSH>
00892 void
00893 TransportReceiveStrategy<TH, DSH>::update_buffer_index(bool& done)
00894 {
00895   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00896         "Adjust the buffer chain in case we crossed into the next "
00897         "buffer after the last read(s).\n"));
00898   const size_t initial = this->buffer_index_;
00899   while (this->receive_buffers_[this->buffer_index_]->length() == 0) {
00900     this->buffer_index_ = this->successor_index(this->buffer_index_);
00901 
00902     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00903           "Set this->buffer_index_ = %d.\n",
00904           this->buffer_index_));
00905 
00906     if (initial == this->buffer_index_) {
00907       done = true; // no other buffers in receive_buffers_ have data
00908       return;
00909     }
00910   }
00911 }
00912 
00913 template<typename TH, typename DSH>
00914 int
00915 TransportReceiveStrategy<TH, DSH>::skip_bad_pdus()
00916 {
00917   if (this->good_pdu_) return 1;
00918 
00919   //
00920   // Adjust the message block chain pointers to account for the
00921   // skipped data.
00922   //
00923   for (size_t index = this->buffer_index_;
00924        this->pdu_remaining_ > 0;
00925        index = this->successor_index(index)) {
00926     size_t amount =
00927       ace_min<size_t>(this->pdu_remaining_, this->receive_buffers_[index]->length());
00928 
00929     this->receive_buffers_[index]->rd_ptr(amount);
00930     this->pdu_remaining_ -= amount;
00931 
00932     if (this->pdu_remaining_ > 0 && this->successor_index(index) == this->buffer_index_) {
00933       ACE_ERROR_RETURN((LM_ERROR,
00934                         ACE_TEXT("(%P|%t) ERROR: ")
00935                         ACE_TEXT("TransportReceiveStrategy::skip_bad_pdus()")
00936                         ACE_TEXT(" - Unrecoverably corrupted ")
00937                         ACE_TEXT("receive buffer management detected: ")
00938                         ACE_TEXT("read more bytes than available.\n")),
00939                        -1);
00940     }
00941   }
00942 
00943   this->receive_sample_remaining_ = 0;
00944 
00945   this->receive_sample_remaining_ = 0;
00946 
00947   bool done = false;
00948   update_buffer_index(done);
00949   return done ? 0 : 1;
00950 }
00951 
00952 }
00953 }

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7