OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH > Class Template Reference

#include <TransportReceiveStrategy_T.h>

Inheritance diagram for OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >:
Collaboration graph
[legend]

List of all members.

Public Member Functions

virtual ~TransportReceiveStrategy ()
int start ()
void stop ()
int handle_dds_input (ACE_HANDLE fd)
virtual void relink (bool do_suspend=true)
const TH & received_header () const
TH & received_header ()
const DSH & received_sample_header () const
DSH & received_sample_header ()

Protected Member Functions

 TransportReceiveStrategy ()
virtual ssize_t receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd)=0
 Only our subclass knows how to do this.
virtual bool check_header (const TH &header)
 Check the transport header for suitability.
virtual bool check_header (const DSH &header)
 Check the data sample header for suitability.
virtual void deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)=0
 Called when there is a ReceivedDataSample to be delivered.
virtual int start_i ()=0
 Let the subclass start.
virtual void stop_i ()=0
 Let the subclass stop.
int skip_bad_pdus ()
 Ignore bad PDUs by skipping over them.
void reset ()
size_t pdu_remaining () const

Protected Attributes

bool gracefully_disconnected_
 Flag indicates if the GRACEFUL_DISCONNECT message is received.

Private Types

enum  { RECEIVE_BUFFERS = 16 }
enum  { BUFFER_LOW_WATER = 4096 }
enum  { MESSAGE_BLOCKS = 1000 }
enum  { DATA_BLOCKS = 100 }

Private Member Functions

size_t successor_index (size_t index) const
 Manage an index into the receive buffer array.
void update_buffer_index (bool &done)
virtual bool reassemble (ReceivedDataSample &data)

Private Attributes

size_t receive_sample_remaining_
 Bytes remaining in the current DataSample.
TH receive_transport_header_
 Current receive TransportHeader.
TransportMessageBlockAllocator mb_allocator_
TransportDataBlockAllocator db_allocator_
TransportDataAllocator data_allocator_
ACE_Lock_Adapter< ACE_SYNCH_MUTEX > receive_lock_
 Locking strategy for the allocators.
ACE_Message_Blockreceive_buffers_ [RECEIVE_BUFFERS]
 Set of receive buffers in use.
size_t buffer_index_
 Current receive buffer index in use.
DSH data_sample_header_
 Current data sample header.
ACE_Message_Blockpayload_
bool good_pdu_
size_t pdu_remaining_
 Amount of the current PDU that has not been processed yet.

Detailed Description

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
class OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >

This class provides buffer for data received by transports, de-assemble the data to individual samples and deliver them.

Definition at line 31 of file TransportReceiveStrategy_T.h.


Member Enumeration Documentation

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
anonymous enum [private]
Enumerator:
RECEIVE_BUFFERS 

Definition at line 114 of file TransportReceiveStrategy_T.h.

00114 { RECEIVE_BUFFERS  =   16 };

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
anonymous enum [private]
Enumerator:
BUFFER_LOW_WATER 

Definition at line 115 of file TransportReceiveStrategy_T.h.

00115 { BUFFER_LOW_WATER = 4096 };

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
anonymous enum [private]
Enumerator:
MESSAGE_BLOCKS 

Definition at line 121 of file TransportReceiveStrategy_T.h.

00121 { MESSAGE_BLOCKS   = 1000 };

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
anonymous enum [private]
Enumerator:
DATA_BLOCKS 

Definition at line 122 of file TransportReceiveStrategy_T.h.

00122 { DATA_BLOCKS      =  100 };


Constructor & Destructor Documentation

template<typename TH , typename DSH >
OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::~TransportReceiveStrategy (  )  [inline, virtual]

Definition at line 51 of file TransportReceiveStrategy_T.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, DBG_ENTRY_LVL, LM_WARNING, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, size, and ACE_Message_Block::total_length().

00052 {
00053   DBG_ENTRY_LVL("TransportReceiveStrategy","~TransportReceiveStrategy",6);
00054 
00055   if (this->receive_buffers_[this->buffer_index_] != 0) {
00056     size_t size = this->receive_buffers_[this->buffer_index_]->total_length();
00057 
00058     if (size > 0) {
00059       ACE_DEBUG((LM_WARNING,
00060                  ACE_TEXT("(%P|%t) WARNING: TransportReceiveStrategy::~TransportReceiveStrategy() - ")
00061                  ACE_TEXT("terminating with %d unprocessed bytes.\n"),
00062                  size));
00063     }
00064   }
00065 }

Here is the call graph for this function:

template<typename TH , typename DSH >
OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy (  )  [inline, protected]

Definition at line 22 of file TransportReceiveStrategy_T.cpp.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_allocator_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::DATA_BLOCKS, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::db_allocator_, DBG_ENTRY_LVL, LM_DEBUG, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_, ACE_OS::memset(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::MESSAGE_BLOCKS, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, and OpenDDS::DCPS::Transport_debug_level.

00023   : gracefully_disconnected_(false),
00024     receive_sample_remaining_(0),
00025     mb_allocator_(MESSAGE_BLOCKS),
00026     db_allocator_(DATA_BLOCKS),
00027     data_allocator_(DATA_BLOCKS),
00028     buffer_index_(0),
00029     payload_(0),
00030     good_pdu_(true),
00031     pdu_remaining_(0)
00032 {
00033   DBG_ENTRY_LVL("TransportReceiveStrategy", "TransportReceiveStrategy" ,6);
00034 
00035   if (Transport_debug_level >= 2) {
00036     ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-mb"
00037                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00038                &mb_allocator_, MESSAGE_BLOCKS));
00039     ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-db"
00040                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00041                &db_allocator_, DATA_BLOCKS));
00042     ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-data"
00043                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00044                &data_allocator_, DATA_BLOCKS));
00045   }
00046 
00047   ACE_OS::memset(this->receive_buffers_, 0, sizeof(this->receive_buffers_));
00048 }

Here is the call graph for this function:


Member Function Documentation

template<typename TH, typename DSH>
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header ( const DSH &  header  )  [inline, protected, virtual]

Check the data sample header for suitability.

Reimplemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.

Definition at line 76 of file TransportReceiveStrategy_T.cpp.

00077 {
00078   return true;
00079 }

template<typename TH, typename DSH >
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header ( const TH &  header  )  [inline, protected, virtual]

Check the transport header for suitability.

Reimplemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.

Definition at line 69 of file TransportReceiveStrategy_T.cpp.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

00070 {
00071   return true;
00072 }

Here is the caller graph for this function:

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::deliver_sample ( ReceivedDataSample sample,
const ACE_INET_Addr remote_address 
) [protected, pure virtual]
template<typename TH , typename DSH >
int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input ( ACE_HANDLE  fd  )  [inline]

Note that this is just an initial implementation. We may take some shortcuts (we will) that will need to be dealt with later once a more robust implementation can be put in place.

Our handle_dds_input() method is called by the reactor when there is data to be pulled from our peer() ACE_SOCK_Stream.

Definition at line 89 of file TransportReceiveStrategy_T.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::BUFFER_LOW_WATER, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header(), ACE_Message_Block::cont(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_allocator_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::db_allocator_, DBG_ENTRY_LVL, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::deliver_sample(), ACE::format_hexdump(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::free(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::gracefully_disconnected_, iovec::iov_base, iovec::iov_len, ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, LM_INFO, LM_WARNING, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), ACE_Time_Value::max_time, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::payload_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reassemble(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::RECEIVE_BUFFERS, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_bytes(), OpenDDS::DCPS::RECEIVE_DATA_BUFFER_SIZE, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_lock_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::relink(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus(), ACE_Message_Block::space(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index(), OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index(), VDBG, VDBG_LVL, ACE_Message_Block::wr_ptr(), and ACE_Time_Value::zero.

00090 {
00091   DBG_ENTRY_LVL("TransportReceiveStrategy", "handle_dds_input", 6);
00092 
00093   //
00094   // What we will be doing here:
00095   //
00096   //   1) Read as much data as possible from the handle.
00097   //   2) Process each Transport layer packet
00098   //      A) Parse the Transport header
00099   //      B) Process each DataSample in the packet
00100   //         a) Parse the DataSampleHeader
00101   //         b) Parse the remainder of the sample
00102   //         c) call data_received for complete samples
00103   //         d) store any partial sample or header data
00104   //   3) return
00105   //
00106   // The state that we might need to maintain includes:
00107   //     I) Current Transport layer header
00108   //    II) Current DataSample header
00109   //   III) Current DataSample data
00110   //
00111   // For each of these elements, they can be:
00112   //     i) empty
00113   //    ii) partial (not able to parse yet).
00114   //   iii) complete (parsed values available).
00115   //
00116   // The read buffers will be a series of data buffers of a fixed size
00117   // that are each managed by an ACE_Data_Block, each of which is
00118   // managed by an ACE_Message_Block containing the read and write
00119   // pointers into the data.
00120   //
00121   // As messages are parsed, new ACE_Message_Blocks will be formed with
00122   // read and write pointers adjusted to reference the individual
00123   // DataSample buffer contents.
00124   //
00125   // The Underlying buffers, ACE_Data_Blocks, and ACE_Message_Blocks
00126   // will be acquired from a Cached_Allocater_with_overflow.  The size
00127   // of the individual data buffers will be set initially to the
00128   // ethernet MTU to allow for full TCP messages to be received into
00129   // individual blocks.
00130   //
00131   // Reading will be performed with a two member struct iov, to allow
00132   // for the remainder of the current buffer to be used along with at
00133   // least one completely empty buffer.  There will be a low water
00134   // parameter used to stop the use of the current block when the
00135   // available space in it is reduced.
00136   //
00137 
00138   //
00139   // Establish the current receive buffer.
00140   //
00141   //   This involves checking for any remainder from the previous trip
00142   //   through the code.  If there is no current buffer, or the buffer
00143   //   has less than the low water mark space() left, then promote the
00144   //   next receive buffer to the current.  If none is present, create a
00145   //   new one and use it.
00146   //
00147   //
00148   // Establish the next receive buffer.
00149   //
00150   //   This involves checking for any previous complete buffers that
00151   //   were not promoted in the previous step.  If none are present,
00152   //   create a new one and use that.
00153   //
00154 
00155   //
00156   // Remove any buffers that have been completely read and have less
00157   // than the low water amount of space left.
00158   //
00159   size_t index;
00160 
00161   for (index = 0; index < RECEIVE_BUFFERS; ++index) {
00162     if ((this->receive_buffers_[index] != 0)
00163         && (this->receive_buffers_[index]->length() == 0)
00164         && (this->receive_buffers_[index]->space() < BUFFER_LOW_WATER)) {
00165       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00166             "Remove a receive_buffer_[%d] from use.\n",
00167             index));
00168 
00169       // unlink any Message_Block that continues to this one
00170       // being removed.
00171       // This avoids a possible infinite ->cont() loop.
00172       for (size_t ii =0; ii < RECEIVE_BUFFERS; ii++) {
00173         if ((0 != this->receive_buffers_[ii]) &&
00174             (this->receive_buffers_[ii]->cont() ==
00175              this->receive_buffers_[index])) {
00176           this->receive_buffers_[ii]->cont(0);
00177         }
00178       }
00179 
00180       //
00181       // Remove the receive buffer from use.
00182       //
00183       ACE_DES_FREE(
00184         this->receive_buffers_[index],
00185         this->mb_allocator_.free,
00186         ACE_Message_Block);
00187       this->receive_buffers_[index] = 0;
00188     }
00189   }
00190 
00191   //
00192   // Allocate buffers for any empty slots.  We may have emptied one just
00193   // here, but others may have been emptied by a large read during the
00194   // last trip through the code as well.
00195   //
00196   size_t previous = this->buffer_index_;
00197 
00198   for (index = this->buffer_index_;
00199        this->successor_index(previous) != this->buffer_index_;
00200        index = this->successor_index(index)) {
00201     if (this->receive_buffers_[index] == 0) {
00202       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00203             "Allocate a Message_Block for new receive_buffer_[%d].\n",
00204             index));
00205 
00206       ACE_NEW_MALLOC_RETURN(
00207         this->receive_buffers_[index],
00208         (ACE_Message_Block*) this->mb_allocator_.malloc(
00209           sizeof(ACE_Message_Block)),
00210         ACE_Message_Block(
00211           RECEIVE_DATA_BUFFER_SIZE,     // Buffer size
00212           ACE_Message_Block::MB_DATA,   // Default
00213           0,                            // Start with no continuation
00214           0,                            // Let the constructor allocate
00215           &this->data_allocator_,       // Our buffer cache
00216           &this->receive_lock_,         // Our locking strategy
00217           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, // Default
00218           ACE_Time_Value::zero,         // Default
00219           ACE_Time_Value::max_time,     // Default
00220           &this->db_allocator_,         // Our data block cache
00221           &this->mb_allocator_          // Our message block cache
00222         ),
00223         -1);
00224     }
00225 
00226     //
00227     // Chain the buffers.  This allows us to have portions of parsed
00228     // data cross buffer boundaries without a problem.
00229     //
00230     if (previous != index) {
00231       this->receive_buffers_[previous]->cont(
00232         this->receive_buffers_[index]);
00233     }
00234 
00235     previous = index;
00236   }
00237 
00238   //
00239   // Read from handle.
00240   //
00241   //   This involves a non-blocking recvv_n().  Any remaining space in
00242   //   the buffers should be saved for the next pass through the code.
00243   //   If more than one receive buffer has data, chain them.  Promote
00244   //   all receive buffers so that the next pass through, the first
00245   //   buffer with space remaining will appear as the current receive
00246   //   buffer.
00247   //
00248   //   This is probably what should remain in the specific
00249   //   implementation, with the rest of this factored back up into the
00250   //   framework.
00251   //
00252 
00253   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00254         "Form the iovec from the message block\n"));
00255 
00256   //
00257   // Form the iovec from the message block chain of receive buffers.
00258   //
00259   iovec iov[RECEIVE_BUFFERS];
00260   size_t vec_index = 0;
00261   size_t current = this->buffer_index_;
00262 
00263   for (index = 0;
00264        index < RECEIVE_BUFFERS;
00265        ++index, current = this->successor_index(current)) {
00266     // Invariant.  ASSERT?
00267     if (this->receive_buffers_[current] == 0) {
00268       ACE_ERROR_RETURN((LM_ERROR,
00269                         ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
00270                         ACE_TEXT("receive buffer management detected.\n")),
00271                        -1);
00272     }
00273 
00274 #ifdef _MSC_VER
00275 #pragma warning(push)
00276 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
00277 // since on other platforms iov_len is 64-bit
00278 #pragma warning(disable : 4267)
00279 #endif
00280     // This check covers the case where we have unread data in
00281     // the first buffer, but no space to write any more data.
00282     if (this->receive_buffers_[current]->space() > 0) {
00283       iov[vec_index].iov_len  = this->receive_buffers_[current]->space();
00284       iov[vec_index].iov_base = this->receive_buffers_[current]->wr_ptr();
00285 
00286       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00287             "index==%d, len==%d, base==%x\n",
00288             vec_index, iov[vec_index].iov_len, iov[vec_index].iov_base));
00289 
00290       vec_index++;
00291     }
00292   }
00293 
00294 #ifdef _MSC_VER
00295 #pragma warning(pop)
00296 #endif
00297 
00298   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00299         "Perform the recvv() call\n"));
00300 
00301   //
00302   // Read into the buffers.
00303   //
00304   ACE_INET_Addr remote_address;
00305   ssize_t bytes_remaining = this->receive_bytes(iov,
00306                                                 static_cast<int>(vec_index),
00307                                                 remote_address,
00308                                                 fd);
00309 
00310   if (bytes_remaining < 0) {
00311     ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Problem ")
00312               ACE_TEXT("with data link detected: %p.\n"),
00313               ACE_TEXT("receive_bytes")));
00314 
00315     // The relink() will handle the connection to the ReconnectTask to do
00316     // the reconnect so this reactor thread will not be block.
00317     this->relink();
00318 
00319     // Close connection anyway.
00320     return -1;
00321     // Returning -1 takes the handle out of the reactor read mask.
00322   }
00323 
00324   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00325             "recvv() return %d - we call this the bytes_remaining.\n",
00326             bytes_remaining), 5);
00327 
00328   if (bytes_remaining == 0) {
00329     if (this->gracefully_disconnected_) {
00330       VDBG_LVL((LM_INFO,
00331                 ACE_TEXT("(%P|%t) Peer has gracefully disconnected.\n"))
00332                ,1);
00333       return -1;
00334 
00335     } else {
00336       VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) DBG: Unrecoverable problem ")
00337                 ACE_TEXT("with data link detected\n")), 1);
00338 
00339       // The relink() will handle the connection to the ReconnectTask to do
00340       // the reconnect so this reactor thread will not be block.
00341       this->relink();
00342 
00343       // Close connection anyway.
00344       return -1;
00345       // Returning -1 takes the handle out of the reactor read mask.
00346     }
00347   }
00348 
00349   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00350             "START Adjust the message block chain pointers to account for the "
00351             "new data.\n"), 5);
00352 
00353   //
00354   // Adjust the message block chain pointers to account for the new
00355   // data.
00356   //
00357   size_t bytes = bytes_remaining;
00358 
00359   if (!this->pdu_remaining_) {
00360     this->receive_transport_header_.length_ = static_cast<ACE_UINT32>(bytes);
00361   }
00362 
00363   for (index = this->buffer_index_;
00364        bytes > 0;
00365        index = this->successor_index(index)) {
00366     VDBG((LM_DEBUG,"(%P|%t) DBG:    -> "
00367           "At top of for..loop block.\n"));
00368     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00369           "index == %d.\n", index));
00370     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00371           "bytes == %d.\n", bytes));
00372 
00373     size_t amount
00374     = ace_min<size_t>(bytes, this->receive_buffers_[index]->space());
00375 
00376     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00377           "amount == %d.\n", amount));
00378 
00379     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00380           "this->receive_buffers_[index]->rd_ptr() ==  %u.\n",
00381           this->receive_buffers_[index]->rd_ptr()));
00382     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00383           "this->receive_buffers_[index]->wr_ptr() ==  %u.\n",
00384           this->receive_buffers_[index]->wr_ptr()));
00385 
00386     this->receive_buffers_[index]->wr_ptr(amount);
00387 
00388     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00389           "Now, this->receive_buffers_[index]->wr_ptr() ==  %u.\n",
00390           this->receive_buffers_[index]->wr_ptr()));
00391 
00392     bytes -= amount;
00393 
00394     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00395           "Now, bytes == %d .\n", bytes));
00396 
00397     // This is yukky to do here, but there is a fine line between
00398     // where things are moved and where they are checked.
00399     if (bytes > 0 && this->successor_index(index) == this->buffer_index_) {
00400       // Here we have read more data than we passed in buffer.  Bad.
00401       ACE_ERROR_RETURN((LM_ERROR,
00402                         ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
00403                         ACE_TEXT("receive buffer management detected: ")
00404                         ACE_TEXT("read more bytes than available.\n")),
00405                        -1);
00406     }
00407   }
00408 
00409   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00410         "DONE Adjust the message block chain pointers to account "
00411         "for the new data.\n"));
00412 
00413   //
00414   // While the receive buffer is not empty:
00415   //
00416   //   This is the receive buffer(s) that was just read.  The receive
00417   //   buffer message block is used to manage our parsing through this
00418   //   data.  When we have parsed out all the data, then it will be
00419   //   considered empty.  This message block read pointer indicates
00420   //   where we are in the parsing process.  Other message blocks refer
00421   //   to the parsed data and retain the data block references to
00422   //   prevent the data from being released until we have completed all
00423   //   uses for the data in the receive buffer.  This will normally be
00424   //   after the sample data is demarshaled in the DataReader
00425   //   components.
00426   //
00427   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00428         "Start processing the data we just received.\n"));
00429 
00430   //
00431   // Operate while we have more data to process.
00432   //
00433   while (true) {
00434 
00435     //
00436     // Manage the current transport header.
00437     //
00438     //   This involves checking to see if we have remaining data in the
00439     //   current packet.  If not, or if we have not read a complete
00440     //   header, then we read a transport header and check it for
00441     //   validity.  As we have a valid transport header with data
00442     //   remaining to read in the packet that it represents, we move on.
00443     //
00444     //   As new transport headers are to be read, they are read into a
00445     //   message buffer member variable and demarshaled directly.  The
00446     //   values are retained for the lifetime of the processing of the
00447     //   instant transport packet.  The member message buffer allows us
00448     //   to retain partially read transport headers until we can read
00449     //   more data.
00450     //
00451 
00452     //
00453     // Check the remaining transport packet length to see if we are
00454     // expecting a new one.
00455     //
00456     if (this->pdu_remaining_ == 0) {
00457       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00458             "We are expecting a transport packet header.\n"));
00459 
00460       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00461             "this->buffer_index_ == %d.\n",this->buffer_index_));
00462 
00463       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00464             "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00465             "== %u.\n",
00466             this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00467 
00468       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00469             "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00470             "== %u.\n",
00471             this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00472 
00473       if (this->receive_buffers_[this->buffer_index_]->total_length()
00474           < this->receive_transport_header_.max_marshaled_size()) {
00475         //
00476         // Not enough room in the buffer for the entire Transport
00477         // header that we need to read, so relinquish control until
00478         // we get more data.
00479         //
00480         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00481               "Not enough bytes read to account for a transport "
00482               "packet header.  We are done here - we need to "
00483               "receive more bytes.\n"));
00484 
00485         this->receive_transport_header_.incomplete(
00486           *this->receive_buffers_[this->buffer_index_]);
00487 
00488         return 0;
00489 
00490       } else {
00491         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00492               "We have enough bytes to demarshall the transport "
00493               "packet header.\n"));
00494 
00495         // only do the hexdump if it will be printed - to not impact performance.
00496         if (Transport_debug_level > 5) {
00497           ACE_TCHAR xbuffer[4096];
00498           const ACE_Message_Block& mb =
00499             *this->receive_buffers_[this->buffer_index_];
00500           size_t xbytes = mb.length();
00501 
00502           xbytes = (std::min)(xbytes, TH::max_marshaled_size());
00503 
00504           ACE::format_hexdump(mb.rd_ptr(), xbytes, xbuffer, sizeof(xbuffer));
00505 
00506           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00507                 "Hex Dump of transport header block "
00508                 "(%d bytes):\n%s\n", xbytes, xbuffer));
00509         }
00510 
00511         //
00512         // Demarshal the transport header.
00513         //
00514         this->receive_transport_header_ =
00515           *this->receive_buffers_[this->buffer_index_];
00516 
00517         //
00518         // Check the TransportHeader.
00519         //
00520         if (!this->receive_transport_header_.valid()) {
00521           ACE_ERROR_RETURN((LM_ERROR,
00522                             ACE_TEXT
00523                             ("(%P|%t) ERROR: TransportHeader invalid.\n")),
00524                            -1);
00525         }
00526 
00527         this->good_pdu_ = check_header(this->receive_transport_header_);
00528         this->pdu_remaining_ = this->receive_transport_header_.length_;
00529 
00530         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00531               "Amount of transport packet bytes (remaining): %d.\n",
00532               this->pdu_remaining_));
00533       }
00534     }
00535 
00536     //
00537     // Ignore bad PDUs by skipping over them.  We do this out here
00538     // in case we did not skip over the entire bad PDU last time.
00539     //
00540     {
00541       int rtn_code = skip_bad_pdus();
00542       if (rtn_code <= 0) return rtn_code;
00543     }
00544 
00545     //
00546     // Keep processing samples while we have data to read.
00547     //
00548     while (this->pdu_remaining_ > 0) {
00549       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00550             "We have a transport packet now.  There are more sample "
00551             "bytes to parse in order to complete the packet.\n"));
00552 
00553       //
00554       // Manage the current sample header.
00555       //
00556       //  This involves checking to see if we have remaining data in the
00557       //  current sample.  If not, or if we have not read a complete
00558       //  header, then we read a sample header and check it for validity,
00559       //  which currently involves checking the message type only.  As we
00560       //  have a valid sample header with data remaining to read in the
00561       //  sample that it represents, we move on.
00562       //
00563       //  As new sample headers are read, the are read into a message
00564       //  buffer member variable and demarshaled directly.  The values are
00565       //  retained for the lifetime of the sample and are passed as part
00566       //  of the recieve data sample itself.  The member message buffer
00567       //  allows us to retain partially read sample headers until we can
00568       //  read more data.
00569       //
00570       if (this->receive_sample_remaining_ == 0) {
00571         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00572               "We are not working on some remaining sample data.  "
00573               "We are expecting to parse a sample header now.\n"));
00574 
00575         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00576               "this->buffer_index_ == %d.\n",this->buffer_index_));
00577 
00578         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00579               "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00580               "== %u.\n",
00581               this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00582 
00583         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00584               "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00585               "== %u.\n",
00586               this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00587 
00588         if (DSH::partial(*this->receive_buffers_[this->buffer_index_])) {
00589           //
00590           // Not enough room in the buffer for the entire Sample
00591           // header that we need to read, so relinquish control until
00592           // we get more data.
00593           //
00594           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00595                 "Not enough bytes have been received to account "
00596                 "for a complete sample header.  We are done for "
00597                 "now - we need to receive more data before we "
00598                 "can go on.\n"));
00599           if (Transport_debug_level > 2) {
00600             ACE_TCHAR ebuffer[350];
00601             ACE_Message_Block& mb = *this->receive_buffers_[this->buffer_index_];
00602             const size_t sz = (std::min)(DSH::max_marshaled_size(), mb.length());
00603             ACE::format_hexdump(mb.rd_ptr(), sz, ebuffer, sizeof(ebuffer));
00604             ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG:   "
00605               "Partial DataSampleHeader:\n%s\n", ebuffer));
00606           }
00607 
00608           return 0;
00609 
00610         } else {
00611           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00612                 "We have received enough bytes for the sample "
00613                 "header.  Demarshall the sample header now.\n"));
00614 
00615           // only do the hexdump if it will be printed - to not impact performance.
00616           if (Transport_debug_level > 5) {
00617             ACE_TCHAR ebuffer[4096];
00618             ACE::format_hexdump
00619             (this->receive_buffers_[this->buffer_index_]->rd_ptr(),
00620              this->data_sample_header_.max_marshaled_size(),
00621              ebuffer, sizeof(ebuffer));
00622 
00623             VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00624                   "Hex Dump:\n%s\n", ebuffer));
00625           }
00626 
00627           this->data_sample_header_.pdu_remaining(this->pdu_remaining_);
00628 
00629           //
00630           // Demarshal the sample header.
00631           //
00632           this->data_sample_header_ =
00633             *this->receive_buffers_[this->buffer_index_];
00634 
00635           //
00636           // Check the DataSampleHeader.
00637           //
00638 
00639           this->good_pdu_ = check_header(data_sample_header_);
00640 
00641           //
00642           // Set the amount to parse into the message buffer.  We
00643           // can't just use the header value to keep track of
00644           // where we are since downstream processing will expect
00645           // the value to be correct (unadjusted by us).
00646           //
00647           this->receive_sample_remaining_ =
00648             this->data_sample_header_.message_length();
00649 
00650           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00651                 "The demarshalled sample header says that we "
00652                 "have %d bytes to read for the data portion of "
00653                 "the sample.\n",
00654                 this->receive_sample_remaining_));
00655 
00656           //
00657           // Decrement packet size.
00658           //
00659           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00660                 "this->data_sample_header_.marshaled_size() "
00661                 "== %d.\n",
00662                 this->data_sample_header_.marshaled_size()));
00663 
00664           this->pdu_remaining_
00665           -= this->data_sample_header_.marshaled_size();
00666 
00667           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00668                 "Amount of transport packet remaining: %d.\n",
00669                 this->pdu_remaining_));
00670 
00671           int rtn_code = skip_bad_pdus();
00672           if (rtn_code <= 0) return rtn_code;
00673         }
00674       }
00675 
00676       bool last_buffer = false;
00677       update_buffer_index(last_buffer);
00678 
00679       if (this->receive_sample_remaining_ > 0) {
00680         //
00681         // Manage the current sample data.
00682         //
00683         //   This involves reading data to complete the current sample.  As
00684         //   samples are completed, they are dispatched via the
00685         //   data_received() mechanism.  This data is read into message
00686         //   blocks that are obtained from the pool of message blocks since
00687         //   the lifetime of this data will last until the DataReader
00688         //   components demarshal the sample data.  A reference to the
00689         //   current sample being built is retained as a member to allow us
00690         //   to hold partialy read samples until they are completed.
00691         //
00692         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00693               "Determine amount of data for the next block in the chain\n"));
00694 
00695         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00696               "this->buffer_index_ == %d.\n",this->buffer_index_));
00697 
00698         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00699               "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00700               "== %u.\n",
00701               this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00702 
00703         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00704               "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00705               "== %u.\n",
00706               this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00707         //
00708         // Determine the amount of data for the next block in the chain.
00709         //
00710         const size_t amount = ace_min<size_t>(
00711           this->receive_sample_remaining_,
00712           this->receive_buffers_[this->buffer_index_]->length());
00713 
00714         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00715               "amount of data for the next block in the chain is %d\n",
00716               amount));
00717         //
00718         // Now create a message block for the data in the current buffer
00719         // and chain it if we are starting a new sample.
00720         //
00721         ACE_Message_Block* current_sample_block = 0;
00722         ACE_NEW_MALLOC_RETURN(
00723           current_sample_block,
00724           (ACE_Message_Block*) this->mb_allocator_.malloc(
00725             sizeof(ACE_Message_Block)),
00726           ACE_Message_Block(
00727             this->receive_buffers_[this->buffer_index_]
00728             ->data_block()->duplicate(),
00729             0,
00730             &this->mb_allocator_),
00731           -1);
00732 
00733         //
00734         // Chain it to the end of the current sample.
00735         //
00736         if (this->payload_ == 0) {
00737           this->payload_ = current_sample_block;
00738 
00739         } else {
00740           ACE_Message_Block* block = this->payload_;
00741 
00742           while (block->cont() != 0) {
00743             block = block->cont();
00744           }
00745 
00746           block->cont(current_sample_block);
00747         }
00748 
00749         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00750               "Before adjustment of the pointers and byte counters\n"));
00751 
00752         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00753               "this->payload_->rd_ptr() "
00754               "== %u.\n",
00755               this->payload_->rd_ptr()));
00756 
00757         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00758               "this->payload_->wr_ptr() "
00759               "== %u.\n",
00760               this->payload_->wr_ptr()));
00761 
00762         //
00763         // Adjust the pointers and byte counters.
00764         //
00765         current_sample_block->rd_ptr(
00766           this->receive_buffers_[this->buffer_index_]->rd_ptr());
00767         current_sample_block->wr_ptr(current_sample_block->rd_ptr() + amount);
00768         this->receive_buffers_[this->buffer_index_]->rd_ptr(amount);
00769         this->receive_sample_remaining_ -= amount;
00770         this->pdu_remaining_            -= amount;
00771 
00772         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00773               "After adjustment of the pointers and byte counters\n"));
00774 
00775         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00776               "this->payload_->rd_ptr() "
00777               "== %u.\n",
00778               this->payload_->rd_ptr()));
00779 
00780         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00781               "this->payload_->wr_ptr() "
00782               "== %u.\n",
00783               this->payload_->wr_ptr()));
00784 
00785         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00786               "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00787               "== %u.\n",
00788               this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00789 
00790         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00791               "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00792               "== %u.\n",
00793               this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00794 
00795         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00796               "After adjustment, remaining sample bytes == %d\n",
00797               this->receive_sample_remaining_));
00798         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00799               "After adjustment, remaining transport packet bytes == %d\n",
00800               this->pdu_remaining_));
00801       }
00802 
00803       //
00804       // Dispatch the received message if we have received it all.
00805       //
00806       // NB: Since we are doing this synchronously and without passing
00807       //     ownership of the sample, we can use NULL mutex lock for
00808       //     the allocators.  Since only one thread can be in this
00809       //     routine at a time, that is.
00810       //
00811       if (this->receive_sample_remaining_ == 0) {
00812         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00813               "Now dispatch the sample to the DataLink\n"));
00814 
00815         ReceivedDataSample rds(this->payload_);
00816         this->payload_ = 0;  // rds takes ownership of payload_
00817         if (this->data_sample_header_.into_received_data_sample(rds)) {
00818 
00819           if (this->data_sample_header_.more_fragments()
00820               || this->receive_transport_header_.last_fragment()) {
00821             VDBG((LM_DEBUG,"(%P|%t) DBG:   Attempt reassembly of fragments\n"));
00822 
00823             if (this->reassemble(rds)) {
00824               VDBG((LM_DEBUG,"(%P|%t) DBG:   Reassembled complete message\n"));
00825               this->deliver_sample(rds, remote_address);
00826             }
00827             // If reassemble() returned false, it takes ownership of the data
00828             // just like deliver_sample() does.
00829 
00830           } else {
00831             this->deliver_sample(rds, remote_address);
00832           }
00833         }
00834 
00835         // For the reassembly algorithm, the 'last_fragment_' header bit only
00836         // applies to the first DataSampleHeader in the TransportHeader
00837         this->receive_transport_header_.last_fragment(false);
00838 
00839         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00840               "Release the sample that we just sent.\n"));
00841         // ~ReceivedDataSample() releases the payload_ message block
00842       }
00843 
00844       update_buffer_index(last_buffer);
00845 
00846       if (last_buffer) {
00847         // Relinquish control if there is no more data to process.
00848         VDBG((LM_DEBUG,"(%P|%t) DBG:   We are done - no more data.\n"));
00849         return 0;
00850       }
00851 
00852     } // End of while (this->pdu_remaining_ > 0)
00853 
00854     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00855           "Let's try to do some more.\n"));
00856   } // End of while (true)
00857 
00858   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00859         "It looks like we are done - the done loop has finished.\n"));
00860   //
00861   // Relinquish control.
00862   //
00863   //   This involves ensuring that when we reenter this method, we will
00864   //   pick up from where we left off correctly.
00865   //
00866   return 0;
00867 }

Here is the call graph for this function:

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining (  )  const [inline, protected]

Definition at line 89 of file TransportReceiveStrategy_T.h.

00089 { return this->pdu_remaining_; }

template<typename TH , typename DSH >
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reassemble ( ReceivedDataSample data  )  [inline, private, virtual]

Reimplemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.

Definition at line 871 of file TransportReceiveStrategy_T.cpp.

References LM_WARNING.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

00872 {
00873   ACE_DEBUG((LM_WARNING, "(%P|%t) TransportReceiveStrategy::reassemble() "
00874     "WARNING: derived class must override if specific transport type uses "
00875     "fragmentation and reassembly\n"));
00876   return false;
00877 }

Here is the caller graph for this function:

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual ssize_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_bytes ( iovec  iov[],
int  n,
ACE_INET_Addr remote_address,
ACE_HANDLE  fd 
) [protected, pure virtual]
template<typename TH , typename DSH >
ACE_INLINE TH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header (  )  [inline]
template<typename TH , typename DSH >
ACE_INLINE const TH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header (  )  const [inline]
template<typename TH , typename DSH >
ACE_INLINE DSH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header (  )  [inline]
template<typename TH , typename DSH >
ACE_INLINE const DSH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header (  )  const [inline]

Provides access to the received sample header for subclasses.

Definition at line 43 of file TransportReceiveStrategy_T.inl.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_.

00044 {
00045   return this->data_sample_header_;
00046 }

template<typename TH , typename DSH >
ACE_INLINE void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::relink ( bool  do_suspend = true  )  [inline, virtual]

The subclass needs to provide the implementation for re-establishing the datalink. This is called when recv returns an error.

Reimplemented in OpenDDS::DCPS::TcpReceiveStrategy.

Definition at line 64 of file TransportReceiveStrategy_T.inl.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

00065 {
00066   // The subclass needs implement this function for re-establishing
00067   // the link upon recv failure.
00068 }

Here is the caller graph for this function:

template<typename TH , typename DSH >
void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset ( void   )  [inline, protected]
template<typename TH , typename DSH >
int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus (  )  [inline, protected]

Ignore bad PDUs by skipping over them.

Definition at line 918 of file TransportReceiveStrategy_T.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_, ACE_Message_Block::length(), LM_ERROR, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index().

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

00919 {
00920   if (this->good_pdu_) return 1;
00921 
00922   //
00923   // Adjust the message block chain pointers to account for the
00924   // skipped data.
00925   //
00926   for (size_t index = this->buffer_index_;
00927        this->pdu_remaining_ > 0;
00928        index = this->successor_index(index)) {
00929     size_t amount =
00930       ace_min<size_t>(this->pdu_remaining_, this->receive_buffers_[index]->length());
00931 
00932     this->receive_buffers_[index]->rd_ptr(amount);
00933     this->pdu_remaining_ -= amount;
00934 
00935     if (this->pdu_remaining_ > 0 && this->successor_index(index) == this->buffer_index_) {
00936       ACE_ERROR_RETURN((LM_ERROR,
00937                         ACE_TEXT("(%P|%t) ERROR: ")
00938                         ACE_TEXT("TransportReceiveStrategy::skip_bad_pdus()")
00939                         ACE_TEXT(" - Unrecoverably corrupted ")
00940                         ACE_TEXT("receive buffer management detected: ")
00941                         ACE_TEXT("read more bytes than available.\n")),
00942                        -1);
00943     }
00944   }
00945 
00946   this->receive_sample_remaining_ = 0;
00947 
00948   this->receive_sample_remaining_ = 0;
00949 
00950   bool done = false;
00951   update_buffer_index(done);
00952   return done ? 0 : 1;
00953 }

Here is the call graph for this function:

Here is the caller graph for this function:

template<typename TH , typename DSH >
ACE_INLINE int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start ( void   )  [inline, virtual]

Implements OpenDDS::DCPS::TransportStrategy.

Definition at line 13 of file TransportReceiveStrategy_T.inl.

References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start_i().

00014 {
00015   DBG_ENTRY_LVL("TransportReceiveStrategy","start",6);
00016   return this->start_i();
00017 }

Here is the call graph for this function:

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start_i (  )  [protected, pure virtual]
template<typename TH , typename DSH >
ACE_INLINE void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop ( void   )  [inline, virtual]

Implements OpenDDS::DCPS::TransportStrategy.

Definition at line 21 of file TransportReceiveStrategy_T.inl.

References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop_i().

00022 {
00023   DBG_ENTRY_LVL("TransportReceiveStrategy","stop",6);
00024   this->stop_i();
00025 }

Here is the call graph for this function:

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop_i (  )  [protected, pure virtual]
template<typename TH , typename DSH >
ACE_INLINE size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index ( size_t  index  )  const [inline, private]
template<typename TH , typename DSH >
void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index ( bool &  done  )  [inline, private]

Definition at line 896 of file TransportReceiveStrategy_T.cpp.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, LM_DEBUG, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index(), and VDBG.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus().

00897 {
00898   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00899         "Adjust the buffer chain in case we crossed into the next "
00900         "buffer after the last read(s).\n"));
00901   const size_t initial = this->buffer_index_;
00902   while (this->receive_buffers_[this->buffer_index_]->length() == 0) {
00903     this->buffer_index_ = this->successor_index(this->buffer_index_);
00904 
00905     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00906           "Set this->buffer_index_ = %d.\n",
00907           this->buffer_index_));
00908 
00909     if (initial == this->buffer_index_) {
00910       done = true; // no other buffers in receive_buffers_ have data
00911       return;
00912     }
00913   }
00914 }

Here is the call graph for this function:

Here is the caller graph for this function:


Member Data Documentation

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_ [private]
template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
TransportDataAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_allocator_ [private]
template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
DSH OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_ [private]
template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
TransportDataBlockAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::db_allocator_ [private]
template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_ [private]

Flag indicating that the currently resident PDU is a good one (i.e. has not been received and processed previously). This is included in case we receive PDUs that were resent for reliability reasons and we receive one even if we have already processed it. This is a use case from multicast transports.

Definition at line 150 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::gracefully_disconnected_ [protected]

Flag indicates if the GRACEFUL_DISCONNECT message is received.

Definition at line 92 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
TransportMessageBlockAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_ [private]
template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
ACE_Message_Block* OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::payload_ [private]
template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_ [private]
template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
ACE_Message_Block* OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_[RECEIVE_BUFFERS] [private]
template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
ACE_Lock_Adapter<ACE_SYNCH_MUTEX> OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_lock_ [private]

Locking strategy for the allocators.

Definition at line 131 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_ [private]
template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
TH OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_ [private]

The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1