OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH > Class Template Reference

#include <TransportReceiveStrategy_T.h>

Inheritance diagram for OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >:

Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >:

Collaboration graph
[legend]
List of all members.

Public Member Functions

virtual ~TransportReceiveStrategy ()
int start ()
void stop ()
int handle_dds_input (ACE_HANDLE fd)
virtual void relink (bool do_suspend=true)
const TH & received_header () const
TH & received_header ()
const DSH & received_sample_header () const
DSH & received_sample_header ()

Protected Member Functions

 TransportReceiveStrategy ()
virtual ssize_t receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd)=0
 Only our subclass knows how to do this.
virtual bool check_header (const TH &header)
 Check the transport header for suitability.
virtual bool check_header (const DSH &header)
 Check the data sample header for suitability.
virtual void deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)=0
 Called when there is a ReceivedDataSample to be delivered.
virtual int start_i ()=0
 Let the subclass start.
virtual void stop_i ()=0
 Let the subclass stop.
int skip_bad_pdus ()
 Ignore bad PDUs by skipping over them.
void reset ()
size_t pdu_remaining () const

Protected Attributes

bool gracefully_disconnected_
 Flag indicates if the GRACEFUL_DISCONNECT message is received.

Private Types

 RECEIVE_BUFFERS = 16
 BUFFER_LOW_WATER = 4096
 MESSAGE_BLOCKS = 1000
 DATA_BLOCKS = 100
enum  { RECEIVE_BUFFERS = 16 }
enum  { BUFFER_LOW_WATER = 4096 }
enum  { MESSAGE_BLOCKS = 1000 }
enum  { DATA_BLOCKS = 100 }

Private Member Functions

size_t successor_index (size_t index) const
 Manage an index into the receive buffer array.
void update_buffer_index (bool &done)
virtual bool reassemble (ReceivedDataSample &data)

Private Attributes

size_t receive_sample_remaining_
 Bytes remaining in the current DataSample.
TH receive_transport_header_
 Current receive TransportHeader.
TransportMessageBlockAllocator mb_allocator_
TransportDataBlockAllocator db_allocator_
TransportDataAllocator data_allocator_
ACE_Lock_Adapter< ACE_SYNCH_MUTEX > receive_lock_
 Locking strategy for the allocators.
ACE_Message_Block * receive_buffers_ [RECEIVE_BUFFERS]
 Set of receive buffers in use.
size_t buffer_index_
 Current receive buffer index in use.
DSH data_sample_header_
 Current data sample header.
ACE_Message_Block * payload_
bool good_pdu_
size_t pdu_remaining_
 Amount of the current PDU that has not been processed yet.

Detailed Description

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
class OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >

This class provides buffer for data received by transports, de-assemble the data to individual samples and deliver them.

Definition at line 29 of file TransportReceiveStrategy_T.h.


Member Enumeration Documentation

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
anonymous enum [private]

Enumerator:
RECEIVE_BUFFERS 

Definition at line 112 of file TransportReceiveStrategy_T.h.

00112 { RECEIVE_BUFFERS  =   16 };

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
anonymous enum [private]

Enumerator:
BUFFER_LOW_WATER 

Definition at line 113 of file TransportReceiveStrategy_T.h.

00113 { BUFFER_LOW_WATER = 4096 };

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
anonymous enum [private]

Enumerator:
MESSAGE_BLOCKS 

Definition at line 119 of file TransportReceiveStrategy_T.h.

00119 { MESSAGE_BLOCKS   = 1000 };

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
anonymous enum [private]

Enumerator:
DATA_BLOCKS 

Definition at line 120 of file TransportReceiveStrategy_T.h.

00120 { DATA_BLOCKS      =  100 };


Constructor & Destructor Documentation

template<typename TH, typename DSH>
OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::~TransportReceiveStrategy (  )  [virtual]

Definition at line 48 of file TransportReceiveStrategy_T.cpp.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_.

00049 {
00050   DBG_ENTRY_LVL("TransportReceiveStrategy","~TransportReceiveStrategy",6);
00051 
00052   if (this->receive_buffers_[this->buffer_index_] != 0) {
00053     size_t size = this->receive_buffers_[this->buffer_index_]->total_length();
00054 
00055     if (size > 0) {
00056       ACE_DEBUG((LM_WARNING,
00057                  ACE_TEXT("(%P|%t) WARNING: TransportReceiveStrategy::~TransportReceiveStrategy() - ")
00058                  ACE_TEXT("terminating with %d unprocessed bytes.\n"),
00059                  size));
00060     }
00061   }
00062 }

template<typename TH, typename DSH>
OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy (  )  [protected]

Definition at line 19 of file TransportReceiveStrategy_T.cpp.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_allocator_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::DATA_BLOCKS, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::db_allocator_, DBG_ENTRY_LVL, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::MESSAGE_BLOCKS, and OpenDDS::DCPS::Transport_debug_level.

00020   : gracefully_disconnected_(false),
00021     receive_sample_remaining_(0),
00022     mb_allocator_(MESSAGE_BLOCKS),
00023     db_allocator_(DATA_BLOCKS),
00024     data_allocator_(DATA_BLOCKS),
00025     buffer_index_(0),
00026     payload_(0),
00027     good_pdu_(true),
00028     pdu_remaining_(0)
00029 {
00030   DBG_ENTRY_LVL("TransportReceiveStrategy", "TransportReceiveStrategy" ,6);
00031 
00032   if (Transport_debug_level >= 2) {
00033     ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-mb"
00034                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00035                &mb_allocator_, MESSAGE_BLOCKS));
00036     ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-db"
00037                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00038                &db_allocator_, DATA_BLOCKS));
00039     ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-data"
00040                " Cached_Allocator_With_Overflow %x with %d chunks\n",
00041                &data_allocator_, DATA_BLOCKS));
00042   }
00043 
00044   ACE_OS::memset(this->receive_buffers_, 0, sizeof(this->receive_buffers_));
00045 }


Member Function Documentation

template<typename TH, typename DSH>
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header ( const DSH &  header  )  [protected, virtual]

Check the data sample header for suitability.

Definition at line 73 of file TransportReceiveStrategy_T.cpp.

00074 {
00075   return true;
00076 }

template<typename TH, typename DSH>
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header ( const TH &  header  )  [protected, virtual]

Check the transport header for suitability.

Definition at line 66 of file TransportReceiveStrategy_T.cpp.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

00067 {
00068   return true;
00069 }

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::deliver_sample ( ReceivedDataSample sample,
const ACE_INET_Addr &  remote_address 
) [protected, pure virtual]

Called when there is a ReceivedDataSample to be delivered.

Implemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::ShmemReceiveStrategy, OpenDDS::DCPS::TcpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

template<typename TH, typename DSH>
int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input ( ACE_HANDLE  fd  ) 

Note that this is just an initial implementation. We may take some shortcuts (we will) that will need to be dealt with later once a more robust implementation can be put in place.

Our handle_dds_input() method is called by the reactor when there is data to be pulled from our peer() ACE_SOCK_Stream.

Definition at line 86 of file TransportReceiveStrategy_T.cpp.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::BUFFER_LOW_WATER, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_, DBG_ENTRY_LVL, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::deliver_sample(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_, max_marshaled_size(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::payload_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::RECEIVE_BUFFERS, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_bytes(), OpenDDS::DCPS::RECEIVE_DATA_BUFFER_SIZE, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::relink(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index(), OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index(), VDBG, and VDBG_LVL.

Referenced by OpenDDS::DCPS::UdpReceiveStrategy::handle_input(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::handle_input(), OpenDDS::DCPS::MulticastReceiveStrategy::handle_input(), and OpenDDS::DCPS::ShmemReceiveStrategy::read().

00087 {
00088   DBG_ENTRY_LVL("TransportReceiveStrategy", "handle_dds_input", 6);
00089 
00090   //
00091   // What we will be doing here:
00092   //
00093   //   1) Read as much data as possible from the handle.
00094   //   2) Process each Transport layer packet
00095   //      A) Parse the Transport header
00096   //      B) Process each DataSample in the packet
00097   //         a) Parse the DataSampleHeader
00098   //         b) Parse the remainder of the sample
00099   //         c) call data_received for complete samples
00100   //         d) store any partial sample or header data
00101   //   3) return
00102   //
00103   // The state that we might need to maintain includes:
00104   //     I) Current Transport layer header
00105   //    II) Current DataSample header
00106   //   III) Current DataSample data
00107   //
00108   // For each of these elements, they can be:
00109   //     i) empty
00110   //    ii) partial (not able to parse yet).
00111   //   iii) complete (parsed values available).
00112   //
00113   // The read buffers will be a series of data buffers of a fixed size
00114   // that are each managed by an ACE_Data_Block, each of which is
00115   // managed by an ACE_Message_Block containing the read and write
00116   // pointers into the data.
00117   //
00118   // As messages are parsed, new ACE_Message_Blocks will be formed with
00119   // read and write pointers adjusted to reference the individual
00120   // DataSample buffer contents.
00121   //
00122   // The Underlying buffers, ACE_Data_Blocks, and ACE_Message_Blocks
00123   // will be acquired from a Cached_Allocater_with_overflow.  The size
00124   // of the individual data buffers will be set initially to the
00125   // ethernet MTU to allow for full TCP messages to be received into
00126   // individual blocks.
00127   //
00128   // Reading will be performed with a two member struct iov, to allow
00129   // for the remainder of the current buffer to be used along with at
00130   // least one completely empty buffer.  There will be a low water
00131   // parameter used to stop the use of the current block when the
00132   // available space in it is reduced.
00133   //
00134 
00135   //
00136   // Establish the current receive buffer.
00137   //
00138   //   This involves checking for any remainder from the previous trip
00139   //   through the code.  If there is no current buffer, or the buffer
00140   //   has less than the low water mark space() left, then promote the
00141   //   next receive buffer to the current.  If none is present, create a
00142   //   new one and use it.
00143   //
00144   //
00145   // Establish the next receive buffer.
00146   //
00147   //   This involves checking for any previous complete buffers that
00148   //   were not promoted in the previous step.  If none are present,
00149   //   create a new one and use that.
00150   //
00151 
00152   //
00153   // Remove any buffers that have been completely read and have less
00154   // than the low water amount of space left.
00155   //
00156   size_t index;
00157 
00158   for (index = 0; index < RECEIVE_BUFFERS; ++index) {
00159     if ((this->receive_buffers_[index] != 0)
00160         && (this->receive_buffers_[index]->length() == 0)
00161         && (this->receive_buffers_[index]->space() < BUFFER_LOW_WATER)) {
00162       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00163             "Remove a receive_buffer_[%d] from use.\n",
00164             index));
00165 
00166       // unlink any Message_Block that continues to this one
00167       // being removed.
00168       // This avoids a possible infinite ->cont() loop.
00169       for (size_t ii =0; ii < RECEIVE_BUFFERS; ii++) {
00170         if ((0 != this->receive_buffers_[ii]) &&
00171             (this->receive_buffers_[ii]->cont() ==
00172              this->receive_buffers_[index])) {
00173           this->receive_buffers_[ii]->cont(0);
00174         }
00175       }
00176 
00177       //
00178       // Remove the receive buffer from use.
00179       //
00180       ACE_DES_FREE(
00181         this->receive_buffers_[index],
00182         this->mb_allocator_.free,
00183         ACE_Message_Block);
00184       this->receive_buffers_[index] = 0;
00185     }
00186   }
00187 
00188   //
00189   // Allocate buffers for any empty slots.  We may have emptied one just
00190   // here, but others may have been emptied by a large read during the
00191   // last trip through the code as well.
00192   //
00193   size_t previous = this->buffer_index_;
00194 
00195   for (index = this->buffer_index_;
00196        this->successor_index(previous) != this->buffer_index_;
00197        index = this->successor_index(index)) {
00198     if (this->receive_buffers_[index] == 0) {
00199       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00200             "Allocate a Message_Block for new receive_buffer_[%d].\n",
00201             index));
00202 
00203       ACE_NEW_MALLOC_RETURN(
00204         this->receive_buffers_[index],
00205         (ACE_Message_Block*) this->mb_allocator_.malloc(
00206           sizeof(ACE_Message_Block)),
00207         ACE_Message_Block(
00208           RECEIVE_DATA_BUFFER_SIZE,     // Buffer size
00209           ACE_Message_Block::MB_DATA,   // Default
00210           0,                            // Start with no continuation
00211           0,                            // Let the constructor allocate
00212           &this->data_allocator_,       // Our buffer cache
00213           &this->receive_lock_,         // Our locking strategy
00214           ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, // Default
00215           ACE_Time_Value::zero,         // Default
00216           ACE_Time_Value::max_time,     // Default
00217           &this->db_allocator_,         // Our data block cache
00218           &this->mb_allocator_          // Our message block cache
00219         ),
00220         -1);
00221     }
00222 
00223     //
00224     // Chain the buffers.  This allows us to have portions of parsed
00225     // data cross buffer boundaries without a problem.
00226     //
00227     if (previous != index) {
00228       this->receive_buffers_[previous]->cont(
00229         this->receive_buffers_[index]);
00230     }
00231 
00232     previous = index;
00233   }
00234 
00235   //
00236   // Read from handle.
00237   //
00238   //   This involves a non-blocking recvv_n().  Any remaining space in
00239   //   the buffers should be saved for the next pass through the code.
00240   //   If more than one receive buffer has data, chain them.  Promote
00241   //   all receive buffers so that the next pass through, the first
00242   //   buffer with space remaining will appear as the current receive
00243   //   buffer.
00244   //
00245   //   This is probably what should remain in the specific
00246   //   implementation, with the rest of this factored back up into the
00247   //   framework.
00248   //
00249 
00250   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00251         "Form the iovec from the message block\n"));
00252 
00253   //
00254   // Form the iovec from the message block chain of receive buffers.
00255   //
00256   iovec iov[RECEIVE_BUFFERS];
00257   size_t vec_index = 0;
00258   size_t current = this->buffer_index_;
00259 
00260   for (index = 0;
00261        index < RECEIVE_BUFFERS;
00262        ++index, current = this->successor_index(current)) {
00263     // Invariant.  ASSERT?
00264     if (this->receive_buffers_[current] == 0) {
00265       ACE_ERROR_RETURN((LM_ERROR,
00266                         ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
00267                         ACE_TEXT("receive buffer management detected.\n")),
00268                        -1);
00269     }
00270 
00271 #ifdef _MSC_VER
00272 #pragma warning(push)
00273 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
00274 // since on other platforms iov_len is 64-bit
00275 #pragma warning(disable : 4267)
00276 #endif
00277     // This check covers the case where we have unread data in
00278     // the first buffer, but no space to write any more data.
00279     if (this->receive_buffers_[current]->space() > 0) {
00280       iov[vec_index].iov_len  = this->receive_buffers_[current]->space();
00281       iov[vec_index].iov_base = this->receive_buffers_[current]->wr_ptr();
00282 
00283       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00284             "index==%d, len==%d, base==%x\n",
00285             vec_index, iov[vec_index].iov_len, iov[vec_index].iov_base));
00286 
00287       vec_index++;
00288     }
00289   }
00290 
00291 #ifdef _MSC_VER
00292 #pragma warning(pop)
00293 #endif
00294 
00295   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00296         "Perform the recvv() call\n"));
00297 
00298   //
00299   // Read into the buffers.
00300   //
00301   ACE_INET_Addr remote_address;
00302   ssize_t bytes_remaining = this->receive_bytes(iov,
00303                                                 static_cast<int>(vec_index),
00304                                                 remote_address,
00305                                                 fd);
00306 
00307   if (bytes_remaining < 0) {
00308     ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Problem ")
00309               ACE_TEXT("with data link detected: %p.\n"),
00310               ACE_TEXT("receive_bytes")));
00311 
00312     // The relink() will handle the connection to the ReconnectTask to do
00313     // the reconnect so this reactor thread will not be block.
00314     this->relink();
00315 
00316     // Close connection anyway.
00317     return -1;
00318     // Returning -1 takes the handle out of the reactor read mask.
00319   }
00320 
00321   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00322             "recvv() return %d - we call this the bytes_remaining.\n",
00323             bytes_remaining), 5);
00324 
00325   if (bytes_remaining == 0) {
00326     if (this->gracefully_disconnected_) {
00327       VDBG_LVL((LM_INFO,
00328                 ACE_TEXT("(%P|%t) Peer has gracefully disconnected.\n"))
00329                ,1);
00330       return -1;
00331 
00332     } else {
00333       VDBG_LVL((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Unrecoverable problem ")
00334                 ACE_TEXT("with data link detected\n")), 1);
00335 
00336       // The relink() will handle the connection to the ReconnectTask to do
00337       // the reconnect so this reactor thread will not be block.
00338       this->relink();
00339 
00340       // Close connection anyway.
00341       return -1;
00342       // Returning -1 takes the handle out of the reactor read mask.
00343     }
00344   }
00345 
00346   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
00347             "START Adjust the message block chain pointers to account for the "
00348             "new data.\n"), 5);
00349 
00350   //
00351   // Adjust the message block chain pointers to account for the new
00352   // data.
00353   //
00354   size_t bytes = bytes_remaining;
00355 
00356   if (!this->pdu_remaining_) {
00357     this->receive_transport_header_.length_ = static_cast<ACE_UINT32>(bytes);
00358   }
00359 
00360   for (index = this->buffer_index_;
00361        bytes > 0;
00362        index = this->successor_index(index)) {
00363     VDBG((LM_DEBUG,"(%P|%t) DBG:    -> "
00364           "At top of for..loop block.\n"));
00365     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00366           "index == %d.\n", index));
00367     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00368           "bytes == %d.\n", bytes));
00369 
00370     size_t amount
00371     = ace_min<size_t>(bytes, this->receive_buffers_[index]->space());
00372 
00373     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00374           "amount == %d.\n", amount));
00375 
00376     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00377           "this->receive_buffers_[index]->rd_ptr() ==  %u.\n",
00378           this->receive_buffers_[index]->rd_ptr()));
00379     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00380           "this->receive_buffers_[index]->wr_ptr() ==  %u.\n",
00381           this->receive_buffers_[index]->wr_ptr()));
00382 
00383     this->receive_buffers_[index]->wr_ptr(amount);
00384 
00385     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00386           "Now, this->receive_buffers_[index]->wr_ptr() ==  %u.\n",
00387           this->receive_buffers_[index]->wr_ptr()));
00388 
00389     bytes -= amount;
00390 
00391     VDBG((LM_DEBUG,"(%P|%t) DBG:       "
00392           "Now, bytes == %d .\n", bytes));
00393 
00394     // This is yukky to do here, but there is a fine line between
00395     // where things are moved and where they are checked.
00396     if (bytes > 0 && this->successor_index(index) == this->buffer_index_) {
00397       // Here we have read more data than we passed in buffer.  Bad.
00398       ACE_ERROR_RETURN((LM_ERROR,
00399                         ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ")
00400                         ACE_TEXT("receive buffer management detected: ")
00401                         ACE_TEXT("read more bytes than available.\n")),
00402                        -1);
00403     }
00404   }
00405 
00406   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00407         "DONE Adjust the message block chain pointers to account "
00408         "for the new data.\n"));
00409 
00410   //
00411   // While the receive buffer is not empty:
00412   //
00413   //   This is the receive buffer(s) that was just read.  The receive
00414   //   buffer message block is used to manage our parsing through this
00415   //   data.  When we have parsed out all the data, then it will be
00416   //   considered empty.  This message block read pointer indicates
00417   //   where we are in the parsing process.  Other message blocks refer
00418   //   to the parsed data and retain the data block references to
00419   //   prevent the data from being released until we have completed all
00420   //   uses for the data in the receive buffer.  This will normally be
00421   //   after the sample data is demarshaled in the DataReader
00422   //   components.
00423   //
00424   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00425         "Start processing the data we just received.\n"));
00426 
00427   //
00428   // Operate while we have more data to process.
00429   //
00430   while (true) {
00431 
00432     //
00433     // Manage the current transport header.
00434     //
00435     //   This involves checking to see if we have remaining data in the
00436     //   current packet.  If not, or if we have not read a complete
00437     //   header, then we read a transport header and check it for
00438     //   validity.  As we have a valid transport header with data
00439     //   remaining to read in the packet that it represents, we move on.
00440     //
00441     //   As new transport headers are to be read, they are read into a
00442     //   message buffer member variable and demarshaled directly.  The
00443     //   values are retained for the lifetime of the processing of the
00444     //   instant transport packet.  The member message buffer allows us
00445     //   to retain partially read transport headers until we can read
00446     //   more data.
00447     //
00448 
00449     //
00450     // Check the remaining transport packet length to see if we are
00451     // expecting a new one.
00452     //
00453     if (this->pdu_remaining_ == 0) {
00454       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00455             "We are expecting a transport packet header.\n"));
00456 
00457       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00458             "this->buffer_index_ == %d.\n",this->buffer_index_));
00459 
00460       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00461             "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00462             "== %u.\n",
00463             this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00464 
00465       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00466             "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00467             "== %u.\n",
00468             this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00469 
00470       if (this->receive_buffers_[this->buffer_index_]->total_length()
00471           < this->receive_transport_header_.max_marshaled_size()) {
00472         //
00473         // Not enough room in the buffer for the entire Transport
00474         // header that we need to read, so relinquish control until
00475         // we get more data.
00476         //
00477         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00478               "Not enough bytes read to account for a transport "
00479               "packet header.  We are done here - we need to "
00480               "receive more bytes.\n"));
00481 
00482         this->receive_transport_header_.incomplete(
00483           *this->receive_buffers_[this->buffer_index_]);
00484 
00485         return 0;
00486 
00487       } else {
00488         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00489               "We have enough bytes to demarshall the transport "
00490               "packet header.\n"));
00491 
00492         // only do the hexdump if it will be printed - to not impact performance.
00493         if (Transport_debug_level > 5) {
00494           ACE_TCHAR xbuffer[4096];
00495           const ACE_Message_Block& mb =
00496             *this->receive_buffers_[this->buffer_index_];
00497           size_t xbytes = mb.length();
00498 
00499           xbytes = (std::min)(xbytes, TH::max_marshaled_size());
00500 
00501           ACE::format_hexdump(mb.rd_ptr(), xbytes, xbuffer, sizeof(xbuffer));
00502 
00503           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00504                 "Hex Dump of transport header block "
00505                 "(%d bytes):\n%s\n", xbytes, xbuffer));
00506         }
00507 
00508         //
00509         // Demarshal the transport header.
00510         //
00511         this->receive_transport_header_ =
00512           *this->receive_buffers_[this->buffer_index_];
00513 
00514         //
00515         // Check the TransportHeader.
00516         //
00517         if (!this->receive_transport_header_.valid()) {
00518           ACE_ERROR_RETURN((LM_ERROR,
00519                             ACE_TEXT
00520                             ("(%P|%t) ERROR: TransportHeader invalid.\n")),
00521                            -1);
00522         }
00523 
00524         this->good_pdu_ = check_header(this->receive_transport_header_);
00525         this->pdu_remaining_ = this->receive_transport_header_.length_;
00526 
00527         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00528               "Amount of transport packet bytes (remaining): %d.\n",
00529               this->pdu_remaining_));
00530       }
00531     }
00532 
00533     //
00534     // Ignore bad PDUs by skipping over them.  We do this out here
00535     // in case we did not skip over the entire bad PDU last time.
00536     //
00537     {
00538       int rtn_code = skip_bad_pdus();
00539       if (rtn_code <= 0) return rtn_code;
00540     }
00541 
00542     //
00543     // Keep processing samples while we have data to read.
00544     //
00545     while (this->pdu_remaining_ > 0) {
00546       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00547             "We have a transport packet now.  There are more sample "
00548             "bytes to parse in order to complete the packet.\n"));
00549 
00550       //
00551       // Manage the current sample header.
00552       //
00553       //  This involves checking to see if we have remaining data in the
00554       //  current sample.  If not, or if we have not read a complete
00555       //  header, then we read a sample header and check it for validity,
00556       //  which currently involves checking the message type only.  As we
00557       //  have a valid sample header with data remaining to read in the
00558       //  sample that it represents, we move on.
00559       //
00560       //  As new sample headers are read, the are read into a message
00561       //  buffer member variable and demarshaled directly.  The values are
00562       //  retained for the lifetime of the sample and are passed as part
00563       //  of the recieve data sample itself.  The member message buffer
00564       //  allows us to retain partially read sample headers until we can
00565       //  read more data.
00566       //
00567       if (this->receive_sample_remaining_ == 0) {
00568         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00569               "We are not working on some remaining sample data.  "
00570               "We are expecting to parse a sample header now.\n"));
00571 
00572         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00573               "this->buffer_index_ == %d.\n",this->buffer_index_));
00574 
00575         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00576               "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00577               "== %u.\n",
00578               this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00579 
00580         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00581               "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00582               "== %u.\n",
00583               this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00584 
00585         if (DSH::partial(*this->receive_buffers_[this->buffer_index_])) {
00586           //
00587           // Not enough room in the buffer for the entire Sample
00588           // header that we need to read, so relinquish control until
00589           // we get more data.
00590           //
00591           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00592                 "Not enough bytes have been received to account "
00593                 "for a complete sample header.  We are done for "
00594                 "now - we need to receive more data before we "
00595                 "can go on.\n"));
00596           if (Transport_debug_level > 2) {
00597             ACE_TCHAR ebuffer[350];
00598             ACE_Message_Block& mb = *this->receive_buffers_[this->buffer_index_];
00599             const size_t sz = (std::min)(DSH::max_marshaled_size(), mb.length());
00600             ACE::format_hexdump(mb.rd_ptr(), sz, ebuffer, sizeof(ebuffer));
00601             ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG:   "
00602               "Partial DataSampleHeader:\n%s\n", ebuffer));
00603           }
00604 
00605           return 0;
00606 
00607         } else {
00608           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00609                 "We have received enough bytes for the sample "
00610                 "header.  Demarshall the sample header now.\n"));
00611 
00612           // only do the hexdump if it will be printed - to not impact performance.
00613           if (Transport_debug_level > 5) {
00614             ACE_TCHAR ebuffer[4096];
00615             ACE::format_hexdump
00616             (this->receive_buffers_[this->buffer_index_]->rd_ptr(),
00617              this->data_sample_header_.max_marshaled_size(),
00618              ebuffer, sizeof(ebuffer));
00619 
00620             VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00621                   "Hex Dump:\n%s\n", ebuffer));
00622           }
00623 
00624           this->data_sample_header_.pdu_remaining(this->pdu_remaining_);
00625 
00626           //
00627           // Demarshal the sample header.
00628           //
00629           this->data_sample_header_ =
00630             *this->receive_buffers_[this->buffer_index_];
00631 
00632           //
00633           // Check the DataSampleHeader.
00634           //
00635 
00636           this->good_pdu_ = check_header(data_sample_header_);
00637 
00638           //
00639           // Set the amount to parse into the message buffer.  We
00640           // can't just use the header value to keep track of
00641           // where we are since downstream processing will expect
00642           // the value to be correct (unadjusted by us).
00643           //
00644           this->receive_sample_remaining_ =
00645             this->data_sample_header_.message_length();
00646 
00647           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00648                 "The demarshalled sample header says that we "
00649                 "have %d bytes to read for the data portion of "
00650                 "the sample.\n",
00651                 this->receive_sample_remaining_));
00652 
00653           //
00654           // Decrement packet size.
00655           //
00656           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00657                 "this->data_sample_header_.marshaled_size() "
00658                 "== %d.\n",
00659                 this->data_sample_header_.marshaled_size()));
00660 
00661           this->pdu_remaining_
00662           -= this->data_sample_header_.marshaled_size();
00663 
00664           VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00665                 "Amount of transport packet remaining: %d.\n",
00666                 this->pdu_remaining_));
00667 
00668           int rtn_code = skip_bad_pdus();
00669           if (rtn_code <= 0) return rtn_code;
00670         }
00671       }
00672 
00673       bool last_buffer = false;
00674       update_buffer_index(last_buffer);
00675 
00676       if (this->receive_sample_remaining_ > 0) {
00677         //
00678         // Manage the current sample data.
00679         //
00680         //   This involves reading data to complete the current sample.  As
00681         //   samples are completed, they are dispatched via the
00682         //   data_received() mechanism.  This data is read into message
00683         //   blocks that are obtained from the pool of message blocks since
00684         //   the lifetime of this data will last until the DataReader
00685         //   components demarshal the sample data.  A reference to the
00686         //   current sample being built is retained as a member to allow us
00687         //   to hold partialy read samples until they are completed.
00688         //
00689         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00690               "Determine amount of data for the next block in the chain\n"));
00691 
00692         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00693               "this->buffer_index_ == %d.\n",this->buffer_index_));
00694 
00695         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00696               "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00697               "== %u.\n",
00698               this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00699 
00700         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00701               "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00702               "== %u.\n",
00703               this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00704         //
00705         // Determine the amount of data for the next block in the chain.
00706         //
00707         const size_t amount = ace_min<size_t>(
00708           this->receive_sample_remaining_,
00709           this->receive_buffers_[this->buffer_index_]->length());
00710 
00711         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00712               "amount of data for the next block in the chain is %d\n",
00713               amount));
00714         //
00715         // Now create a message block for the data in the current buffer
00716         // and chain it if we are starting a new sample.
00717         //
00718         ACE_Message_Block* current_sample_block = 0;
00719         ACE_NEW_MALLOC_RETURN(
00720           current_sample_block,
00721           (ACE_Message_Block*) this->mb_allocator_.malloc(
00722             sizeof(ACE_Message_Block)),
00723           ACE_Message_Block(
00724             this->receive_buffers_[this->buffer_index_]
00725             ->data_block()->duplicate(),
00726             0,
00727             &this->mb_allocator_),
00728           -1);
00729 
00730         //
00731         // Chain it to the end of the current sample.
00732         //
00733         if (this->payload_ == 0) {
00734           this->payload_ = current_sample_block;
00735 
00736         } else {
00737           ACE_Message_Block* block = this->payload_;
00738 
00739           while (block->cont() != 0) {
00740             block = block->cont();
00741           }
00742 
00743           block->cont(current_sample_block);
00744         }
00745 
00746         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00747               "Before adjustment of the pointers and byte counters\n"));
00748 
00749         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00750               "this->payload_->rd_ptr() "
00751               "== %u.\n",
00752               this->payload_->rd_ptr()));
00753 
00754         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00755               "this->payload_->wr_ptr() "
00756               "== %u.\n",
00757               this->payload_->wr_ptr()));
00758 
00759         //
00760         // Adjust the pointers and byte counters.
00761         //
00762         current_sample_block->rd_ptr(
00763           this->receive_buffers_[this->buffer_index_]->rd_ptr());
00764         current_sample_block->wr_ptr(current_sample_block->rd_ptr() + amount);
00765         this->receive_buffers_[this->buffer_index_]->rd_ptr(amount);
00766         this->receive_sample_remaining_ -= amount;
00767         this->pdu_remaining_            -= amount;
00768 
00769         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00770               "After adjustment of the pointers and byte counters\n"));
00771 
00772         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00773               "this->payload_->rd_ptr() "
00774               "== %u.\n",
00775               this->payload_->rd_ptr()));
00776 
00777         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00778               "this->payload_->wr_ptr() "
00779               "== %u.\n",
00780               this->payload_->wr_ptr()));
00781 
00782         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00783               "this->receive_buffers_[this->buffer_index_]->rd_ptr() "
00784               "== %u.\n",
00785               this->receive_buffers_[this->buffer_index_]->rd_ptr()));
00786 
00787         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00788               "this->receive_buffers_[this->buffer_index_]->wr_ptr() "
00789               "== %u.\n",
00790               this->receive_buffers_[this->buffer_index_]->wr_ptr()));
00791 
00792         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00793               "After adjustment, remaining sample bytes == %d\n",
00794               this->receive_sample_remaining_));
00795         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00796               "After adjustment, remaining transport packet bytes == %d\n",
00797               this->pdu_remaining_));
00798       }
00799 
00800       //
00801       // Dispatch the received message if we have received it all.
00802       //
00803       // NB: Since we are doing this synchronously and without passing
00804       //     ownership of the sample, we can use NULL mutex lock for
00805       //     the allocators.  Since only one thread can be in this
00806       //     routine at a time, that is.
00807       //
00808       if (this->receive_sample_remaining_ == 0) {
00809         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00810               "Now dispatch the sample to the DataLink\n"));
00811 
00812         ReceivedDataSample rds(this->payload_);
00813         this->payload_ = 0;  // rds takes ownership of payload_
00814         if (this->data_sample_header_.into_received_data_sample(rds)) {
00815 
00816           if (this->data_sample_header_.more_fragments()
00817               || this->receive_transport_header_.last_fragment()) {
00818             VDBG((LM_DEBUG,"(%P|%t) DBG:   Attempt reassembly of fragments\n"));
00819 
00820             if (this->reassemble(rds)) {
00821               VDBG((LM_DEBUG,"(%P|%t) DBG:   Reassembled complete message\n"));
00822               this->deliver_sample(rds, remote_address);
00823             }
00824             // If reassemble() returned false, it takes ownership of the data
00825             // just like deliver_sample() does.
00826 
00827           } else {
00828             this->deliver_sample(rds, remote_address);
00829           }
00830         }
00831 
00832         // For the reassembly algorithm, the 'last_fragment_' header bit only
00833         // applies to the first DataSampleHeader in the TransportHeader
00834         this->receive_transport_header_.last_fragment(false);
00835 
00836         VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00837               "Release the sample that we just sent.\n"));
00838         // ~ReceivedDataSample() releases the payload_ message block
00839       }
00840 
00841       update_buffer_index(last_buffer);
00842 
00843       if (last_buffer) {
00844         // Relinquish control if there is no more data to process.
00845         VDBG((LM_DEBUG,"(%P|%t) DBG:   We are done - no more data.\n"));
00846         return 0;
00847       }
00848 
00849     } // End of while (this->pdu_remaining_ > 0)
00850 
00851     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00852           "Let's try to do some more.\n"));
00853   } // End of while (true)
00854 
00855   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00856         "It looks like we are done - the done loop has finished.\n"));
00857   //
00858   // Relinquish control.
00859   //
00860   //   This involves ensuring that when we reenter this method, we will
00861   //   pick up from where we left off correctly.
00862   //
00863   return 0;
00864 }

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining (  )  const [inline, protected]

Definition at line 87 of file TransportReceiveStrategy_T.h.

00087 { return this->pdu_remaining_; }

template<typename TH, typename DSH>
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reassemble ( ReceivedDataSample data  )  [private, virtual]

Reimplemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.

Definition at line 868 of file TransportReceiveStrategy_T.cpp.

00869 {
00870   ACE_DEBUG((LM_WARNING, "(%P|%t) TransportReceiveStrategy::reassemble() "
00871     "WARNING: derived class must override if specific transport type uses "
00872     "fragmentation and reassembly\n"));
00873   return false;
00874 }

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual ssize_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_bytes ( iovec  iov[],
int  n,
ACE_INET_Addr &  remote_address,
ACE_HANDLE  fd 
) [protected, pure virtual]

Only our subclass knows how to do this.

Implemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::ShmemReceiveStrategy, OpenDDS::DCPS::TcpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

template<typename TH, typename DSH>
ACE_INLINE TH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header (  ) 

Definition at line 36 of file TransportReceiveStrategy_T.inl.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_.

00037 {
00038   return this->receive_transport_header_;
00039 }

template<typename TH, typename DSH>
ACE_INLINE const TH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header (  )  const

Provides access to the received transport header for subclasses.

Definition at line 29 of file TransportReceiveStrategy_T.inl.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_.

Referenced by OpenDDS::DCPS::MulticastDataLink::ready_to_deliver(), OpenDDS::DCPS::UdpReceiveStrategy::reassemble(), OpenDDS::DCPS::MulticastReceiveStrategy::reassemble(), and OpenDDS::DCPS::MulticastDataLink::sample_received().

00030 {
00031   return this->receive_transport_header_;
00032 }

template<typename TH, typename DSH>
ACE_INLINE DSH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header (  ) 

Definition at line 50 of file TransportReceiveStrategy_T.inl.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_.

00051 {
00052   return this->data_sample_header_;
00053 }

template<typename TH, typename DSH>
ACE_INLINE const DSH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header (  )  const

Provides access to the received sample header for subclasses.

Definition at line 43 of file TransportReceiveStrategy_T.inl.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_.

Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble().

00044 {
00045   return this->data_sample_header_;
00046 }

template<typename TH, typename DSH>
ACE_INLINE void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::relink ( bool  do_suspend = true  )  [virtual]

The subclass needs to provide the implementation for re-establishing the datalink. This is called when recv returns an error.

Reimplemented in OpenDDS::DCPS::TcpReceiveStrategy.

Definition at line 64 of file TransportReceiveStrategy_T.inl.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

00065 {
00066   // The subclass needs implement this function for re-establishing
00067   // the link upon recv failure.
00068 }

template<typename TH, typename DSH>
void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset (  )  [protected]

For datagram-based derived classes, reset() can be called to clear any state that may be remaining from parsing the previous datagram.

Definition at line 878 of file TransportReceiveStrategy_T.cpp.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::payload_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::RECEIVE_BUFFERS, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_.

Referenced by OpenDDS::DCPS::MulticastReceiveStrategy::handle_input().

00879 {
00880   this->receive_sample_remaining_ = 0;
00881   ACE_Message_Block::release(this->payload_);
00882   this->payload_ = 0;
00883   this->good_pdu_ = true;
00884   this->pdu_remaining_ = 0;
00885   for (int i = 0; i < RECEIVE_BUFFERS; ++i) {
00886     ACE_Message_Block& rb = *this->receive_buffers_[i];
00887     rb.rd_ptr(rb.wr_ptr());
00888   }
00889 }

template<typename TH, typename DSH>
int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus (  )  [protected]

Ignore bad PDUs by skipping over them.

Definition at line 915 of file TransportReceiveStrategy_T.cpp.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index().

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

00916 {
00917   if (this->good_pdu_) return 1;
00918 
00919   //
00920   // Adjust the message block chain pointers to account for the
00921   // skipped data.
00922   //
00923   for (size_t index = this->buffer_index_;
00924        this->pdu_remaining_ > 0;
00925        index = this->successor_index(index)) {
00926     size_t amount =
00927       ace_min<size_t>(this->pdu_remaining_, this->receive_buffers_[index]->length());
00928 
00929     this->receive_buffers_[index]->rd_ptr(amount);
00930     this->pdu_remaining_ -= amount;
00931 
00932     if (this->pdu_remaining_ > 0 && this->successor_index(index) == this->buffer_index_) {
00933       ACE_ERROR_RETURN((LM_ERROR,
00934                         ACE_TEXT("(%P|%t) ERROR: ")
00935                         ACE_TEXT("TransportReceiveStrategy::skip_bad_pdus()")
00936                         ACE_TEXT(" - Unrecoverably corrupted ")
00937                         ACE_TEXT("receive buffer management detected: ")
00938                         ACE_TEXT("read more bytes than available.\n")),
00939                        -1);
00940     }
00941   }
00942 
00943   this->receive_sample_remaining_ = 0;
00944 
00945   this->receive_sample_remaining_ = 0;
00946 
00947   bool done = false;
00948   update_buffer_index(done);
00949   return done ? 0 : 1;
00950 }

template<typename TH, typename DSH>
ACE_INLINE int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start (  )  [virtual]

Implements OpenDDS::DCPS::TransportStrategy.

Definition at line 13 of file TransportReceiveStrategy_T.inl.

References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start_i().

Referenced by OpenDDS::DCPS::ShmemReceiveStrategy::read().

00014 {
00015   DBG_ENTRY_LVL("TransportReceiveStrategy","start",6);
00016   return this->start_i();
00017 }

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start_i (  )  [protected, pure virtual]

Let the subclass start.

Implemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::ShmemReceiveStrategy, OpenDDS::DCPS::TcpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start().

template<typename TH, typename DSH>
ACE_INLINE void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop (  )  [virtual]

Implements OpenDDS::DCPS::TransportStrategy.

Definition at line 21 of file TransportReceiveStrategy_T.inl.

References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop_i().

00022 {
00023   DBG_ENTRY_LVL("TransportReceiveStrategy","stop",6);
00024   this->stop_i();
00025 }

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop_i (  )  [protected, pure virtual]

Let the subclass stop.

Implemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::ShmemReceiveStrategy, OpenDDS::DCPS::TcpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop().

template<typename TH, typename DSH>
ACE_INLINE size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index ( size_t  index  )  const [private]

Manage an index into the receive buffer array.

Definition at line 57 of file TransportReceiveStrategy_T.inl.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::RECEIVE_BUFFERS.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().

00058 {
00059   return ++index % RECEIVE_BUFFERS;
00060 }

template<typename TH, typename DSH>
void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index ( bool &  done  )  [private]

Definition at line 893 of file TransportReceiveStrategy_T.cpp.

References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, and VDBG.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus().

00894 {
00895   VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00896         "Adjust the buffer chain in case we crossed into the next "
00897         "buffer after the last read(s).\n"));
00898   const size_t initial = this->buffer_index_;
00899   while (this->receive_buffers_[this->buffer_index_]->length() == 0) {
00900     this->buffer_index_ = this->successor_index(this->buffer_index_);
00901 
00902     VDBG((LM_DEBUG,"(%P|%t) DBG:   "
00903           "Set this->buffer_index_ = %d.\n",
00904           this->buffer_index_));
00905 
00906     if (initial == this->buffer_index_) {
00907       done = true; // no other buffers in receive_buffers_ have data
00908       return;
00909     }
00910   }
00911 }


Member Data Documentation

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_ [private]

Current receive buffer index in use.

Definition at line 135 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::~TransportReceiveStrategy().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
TransportDataAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_allocator_ [private]

Definition at line 126 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
DSH OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_ [private]

Current data sample header.

Definition at line 138 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
TransportDataBlockAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::db_allocator_ [private]

Definition at line 125 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_ [private]

Flag indicating that the currently resident PDU is a good one (i.e. has not been received and processed previously). This is included in case we receive PDUs that were resent for reliability reasons and we receive one even if we have already processed it. This is a use case from multicast transports.

Definition at line 148 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::gracefully_disconnected_ [protected]

Flag indicates if the GRACEFUL_DISCONNECT message is received.

Definition at line 90 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TcpReceiveStrategy::gracefully_disconnected(), and OpenDDS::DCPS::ShmemReceiveStrategy::receive_bytes().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
TransportMessageBlockAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_ [private]

Definition at line 124 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
ACE_Message_Block* OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::payload_ [private]

Definition at line 140 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_ [private]

Amount of the current PDU that has not been processed yet.

Definition at line 151 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< OpenDDS::DCPS::RtpsTransportHeader, OpenDDS::DCPS::RtpsSampleHeader >::pdu_remaining(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
ACE_Message_Block* OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_[RECEIVE_BUFFERS] [private]

Set of receive buffers in use.

Definition at line 132 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::~TransportReceiveStrategy().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
ACE_Lock_Adapter<ACE_SYNCH_MUTEX> OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_lock_ [private]

Locking strategy for the allocators.

Definition at line 129 of file TransportReceiveStrategy_T.h.

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_ [private]

Bytes remaining in the current DataSample.

Definition at line 102 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus().

template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
TH OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_ [private]

Current receive TransportHeader.

Definition at line 105 of file TransportReceiveStrategy_T.h.

Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:32 2016 for OpenDDS by  doxygen 1.4.7