TransportReceiveStrategy_T.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "TransportReceiveStrategy_T.h"
00009 #include "ace/INET_Addr.h"
00010 #include "ace/Min_Max.h"
00011 
00012 #if !defined (__ACE_INLINE__)
00013 #include "TransportReceiveStrategy_T.inl"
00014 #endif /* __ACE_INLINE__ */
00015 
00016 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00017 
00018 namespace OpenDDS {
00019 namespace DCPS {
00020 
00021 template<typename TH, typename DSH>
00022 TransportReceiveStrategy<TH, DSH>::TransportReceiveStrategy()
00023   : gracefully_disconnected_(false),
00024     receive_sample_remaining_(0),
00025     mb_allocator_(MESSAGE_BLOCKS),
00026     db_allocator_(DATA_BLOCKS),
00027     data_allocator_(DATA_BLOCKS),
00028     buffer_index_(0),
00029     payload_(0),
00030     good_pdu_(true),
00031     pdu_remaining_(0)
00032 {
00033   DBG_ENTRY_LVL("TransportReceiveStrategy", "TransportReceiveStrategy" ,6);
00034 
00035   if (Transport_debug_level >= 2) {
00036     ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-mb"
00037                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00038                &mb_allocator_, MESSAGE_BLOCKS));
00039     ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-db"
00040                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00041                &db_allocator_, DATA_BLOCKS));
00042     ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-data"
00043                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00044                &data_allocator_, DATA_BLOCKS));
00045   }
00046 
00047   ACE_OS::memset(this->receive_buffers_, 0, sizeof(this->receive_buffers_));
00048 }
00049 
00050 template<typename TH, typename DSH>
00051 TransportReceiveStrategy<TH, DSH>::~TransportReceiveStrategy()
00052 {
00053   DBG_ENTRY_LVL("TransportReceiveStrategy","~TransportReceiveStrategy",6);
00054 
00055   if (this->receive_buffers_[this->buffer_index_] != 0) {
00056     size_t size = this->receive_buffers_[this->buffer_index_]->total_length();
00057 
00058     if (size > 0) {
00059       ACE_DEBUG((LM_WARNING,
00060                  ACE_TEXT("(%P|%t) WARNING: TransportReceiveStrategy::~TransportReceiveStrategy() - ")
00061                  ACE_TEXT("terminating with %d unprocessed bytes.\n"),
00062                  size));
00063     }
00064   }
00065 }
00066 
00067 template<typename TH, typename DSH>
00068 bool
00069 TransportReceiveStrategy<TH, DSH>::check_header(const TH& /*header*/)
00070 {
00071   return true;
00072 }
00073 
00074 template<typename TH, typename DSH>
00075 bool
00076 TransportReceiveStrategy<TH, DSH>::check_header(const DSH& /*header*/)
00077 {
00078   return true;
00079 }
00080 
00081 /// Note that this is just an initial implementation.  We may take
00082 /// some shortcuts (we will) that will need to be dealt with later once
00083 /// a more robust implementation can be put in place.
00084 ///
00085 /// Our handle_dds_input() method is called by the reactor when there is
00086 /// data to be pulled from our peer() ACE_SOCK_Stream.
00087 template<typename TH, typename DSH>
00088 int
00089 TransportReceiveStrategy<TH, DSH>::handle_dds_input(ACE_HANDLE fd)
00090 {
00091   DBG_ENTRY_LVL("TransportReceiveStrategy", "handle_dds_input", 6);
00092 
00093   //
00094   // What we will be doing here:
00095   //
00096   //   1) Read as much data as possible from the handle.
00097   //   2) Process each Transport layer packet
00098   //      A) Parse the Transport header
00099   //      B) Process each DataSample in the packet
00100   //         a) Parse the DataSampleHeader
00101   //         b) Parse the remainder of the sample
00102   //         c) call data_received for complete samples
00103   //         d) store any partial sample or header data
00104   //   3) return
00105   //
00106   // The state that we might need to maintain includes:
00107   //     I) Current Transport layer header
00108   //    II) Current DataSample header
00109   //   III) Current DataSample data
00110   //
00111   // For each of these elements, they can be:
00112   //     i) empty
00113   //    ii) partial (not able to parse yet).
00114   //   iii) complete (parsed values available).
00115   //
00116   // The read buffers will be a series of data buffers of a fixed size
00117   // that are each managed by an ACE_Data_Block, each of which is
00118   // managed by an ACE_Message_Block containing the read and write
00119   // pointers into the data.
00120   //
00121   // As messages are parsed, new ACE_Message_Blocks will be formed with
00122   // read and write pointers adjusted to reference the individual
00123   // DataSample buffer contents.
00124   //
00125   // The Underlying buffers, ACE_Data_Blocks, and ACE_Message_Blocks
00126   // will be acquired from a Cached_Allocator_With_Overflow.  The size
00127   // of the individual data buffers will be set initially to the
00128   // ethernet MTU to allow for full TCP messages to be received into
00129   // individual blocks.
00130   //
00131   // Reading will be performed with a two member struct iov, to allow
00132   // for the remainder of the current buffer to be used along with at
00133   // least one completely empty buffer.  There will be a low water
00134   // parameter used to stop the use of the current block when the
00135   // available space in it is reduced.
00136   //
00137 
00138   //
00139   // Establish the current receive buffer.
00140   //
00141   //   This involves checking for any remainder from the previous trip
00142   //   through the code.  If there is no current buffer, or the buffer
00143   //   has less than the low water mark space() left, then promote the
00144   //   next receive buffer to the current.  If none is present, create a
00145   //   new one and use it.
00146   //
00147   //
00148   // Establish the next receive buffer.
00149   //
00150   //   This involves checking for any previous complete buffers that
00151   //   were not promoted in the previous step.  If none are present,
00152   //   create a new one and use that.
00153   //
00154 
00155   //
00156   // Remove any buffers that have been completely read and have less
00157   // than the low water amount of space left.
00158   //
00159   size_t index;
00160 
00161   for (index = 0; index < RECEIVE_BUFFERS; ++index) {
00162     if ((this->receive_buffers_[index] != 0)
00163         && (this->receive_buffers_[index]->length() == 0)
00164         && (this->receive_buffers_[index]->space() < BUFFER_LOW_WATER)) {
00165       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00166             "Remove a receive_buffer_[%d] from use.\n",
00167             index));
00168 
00169       // unlink any Message_Block that continues to this one
00170       // being removed.
00171       // This avoids a possible infinite ->cont() loop.
00172       for (size_t ii =0; ii < RECEIVE_BUFFERS; ii++) {
00173         if ((0 != this->receive_buffers_[ii]) &&
00174             (this->receive_buffers_[ii]->cont() ==
00175              this->receive_buffers_[index])) {
00176           this->receive_buffers_[ii]->cont(0);
00177         }
00178       }
00179 
00180       //
00181       // Remove the receive buffer from use.
00182       //
00183       ACE_DES_FREE(
00184         this->receive_buffers_[index],
00185         this->mb_allocator_.free,
00186         ACE_Message_Block);
00187       this->receive_buffers_[index] = 0;
00188     }
00189   }
00190 
00191   //
00192   // Allocate buffers for any empty slots.  We may have emptied one just
00193   // here, but others may have been emptied by a large read during the
00194   // last trip through the code as well.
00195   //
00196   size_t previous = this->buffer_index_;
00197 
00198   for (index = this->buffer_index_;
00199        this->successor_index(previous) != this->buffer_index_;
00200        index = this->successor_index(index)) {
00201     if (this->receive_buffers_[index] == 0) {
00202       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00203             "Allocate a Message_Block for new receive_buffer_[%d].\n",
00204             index));
00205 
00206       ACE_NEW_MALLOC_RETURN(
00207         this->receive_buffers_[index],
00208         (ACE_Message_Block*) this->mb_allocator_.malloc(
00209           sizeof(ACE_Message_Block)),
00210         ACE_Message_Block(
00211           RECEIVE_DATA_BUFFER_SIZE,     // Buffer size
00212           ACE_Message_Block::MB_DATA,   // Default
00213           0,                            // Start with no continuation
00214           0,                            // Let the constructor allocate
00215           &this->data_allocator_,       // Our buffer cache
00216           &this->receive_lock_,         // Our locking strategy
00217           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, // Default
00218           ACE_Time_Value::zero,         // Default
00219           ACE_Time_Value::max_time,     // Default
00220           &this->db_allocator_,         // Our data block cache
00221           &this->mb_allocator_          // Our message block cache
00222         ),
00223         -1);
00224     }
00225 
00226     //
00227     // Chain the buffers.  This allows us to have portions of parsed
00228     // data cross buffer boundaries without a problem.
00229     //
00230     if (previous != index) {
00231       this->receive_buffers_[previous]->cont(
00232         this->receive_buffers_[index]);
00233     }
00234 
00235     previous = index;
00236   }
00237 
00238   //
00239   // Read from handle.
00240   //
00241   //   This involves a non-blocking recvv_n().  Any remaining space in
00242   //   the buffers should be saved for the next pass through the code.
00243   //   If more than one receive buffer has data, chain them.  Promote
00244   //   all receive buffers so that the next pass through, the first
00245   //   buffer with space remaining will appear as the current receive
00246   //   buffer.
00247   //
00248   //   This is probably what should remain in the specific
00249   //   implementation, with the rest of this factored back up into the
00250   //   framework.
00251   //
00252 
00253   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00254         "Form the iovec from the message block\n"));
00255 
00256   //
00257   // Form the iovec from the message block chain of receive buffers.
00258   //
00259   iovec iov[RECEIVE_BUFFERS];
00260   size_t vec_index = 0;
00261   size_t current = this->buffer_index_;
00262 
00263   for (index = 0;
00264        index < RECEIVE_BUFFERS;
00265        ++index, current = this->successor_index(current)) {
00266     // Invariant.  ASSERT?
00267     if (this->receive_buffers_[current] == 0) {
00268       ACE_ERROR_RETURN((LM_ERROR,
00269                         ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
00270                         ACE_TEXT("receive buffer management detected.\n")),
00271                        -1);
00272     }
00273 
00274 #ifdef _MSC_VER
00275 #pragma warning(push)
00276 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
00277 // since on other platforms iov_len is 64-bit
00278 #pragma warning(disable : 4267)
00279 #endif
00280     // This check covers the case where we have unread data in
00281     // the first buffer, but no space to write any more data.
00282     if (this->receive_buffers_[current]->space() > 0) {
00283       iov[vec_index].iov_len  = this->receive_buffers_[current]->space();
00284       iov[vec_index].iov_base = this->receive_buffers_[current]->wr_ptr();
00285 
00286       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00287             "index==%d, len==%d, base==%x\n",
00288             vec_index, iov[vec_index].iov_len, iov[vec_index].iov_base));
00289 
00290       vec_index++;
00291     }
00292   }
00293 
00294 #ifdef _MSC_VER
00295 #pragma warning(pop)
00296 #endif
00297 
00298   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00299         "Perform the recvv() call\n"));
00300 
00301   //
00302   // Read into the buffers.
00303   //
00304   ACE_INET_Addr remote_address;
00305   ssize_t bytes_remaining = this->receive_bytes(iov,
00306                                                 static_cast<int>(vec_index),
00307                                                 remote_address,
00308                                                 fd);
00309 
00310   if (bytes_remaining < 0) {
00311     ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Problem ")
00312               ACE_TEXT("with data link detected: %p.\n"),
00313               ACE_TEXT("receive_bytes")));
00314 
00315     // The relink() will handle the connection to the ReconnectTask to do
00316     // the reconnect so this reactor thread will not be blocked.
00317     this->relink();
00318 
00319     // Close connection anyway.
00320     return -1;
00321     // Returning -1 takes the handle out of the reactor read mask.
00322   }
00323 
00324   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00325             "recvv() return %d - we call this the bytes_remaining.\n",
00326             bytes_remaining), 5);
00327 
00328   if (bytes_remaining == 0) {
00329     if (this->gracefully_disconnected_) {
00330       VDBG_LVL((LM_INFO,
00331                 ACE_TEXT("(%P|%t) Peer has gracefully disconnected.\n"))
00332                ,1);
00333       return -1;
00334 
00335     } else {
00336       VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) DBG: Unrecoverable problem ")
00337                 ACE_TEXT("with data link detected\n")), 1);
00338 
00339       // The relink() will handle the connection to the ReconnectTask to do
00340       // the reconnect so this reactor thread will not be blocked.
00341       this->relink();
00342 
00343       // Close connection anyway.
00344       return -1;
00345       // Returning -1 takes the handle out of the reactor read mask.
00346     }
00347   }
00348 
00349   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00350             "START Adjust the message block chain pointers to account for the "
00351             "new data.\n"), 5);
00352 
00353   //
00354   // Adjust the message block chain pointers to account for the new
00355   // data.
00356   //
00357   size_t bytes = bytes_remaining;
00358 
00359   if (!this->pdu_remaining_) {
00360     this->receive_transport_header_.length_ = static_cast<ACE_UINT32>(bytes);
00361   }
00362 
00363   for (index = this->buffer_index_;
00364        bytes > 0;
00365        index = this->successor_index(index)) {
00366     VDBG((LM_DEBUG,"(%P|%t) DBG:    -> "
00367           "At top of for..loop block.\n"));
00368     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00369           "index == %d.\n", index));
00370     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00371           "bytes == %d.\n", bytes));
00372 
00373     size_t amount
00374     = ace_min<size_t>(bytes, this->receive_buffers_[index]->space());
00375 
00376     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00377           "amount == %d.\n", amount));
00378 
00379     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00380           "this->receive_buffers_[index]->rd_ptr() ==  %u.\n",
00381           this->receive_buffers_[index]->rd_ptr()));
00382     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00383           "this->receive_buffers_[index]->wr_ptr() ==  %u.\n",
00384           this->receive_buffers_[index]->wr_ptr()));
00385 
00386     this->receive_buffers_[index]->wr_ptr(amount);
00387 
00388     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00389           "Now, this->receive_buffers_[index]->wr_ptr() ==  %u.\n",
00390           this->receive_buffers_[index]->wr_ptr()));
00391 
00392     bytes -= amount;
00393 
00394     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00395           "Now, bytes == %d .\n", bytes));
00396 
00397     // This is yukky to do here, but there is a fine line between
00398     // where things are moved and where they are checked.
00399     if (bytes > 0 && this->successor_index(index) == this->buffer_index_) {
00400       // Here we have read more data than we passed in buffer.  Bad.
00401       ACE_ERROR_RETURN((LM_ERROR,
00402                         ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
00403                         ACE_TEXT("receive buffer management detected: ")
00404                         ACE_TEXT("read more bytes than available.\n")),
00405                        -1);
00406     }
00407   }
00408 
00409   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00410         "DONE Adjust the message block chain pointers to account "
00411         "for the new data.\n"));
00412 
00413   //
00414   // While the receive buffer is not empty:
00415   //
00416   //   This is the receive buffer(s) that was just read.  The receive
00417   //   buffer message block is used to manage our parsing through this
00418   //   data.  When we have parsed out all the data, then it will be
00419   //   considered empty.  This message block read pointer indicates
00420   //   where we are in the parsing process.  Other message blocks refer
00421   //   to the parsed data and retain the data block references to
00422   //   prevent the data from being released until we have completed all
00423   //   uses for the data in the receive buffer.  This will normally be
00424   //   after the sample data is demarshaled in the DataReader
00425   //   components.
00426   //
00427   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00428         "Start processing the data we just received.\n"));
00429 
00430   //
00431   // Operate while we have more data to process.
00432   //
00433   while (true) {
00434 
00435     //
00436     // Manage the current transport header.
00437     //
00438     //   This involves checking to see if we have remaining data in the
00439     //   current packet.  If not, or if we have not read a complete
00440     //   header, then we read a transport header and check it for
00441     //   validity.  As we have a valid transport header with data
00442     //   remaining to read in the packet that it represents, we move on.
00443     //
00444     //   As new transport headers are to be read, they are read into a
00445     //   message buffer member variable and demarshaled directly.  The
00446     //   values are retained for the lifetime of the processing of the
00447     //   instant transport packet.  The member message buffer allows us
00448     //   to retain partially read transport headers until we can read
00449     //   more data.
00450     //
00451 
00452     //
00453     // Check the remaining transport packet length to see if we are
00454     // expecting a new one.
00455     //
00456     if (this->pdu_remaining_ == 0) {
00457       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00458             "We are expecting a transport packet header.\n"));
00459 
00460       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00461             "this->buffer_index_ == %d.\n",this->buffer_index_));
00462 
00463       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00464             "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00465             "== %u.\n",
00466             this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00467 
00468       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00469             "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00470             "== %u.\n",
00471             this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00472 
00473       if (this->receive_buffers_[this->buffer_index_]->total_length()
00474           < this->receive_transport_header_.max_marshaled_size()) {
00475         //
00476         // Not enough room in the buffer for the entire Transport
00477         // header that we need to read, so relinquish control until
00478         // we get more data.
00479         //
00480         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00481               "Not enough bytes read to account for a transport "
00482               "packet header.  We are done here - we need to "
00483               "receive more bytes.\n"));
00484 
00485         this->receive_transport_header_.incomplete(
00486           *this->receive_buffers_[this->buffer_index_]);
00487 
00488         return 0;
00489 
00490       } else {
00491         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00492               "We have enough bytes to demarshall the transport "
00493               "packet header.\n"));
00494 
00495         // only do the hexdump if it will be printed - to not impact performance.
00496         if (Transport_debug_level > 5) {
00497           ACE_TCHAR xbuffer[4096];
00498           const ACE_Message_Block& mb =
00499             *this->receive_buffers_[this->buffer_index_];
00500           size_t xbytes = mb.length();
00501 
00502           xbytes = (std::min)(xbytes, TH::max_marshaled_size());
00503 
00504           ACE::format_hexdump(mb.rd_ptr(), xbytes, xbuffer, sizeof(xbuffer));
00505 
00506           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00507                 "Hex Dump of transport header block "
00508                 "(%d bytes):\n%s\n", xbytes, xbuffer));
00509         }
00510 
00511         //
00512         // Demarshal the transport header.
00513         //
00514         this->receive_transport_header_ =
00515           *this->receive_buffers_[this->buffer_index_];
00516 
00517         //
00518         // Check the TransportHeader.
00519         //
00520         if (!this->receive_transport_header_.valid()) {
00521           ACE_ERROR_RETURN((LM_ERROR,
00522                             ACE_TEXT
00523                             ("(%P|%t) ERROR: TransportHeader invalid.\n")),
00524                            -1);
00525         }
00526 
00527         this->good_pdu_ = check_header(this->receive_transport_header_);
00528         this->pdu_remaining_ = this->receive_transport_header_.length_;
00529 
00530         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00531               "Amount of transport packet bytes (remaining): %d.\n",
00532               this->pdu_remaining_));
00533       }
00534     }
00535 
00536     //
00537     // Ignore bad PDUs by skipping over them.  We do this out here
00538     // in case we did not skip over the entire bad PDU last time.
00539     //
00540     {
00541       int rtn_code = skip_bad_pdus();
00542       if (rtn_code <= 0) return rtn_code;
00543     }
00544 
00545     //
00546     // Keep processing samples while we have data to read.
00547     //
00548     while (this->pdu_remaining_ > 0) {
00549       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00550             "We have a transport packet now.  There are more sample "
00551             "bytes to parse in order to complete the packet.\n"));
00552 
00553       //
00554       // Manage the current sample header.
00555       //
00556       //  This involves checking to see if we have remaining data in the
00557       //  current sample.  If not, or if we have not read a complete
00558       //  header, then we read a sample header and check it for validity,
00559       //  which currently involves checking the message type only.  As we
00560       //  have a valid sample header with data remaining to read in the
00561       //  sample that it represents, we move on.
00562       //
00563       //  As new sample headers are read, they are read into a message
00564       //  buffer member variable and demarshaled directly.  The values are
00565       //  retained for the lifetime of the sample and are passed as part
00566       //  of the receive data sample itself.  The member message buffer
00567       //  allows us to retain partially read sample headers until we can
00568       //  read more data.
00569       //
00570       if (this->receive_sample_remaining_ == 0) {
00571         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00572               "We are not working on some remaining sample data.  "
00573               "We are expecting to parse a sample header now.\n"));
00574 
00575         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00576               "this->buffer_index_ == %d.\n",this->buffer_index_));
00577 
00578         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00579               "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00580               "== %u.\n",
00581               this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00582 
00583         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00584               "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00585               "== %u.\n",
00586               this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00587 
00588         if (DSH::partial(*this->receive_buffers_[this->buffer_index_])) {
00589           //
00590           // Not enough room in the buffer for the entire Sample
00591           // header that we need to read, so relinquish control until
00592           // we get more data.
00593           //
00594           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00595                 "Not enough bytes have been received to account "
00596                 "for a complete sample header.  We are done for "
00597                 "now - we need to receive more data before we "
00598                 "can go on.\n"));
00599           if (Transport_debug_level > 2) {
00600             ACE_TCHAR ebuffer[350];
00601             ACE_Message_Block& mb = *this->receive_buffers_[this->buffer_index_];
00602             const size_t sz = (std::min)(DSH::max_marshaled_size(), mb.length());
00603             ACE::format_hexdump(mb.rd_ptr(), sz, ebuffer, sizeof(ebuffer));
00604             ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG:   "
00605               "Partial DataSampleHeader:\n%s\n", ebuffer));
00606           }
00607 
00608           return 0;
00609 
00610         } else {
00611           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00612                 "We have received enough bytes for the sample "
00613                 "header.  Demarshall the sample header now.\n"));
00614 
00615           // only do the hexdump if it will be printed - to not impact performance.
00616           if (Transport_debug_level > 5) {
00617             ACE_TCHAR ebuffer[4096];
00618             ACE::format_hexdump
00619             (this->receive_buffers_[this->buffer_index_]->rd_ptr(),
00620              this->data_sample_header_.max_marshaled_size(),
00621              ebuffer, sizeof(ebuffer));
00622 
00623             VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00624                   "Hex Dump:\n%s\n", ebuffer));
00625           }
00626 
00627           this->data_sample_header_.pdu_remaining(this->pdu_remaining_);
00628 
00629           //
00630           // Demarshal the sample header.
00631           //
00632           this->data_sample_header_ =
00633             *this->receive_buffers_[this->buffer_index_];
00634 
00635           //
00636           // Check the DataSampleHeader.
00637           //
00638 
00639           this->good_pdu_ = check_header(data_sample_header_);
00640 
00641           //
00642           // Set the amount to parse into the message buffer.  We
00643           // can't just use the header value to keep track of
00644           // where we are since downstream processing will expect
00645           // the value to be correct (unadjusted by us).
00646           //
00647           this->receive_sample_remaining_ =
00648             this->data_sample_header_.message_length();
00649 
00650           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00651                 "The demarshalled sample header says that we "
00652                 "have %d bytes to read for the data portion of "
00653                 "the sample.\n",
00654                 this->receive_sample_remaining_));
00655 
00656           //
00657           // Decrement packet size.
00658           //
00659           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00660                 "this->data_sample_header_.marshaled_size() "
00661                 "== %d.\n",
00662                 this->data_sample_header_.marshaled_size()));
00663 
00664           this->pdu_remaining_
00665           -= this->data_sample_header_.marshaled_size();
00666 
00667           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00668                 "Amount of transport packet remaining: %d.\n",
00669                 this->pdu_remaining_));
00670 
00671           int rtn_code = skip_bad_pdus();
00672           if (rtn_code <= 0) return rtn_code;
00673         }
00674       }
00675 
00676       bool last_buffer = false;
00677       update_buffer_index(last_buffer);
00678 
00679       if (this->receive_sample_remaining_ > 0) {
00680         //
00681         // Manage the current sample data.
00682         //
00683         //   This involves reading data to complete the current sample.  As
00684         //   samples are completed, they are dispatched via the
00685         //   data_received() mechanism.  This data is read into message
00686         //   blocks that are obtained from the pool of message blocks since
00687         //   the lifetime of this data will last until the DataReader
00688         //   components demarshal the sample data.  A reference to the
00689         //   current sample being built is retained as a member to allow us
00690         //   to hold partially read samples until they are completed.
00691         //
00692         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00693               "Determine amount of data for the next block in the chain\n"));
00694 
00695         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00696               "this->buffer_index_ == %d.\n",this->buffer_index_));
00697 
00698         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00699               "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00700               "== %u.\n",
00701               this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00702 
00703         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00704               "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00705               "== %u.\n",
00706               this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00707         //
00708         // Determine the amount of data for the next block in the chain.
00709         //
00710         const size_t amount = ace_min<size_t>(
00711           this->receive_sample_remaining_,
00712           this->receive_buffers_[this->buffer_index_]->length());
00713 
00714         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00715               "amount of data for the next block in the chain is %d\n",
00716               amount));
00717         //
00718         // Now create a message block for the data in the current buffer
00719         // and chain it if we are starting a new sample.
00720         //
00721         ACE_Message_Block* current_sample_block = 0;
00722         ACE_NEW_MALLOC_RETURN(
00723           current_sample_block,
00724           (ACE_Message_Block*) this->mb_allocator_.malloc(
00725             sizeof(ACE_Message_Block)),
00726           ACE_Message_Block(
00727             this->receive_buffers_[this->buffer_index_]
00728             ->data_block()->duplicate(),
00729             0,
00730             &this->mb_allocator_),
00731           -1);
00732 
00733         //
00734         // Chain it to the end of the current sample.
00735         //
00736         if (this->payload_ == 0) {
00737           this->payload_ = current_sample_block;
00738 
00739         } else {
00740           ACE_Message_Block* block = this->payload_;
00741 
00742           while (block->cont() != 0) {
00743             block = block->cont();
00744           }
00745 
00746           block->cont(current_sample_block);
00747         }
00748 
00749         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00750               "Before adjustment of the pointers and byte counters\n"));
00751 
00752         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00753               "this->payload_->rd_ptr() "
00754               "== %u.\n",
00755               this->payload_->rd_ptr()));
00756 
00757         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00758               "this->payload_->wr_ptr() "
00759               "== %u.\n",
00760               this->payload_->wr_ptr()));
00761 
00762         //
00763         // Adjust the pointers and byte counters.
00764         //
00765         current_sample_block->rd_ptr(
00766           this->receive_buffers_[this->buffer_index_]->rd_ptr());
00767         current_sample_block->wr_ptr(current_sample_block->rd_ptr() + amount);
00768         this->receive_buffers_[this->buffer_index_]->rd_ptr(amount);
00769         this->receive_sample_remaining_ -= amount;
00770         this->pdu_remaining_            -= amount;
00771 
00772         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00773               "After adjustment of the pointers and byte counters\n"));
00774 
00775         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00776               "this->payload_->rd_ptr() "
00777               "== %u.\n",
00778               this->payload_->rd_ptr()));
00779 
00780         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00781               "this->payload_->wr_ptr() "
00782               "== %u.\n",
00783               this->payload_->wr_ptr()));
00784 
00785         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00786               "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00787               "== %u.\n",
00788               this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00789 
00790         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00791               "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00792               "== %u.\n",
00793               this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00794 
00795         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00796               "After adjustment, remaining sample bytes == %d\n",
00797               this->receive_sample_remaining_));
00798         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00799               "After adjustment, remaining transport packet bytes == %d\n",
00800               this->pdu_remaining_));
00801       }
00802 
00803       //
00804       // Dispatch the received message if we have received it all.
00805       //
00806       // NB: Since we are doing this synchronously and without passing
00807       //     ownership of the sample, we can use NULL mutex lock for
00808       //     the allocators.  Since only one thread can be in this
00809       //     routine at a time, that is.
00810       //
00811       if (this->receive_sample_remaining_ == 0) {
00812         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00813               "Now dispatch the sample to the DataLink\n"));
00814 
00815         ReceivedDataSample rds(this->payload_);
00816         this->payload_ = 0;  // rds takes ownership of payload_
00817         if (this->data_sample_header_.into_received_data_sample(rds)) {
00818 
00819           if (this->data_sample_header_.more_fragments()
00820               || this->receive_transport_header_.last_fragment()) {
00821             VDBG((LM_DEBUG,"(%P|%t) DBG:   Attempt reassembly of fragments\n"));
00822 
00823             if (this->reassemble(rds)) {
00824               VDBG((LM_DEBUG,"(%P|%t) DBG:   Reassembled complete message\n"));
00825               this->deliver_sample(rds, remote_address);
00826             }
00827             // If reassemble() returned false, it takes ownership of the data
00828             // just like deliver_sample() does.
00829 
00830           } else {
00831             this->deliver_sample(rds, remote_address);
00832           }
00833         }
00834 
00835         // For the reassembly algorithm, the 'last_fragment_' header bit only
00836         // applies to the first DataSampleHeader in the TransportHeader
00837         this->receive_transport_header_.last_fragment(false);
00838 
00839         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00840               "Release the sample that we just sent.\n"));
00841         // ~ReceivedDataSample() releases the payload_ message block
00842       }
00843 
00844       update_buffer_index(last_buffer);
00845 
00846       if (last_buffer) {
00847         // Relinquish control if there is no more data to process.
00848         VDBG((LM_DEBUG,"(%P|%t) DBG:   We are done - no more data.\n"));
00849         return 0;
00850       }
00851 
00852     } // End of while (this->pdu_remaining_ > 0)
00853 
00854     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00855           "Let's try to do some more.\n"));
00856   } // End of while (true)
00857 
00858   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00859         "It looks like we are done - the done loop has finished.\n"));
00860   //
00861   // Relinquish control.
00862   //
00863   //   This involves ensuring that when we reenter this method, we will
00864   //   pick up from where we left off correctly.
00865   //
00866   return 0;
00867 }
00868 
00869 template<typename TH, typename DSH>
00870 bool
00871 TransportReceiveStrategy<TH, DSH>::reassemble(ReceivedDataSample&)
00872 {
00873   ACE_DEBUG((LM_WARNING, "(%P|%t) TransportReceiveStrategy::reassemble() "
00874     "WARNING: derived class must override if specific transport type uses "
00875     "fragmentation and reassembly\n"));
00876   return false;
00877 }
00878 
00879 template<typename TH, typename DSH>
00880 void
00881 TransportReceiveStrategy<TH, DSH>::reset()
00882 {
00883   this->receive_sample_remaining_ = 0;
00884   ACE_Message_Block::release(this->payload_);
00885   this->payload_ = 0;
00886   this->good_pdu_ = true;
00887   this->pdu_remaining_ = 0;
00888   for (int i = 0; i < RECEIVE_BUFFERS; ++i) {
00889     ACE_Message_Block& rb = *this->receive_buffers_[i];
00890     rb.rd_ptr(rb.wr_ptr());
00891   }
00892 }
00893 
00894 template<typename TH, typename DSH>
00895 void
00896 TransportReceiveStrategy<TH, DSH>::update_buffer_index(bool& done)
00897 {
00898   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00899         "Adjust the buffer chain in case we crossed into the next "
00900         "buffer after the last read(s).\n"));
00901   const size_t initial = this->buffer_index_;
00902   while (this->receive_buffers_[this->buffer_index_]->length() == 0) {
00903     this->buffer_index_ = this->successor_index(this->buffer_index_);
00904 
00905     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00906           "Set this->buffer_index_ = %d.\n",
00907           this->buffer_index_));
00908 
00909     if (initial == this->buffer_index_) {
00910       done = true; // no other buffers in receive_buffers_ have data
00911       return;
00912     }
00913   }
00914 }
00915 
00916 template<typename TH, typename DSH>
00917 int
00918 TransportReceiveStrategy<TH, DSH>::skip_bad_pdus()
00919 {
00920   if (this->good_pdu_) return 1;
00921 
00922   //
00923   // Adjust the message block chain pointers to account for the
00924   // skipped data.
00925   //
00926   for (size_t index = this->buffer_index_;
00927        this->pdu_remaining_ > 0;
00928        index = this->successor_index(index)) {
00929     size_t amount =
00930       ace_min<size_t>(this->pdu_remaining_, this->receive_buffers_[index]->length());
00931 
00932     this->receive_buffers_[index]->rd_ptr(amount);
00933     this->pdu_remaining_ -= amount;
00934 
00935     if (this->pdu_remaining_ > 0 && this->successor_index(index) == this->buffer_index_) {
00936       ACE_ERROR_RETURN((LM_ERROR,
00937                         ACE_TEXT("(%P|%t) ERROR: ")
00938                         ACE_TEXT("TransportReceiveStrategy::skip_bad_pdus()")
00939                         ACE_TEXT(" - Unrecoverably corrupted ")
00940                         ACE_TEXT("receive buffer management detected: ")
00941                         ACE_TEXT("read more bytes than available.\n")),
00942                        -1);
00943     }
00944   }
00945 
00946   this->receive_sample_remaining_ = 0;
00947 
00948   this->receive_sample_remaining_ = 0;
00949 
00950   bool done = false;
00951   update_buffer_index(done);
00952   return done ? 0 : 1;
00953 }
00954 
00955 }
00956 }
00957 
00958 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1