#include <TransportReceiveStrategy_T.h>
Public Member Functions | |
virtual | ~TransportReceiveStrategy () |
int | start () |
void | stop () |
int | handle_dds_input (ACE_HANDLE fd) |
virtual void | relink (bool do_suspend=true) |
const TH & | received_header () const |
TH & | received_header () |
const DSH & | received_sample_header () const |
DSH & | received_sample_header () |
Protected Member Functions | |
TransportReceiveStrategy () | |
virtual ssize_t | receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd)=0 |
Only our subclass knows how to do this. | |
virtual bool | check_header (const TH &header) |
Check the transport header for suitability. | |
virtual bool | check_header (const DSH &header) |
Check the data sample header for suitability. | |
virtual void | deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)=0 |
Called when there is a ReceivedDataSample to be delivered. | |
virtual int | start_i ()=0 |
Let the subclass start. | |
virtual void | stop_i ()=0 |
Let the subclass stop. | |
int | skip_bad_pdus () |
Ignore bad PDUs by skipping over them. | |
void | reset () |
size_t | pdu_remaining () const |
Protected Attributes | |
bool | gracefully_disconnected_ |
Flag indicates if the GRACEFUL_DISCONNECT message is received. | |
Private Types | |
enum | { RECEIVE_BUFFERS = 16 } |
enum | { BUFFER_LOW_WATER = 4096 } |
enum | { MESSAGE_BLOCKS = 1000 } |
enum | { DATA_BLOCKS = 100 } |
Private Member Functions | |
size_t | successor_index (size_t index) const |
Manage an index into the receive buffer array. | |
void | update_buffer_index (bool &done) |
virtual bool | reassemble (ReceivedDataSample &data) |
Private Attributes | |
size_t | receive_sample_remaining_ |
Bytes remaining in the current DataSample. | |
TH | receive_transport_header_ |
Current receive TransportHeader. | |
TransportMessageBlockAllocator | mb_allocator_ |
TransportDataBlockAllocator | db_allocator_ |
TransportDataAllocator | data_allocator_ |
ACE_Lock_Adapter< ACE_SYNCH_MUTEX > | receive_lock_ |
Locking strategy for the allocators. | |
ACE_Message_Block * | receive_buffers_ [RECEIVE_BUFFERS] |
Set of receive buffers in use. | |
size_t | buffer_index_ |
Current receive buffer index in use. | |
DSH | data_sample_header_ |
Current data sample header. | |
ACE_Message_Block * | payload_ |
bool | good_pdu_ |
size_t | pdu_remaining_ |
Amount of the current PDU that has not been processed yet. |
This class provides buffer for data received by transports, de-assemble the data to individual samples and deliver them.
Definition at line 31 of file TransportReceiveStrategy_T.h.
anonymous enum [private] |
Definition at line 114 of file TransportReceiveStrategy_T.h.
00114 { RECEIVE_BUFFERS = 16 };
anonymous enum [private] |
Definition at line 115 of file TransportReceiveStrategy_T.h.
00115 { BUFFER_LOW_WATER = 4096 };
anonymous enum [private] |
Definition at line 121 of file TransportReceiveStrategy_T.h.
00121 { MESSAGE_BLOCKS = 1000 };
anonymous enum [private] |
Definition at line 122 of file TransportReceiveStrategy_T.h.
00122 { DATA_BLOCKS = 100 };
OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::~TransportReceiveStrategy | ( | ) | [inline, virtual] |
Definition at line 51 of file TransportReceiveStrategy_T.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, DBG_ENTRY_LVL, LM_WARNING, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, size, and ACE_Message_Block::total_length().
00052 { 00053 DBG_ENTRY_LVL("TransportReceiveStrategy","~TransportReceiveStrategy",6); 00054 00055 if (this->receive_buffers_[this->buffer_index_] != 0) { 00056 size_t size = this->receive_buffers_[this->buffer_index_]->total_length(); 00057 00058 if (size > 0) { 00059 ACE_DEBUG((LM_WARNING, 00060 ACE_TEXT("(%P|%t) WARNING: TransportReceiveStrategy::~TransportReceiveStrategy() - ") 00061 ACE_TEXT("terminating with %d unprocessed bytes.\n"), 00062 size)); 00063 } 00064 } 00065 }
OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy | ( | ) | [inline, protected] |
Definition at line 22 of file TransportReceiveStrategy_T.cpp.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_allocator_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::DATA_BLOCKS, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::db_allocator_, DBG_ENTRY_LVL, LM_DEBUG, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_, ACE_OS::memset(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::MESSAGE_BLOCKS, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, and OpenDDS::DCPS::Transport_debug_level.
00023 : gracefully_disconnected_(false), 00024 receive_sample_remaining_(0), 00025 mb_allocator_(MESSAGE_BLOCKS), 00026 db_allocator_(DATA_BLOCKS), 00027 data_allocator_(DATA_BLOCKS), 00028 buffer_index_(0), 00029 payload_(0), 00030 good_pdu_(true), 00031 pdu_remaining_(0) 00032 { 00033 DBG_ENTRY_LVL("TransportReceiveStrategy", "TransportReceiveStrategy" ,6); 00034 00035 if (Transport_debug_level >= 2) { 00036 ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-mb" 00037 " Cached_Allocator_With_Overflow %x with %d chunks\n", 00038 &mb_allocator_, MESSAGE_BLOCKS)); 00039 ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-db" 00040 " Cached_Allocator_With_Overflow %x with %d chunks\n", 00041 &db_allocator_, DATA_BLOCKS)); 00042 ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-data" 00043 " Cached_Allocator_With_Overflow %x with %d chunks\n", 00044 &data_allocator_, DATA_BLOCKS)); 00045 } 00046 00047 ACE_OS::memset(this->receive_buffers_, 0, sizeof(this->receive_buffers_)); 00048 }
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header | ( | const DSH & | header | ) | [inline, protected, virtual] |
Check the data sample header for suitability.
Reimplemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.
Definition at line 76 of file TransportReceiveStrategy_T.cpp.
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header | ( | const TH & | header | ) | [inline, protected, virtual] |
Check the transport header for suitability.
Reimplemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.
Definition at line 69 of file TransportReceiveStrategy_T.cpp.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::deliver_sample | ( | ReceivedDataSample & | sample, | |
const ACE_INET_Addr & | remote_address | |||
) | [protected, pure virtual] |
Called when there is a ReceivedDataSample to be delivered.
Implemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::ShmemReceiveStrategy, OpenDDS::DCPS::TcpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input | ( | ACE_HANDLE | fd | ) | [inline] |
Note that this is just an initial implementation. We may take some shortcuts (we will) that will need to be dealt with later once a more robust implementation can be put in place.
Our handle_dds_input() method is called by the reactor when there is data to be pulled from our peer() ACE_SOCK_Stream.
Definition at line 89 of file TransportReceiveStrategy_T.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::BUFFER_LOW_WATER, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header(), ACE_Message_Block::cont(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_allocator_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::db_allocator_, DBG_ENTRY_LVL, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::deliver_sample(), ACE::format_hexdump(), OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::free(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::gracefully_disconnected_, iovec::iov_base, iovec::iov_len, ACE_Message_Block::length(), LM_DEBUG, LM_ERROR, LM_INFO, LM_WARNING, OpenDDS::DCPS::Cached_Allocator_With_Overflow< T, ACE_LOCK >::malloc(), ACE_Time_Value::max_time, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::payload_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reassemble(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::RECEIVE_BUFFERS, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_bytes(), OpenDDS::DCPS::RECEIVE_DATA_BUFFER_SIZE, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_lock_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::relink(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus(), ACE_Message_Block::space(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index(), OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index(), VDBG, VDBG_LVL, ACE_Message_Block::wr_ptr(), and ACE_Time_Value::zero.
00090 { 00091 DBG_ENTRY_LVL("TransportReceiveStrategy", "handle_dds_input", 6); 00092 00093 // 00094 // What we will be doing here: 00095 // 00096 // 1) Read as much data as possible from the handle. 00097 // 2) Process each Transport layer packet 00098 // A) Parse the Transport header 00099 // B) Process each DataSample in the packet 00100 // a) Parse the DataSampleHeader 00101 // b) Parse the remainder of the sample 00102 // c) call data_received for complete samples 00103 // d) store any partial sample or header data 00104 // 3) return 00105 // 00106 // The state that we might need to maintain includes: 00107 // I) Current Transport layer header 00108 // II) Current DataSample header 00109 // III) Current DataSample data 00110 // 00111 // For each of these elements, they can be: 00112 // i) empty 00113 // ii) partial (not able to parse yet). 00114 // iii) complete (parsed values available). 00115 // 00116 // The read buffers will be a series of data buffers of a fixed size 00117 // that are each managed by an ACE_Data_Block, each of which is 00118 // managed by an ACE_Message_Block containing the read and write 00119 // pointers into the data. 00120 // 00121 // As messages are parsed, new ACE_Message_Blocks will be formed with 00122 // read and write pointers adjusted to reference the individual 00123 // DataSample buffer contents. 00124 // 00125 // The Underlying buffers, ACE_Data_Blocks, and ACE_Message_Blocks 00126 // will be acquired from a Cached_Allocater_with_overflow. The size 00127 // of the individual data buffers will be set initially to the 00128 // ethernet MTU to allow for full TCP messages to be received into 00129 // individual blocks. 00130 // 00131 // Reading will be performed with a two member struct iov, to allow 00132 // for the remainder of the current buffer to be used along with at 00133 // least one completely empty buffer. There will be a low water 00134 // parameter used to stop the use of the current block when the 00135 // available space in it is reduced. 00136 // 00137 00138 // 00139 // Establish the current receive buffer. 00140 // 00141 // This involves checking for any remainder from the previous trip 00142 // through the code. If there is no current buffer, or the buffer 00143 // has less than the low water mark space() left, then promote the 00144 // next receive buffer to the current. If none is present, create a 00145 // new one and use it. 00146 // 00147 // 00148 // Establish the next receive buffer. 00149 // 00150 // This involves checking for any previous complete buffers that 00151 // were not promoted in the previous step. If none are present, 00152 // create a new one and use that. 00153 // 00154 00155 // 00156 // Remove any buffers that have been completely read and have less 00157 // than the low water amount of space left. 00158 // 00159 size_t index; 00160 00161 for (index = 0; index < RECEIVE_BUFFERS; ++index) { 00162 if ((this->receive_buffers_[index] != 0) 00163 && (this->receive_buffers_[index]->length() == 0) 00164 && (this->receive_buffers_[index]->space() < BUFFER_LOW_WATER)) { 00165 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00166 "Remove a receive_buffer_[%d] from use.\n", 00167 index)); 00168 00169 // unlink any Message_Block that continues to this one 00170 // being removed. 00171 // This avoids a possible infinite ->cont() loop. 00172 for (size_t ii =0; ii < RECEIVE_BUFFERS; ii++) { 00173 if ((0 != this->receive_buffers_[ii]) && 00174 (this->receive_buffers_[ii]->cont() == 00175 this->receive_buffers_[index])) { 00176 this->receive_buffers_[ii]->cont(0); 00177 } 00178 } 00179 00180 // 00181 // Remove the receive buffer from use. 00182 // 00183 ACE_DES_FREE( 00184 this->receive_buffers_[index], 00185 this->mb_allocator_.free, 00186 ACE_Message_Block); 00187 this->receive_buffers_[index] = 0; 00188 } 00189 } 00190 00191 // 00192 // Allocate buffers for any empty slots. We may have emptied one just 00193 // here, but others may have been emptied by a large read during the 00194 // last trip through the code as well. 00195 // 00196 size_t previous = this->buffer_index_; 00197 00198 for (index = this->buffer_index_; 00199 this->successor_index(previous) != this->buffer_index_; 00200 index = this->successor_index(index)) { 00201 if (this->receive_buffers_[index] == 0) { 00202 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00203 "Allocate a Message_Block for new receive_buffer_[%d].\n", 00204 index)); 00205 00206 ACE_NEW_MALLOC_RETURN( 00207 this->receive_buffers_[index], 00208 (ACE_Message_Block*) this->mb_allocator_.malloc( 00209 sizeof(ACE_Message_Block)), 00210 ACE_Message_Block( 00211 RECEIVE_DATA_BUFFER_SIZE, // Buffer size 00212 ACE_Message_Block::MB_DATA, // Default 00213 0, // Start with no continuation 00214 0, // Let the constructor allocate 00215 &this->data_allocator_, // Our buffer cache 00216 &this->receive_lock_, // Our locking strategy 00217 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, // Default 00218 ACE_Time_Value::zero, // Default 00219 ACE_Time_Value::max_time, // Default 00220 &this->db_allocator_, // Our data block cache 00221 &this->mb_allocator_ // Our message block cache 00222 ), 00223 -1); 00224 } 00225 00226 // 00227 // Chain the buffers. This allows us to have portions of parsed 00228 // data cross buffer boundaries without a problem. 00229 // 00230 if (previous != index) { 00231 this->receive_buffers_[previous]->cont( 00232 this->receive_buffers_[index]); 00233 } 00234 00235 previous = index; 00236 } 00237 00238 // 00239 // Read from handle. 00240 // 00241 // This involves a non-blocking recvv_n(). Any remaining space in 00242 // the buffers should be saved for the next pass through the code. 00243 // If more than one receive buffer has data, chain them. Promote 00244 // all receive buffers so that the next pass through, the first 00245 // buffer with space remaining will appear as the current receive 00246 // buffer. 00247 // 00248 // This is probably what should remain in the specific 00249 // implementation, with the rest of this factored back up into the 00250 // framework. 00251 // 00252 00253 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00254 "Form the iovec from the message block\n")); 00255 00256 // 00257 // Form the iovec from the message block chain of receive buffers. 00258 // 00259 iovec iov[RECEIVE_BUFFERS]; 00260 size_t vec_index = 0; 00261 size_t current = this->buffer_index_; 00262 00263 for (index = 0; 00264 index < RECEIVE_BUFFERS; 00265 ++index, current = this->successor_index(current)) { 00266 // Invariant. ASSERT? 00267 if (this->receive_buffers_[current] == 0) { 00268 ACE_ERROR_RETURN((LM_ERROR, 00269 ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ") 00270 ACE_TEXT("receive buffer management detected.\n")), 00271 -1); 00272 } 00273 00274 #ifdef _MSC_VER 00275 #pragma warning(push) 00276 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here 00277 // since on other platforms iov_len is 64-bit 00278 #pragma warning(disable : 4267) 00279 #endif 00280 // This check covers the case where we have unread data in 00281 // the first buffer, but no space to write any more data. 00282 if (this->receive_buffers_[current]->space() > 0) { 00283 iov[vec_index].iov_len = this->receive_buffers_[current]->space(); 00284 iov[vec_index].iov_base = this->receive_buffers_[current]->wr_ptr(); 00285 00286 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00287 "index==%d, len==%d, base==%x\n", 00288 vec_index, iov[vec_index].iov_len, iov[vec_index].iov_base)); 00289 00290 vec_index++; 00291 } 00292 } 00293 00294 #ifdef _MSC_VER 00295 #pragma warning(pop) 00296 #endif 00297 00298 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00299 "Perform the recvv() call\n")); 00300 00301 // 00302 // Read into the buffers. 00303 // 00304 ACE_INET_Addr remote_address; 00305 ssize_t bytes_remaining = this->receive_bytes(iov, 00306 static_cast<int>(vec_index), 00307 remote_address, 00308 fd); 00309 00310 if (bytes_remaining < 0) { 00311 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Problem ") 00312 ACE_TEXT("with data link detected: %p.\n"), 00313 ACE_TEXT("receive_bytes"))); 00314 00315 // The relink() will handle the connection to the ReconnectTask to do 00316 // the reconnect so this reactor thread will not be block. 00317 this->relink(); 00318 00319 // Close connection anyway. 00320 return -1; 00321 // Returning -1 takes the handle out of the reactor read mask. 00322 } 00323 00324 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00325 "recvv() return %d - we call this the bytes_remaining.\n", 00326 bytes_remaining), 5); 00327 00328 if (bytes_remaining == 0) { 00329 if (this->gracefully_disconnected_) { 00330 VDBG_LVL((LM_INFO, 00331 ACE_TEXT("(%P|%t) Peer has gracefully disconnected.\n")) 00332 ,1); 00333 return -1; 00334 00335 } else { 00336 VDBG_LVL((LM_DEBUG, ACE_TEXT("(%P|%t) DBG: Unrecoverable problem ") 00337 ACE_TEXT("with data link detected\n")), 1); 00338 00339 // The relink() will handle the connection to the ReconnectTask to do 00340 // the reconnect so this reactor thread will not be block. 00341 this->relink(); 00342 00343 // Close connection anyway. 00344 return -1; 00345 // Returning -1 takes the handle out of the reactor read mask. 00346 } 00347 } 00348 00349 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00350 "START Adjust the message block chain pointers to account for the " 00351 "new data.\n"), 5); 00352 00353 // 00354 // Adjust the message block chain pointers to account for the new 00355 // data. 00356 // 00357 size_t bytes = bytes_remaining; 00358 00359 if (!this->pdu_remaining_) { 00360 this->receive_transport_header_.length_ = static_cast<ACE_UINT32>(bytes); 00361 } 00362 00363 for (index = this->buffer_index_; 00364 bytes > 0; 00365 index = this->successor_index(index)) { 00366 VDBG((LM_DEBUG,"(%P|%t) DBG: -> " 00367 "At top of for..loop block.\n")); 00368 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00369 "index == %d.\n", index)); 00370 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00371 "bytes == %d.\n", bytes)); 00372 00373 size_t amount 00374 = ace_min<size_t>(bytes, this->receive_buffers_[index]->space()); 00375 00376 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00377 "amount == %d.\n", amount)); 00378 00379 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00380 "this->receive_buffers_[index]->rd_ptr() == %u.\n", 00381 this->receive_buffers_[index]->rd_ptr())); 00382 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00383 "this->receive_buffers_[index]->wr_ptr() == %u.\n", 00384 this->receive_buffers_[index]->wr_ptr())); 00385 00386 this->receive_buffers_[index]->wr_ptr(amount); 00387 00388 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00389 "Now, this->receive_buffers_[index]->wr_ptr() == %u.\n", 00390 this->receive_buffers_[index]->wr_ptr())); 00391 00392 bytes -= amount; 00393 00394 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00395 "Now, bytes == %d .\n", bytes)); 00396 00397 // This is yukky to do here, but there is a fine line between 00398 // where things are moved and where they are checked. 00399 if (bytes > 0 && this->successor_index(index) == this->buffer_index_) { 00400 // Here we have read more data than we passed in buffer. Bad. 00401 ACE_ERROR_RETURN((LM_ERROR, 00402 ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ") 00403 ACE_TEXT("receive buffer management detected: ") 00404 ACE_TEXT("read more bytes than available.\n")), 00405 -1); 00406 } 00407 } 00408 00409 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00410 "DONE Adjust the message block chain pointers to account " 00411 "for the new data.\n")); 00412 00413 // 00414 // While the receive buffer is not empty: 00415 // 00416 // This is the receive buffer(s) that was just read. The receive 00417 // buffer message block is used to manage our parsing through this 00418 // data. When we have parsed out all the data, then it will be 00419 // considered empty. This message block read pointer indicates 00420 // where we are in the parsing process. Other message blocks refer 00421 // to the parsed data and retain the data block references to 00422 // prevent the data from being released until we have completed all 00423 // uses for the data in the receive buffer. This will normally be 00424 // after the sample data is demarshaled in the DataReader 00425 // components. 00426 // 00427 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00428 "Start processing the data we just received.\n")); 00429 00430 // 00431 // Operate while we have more data to process. 00432 // 00433 while (true) { 00434 00435 // 00436 // Manage the current transport header. 00437 // 00438 // This involves checking to see if we have remaining data in the 00439 // current packet. If not, or if we have not read a complete 00440 // header, then we read a transport header and check it for 00441 // validity. As we have a valid transport header with data 00442 // remaining to read in the packet that it represents, we move on. 00443 // 00444 // As new transport headers are to be read, they are read into a 00445 // message buffer member variable and demarshaled directly. The 00446 // values are retained for the lifetime of the processing of the 00447 // instant transport packet. The member message buffer allows us 00448 // to retain partially read transport headers until we can read 00449 // more data. 00450 // 00451 00452 // 00453 // Check the remaining transport packet length to see if we are 00454 // expecting a new one. 00455 // 00456 if (this->pdu_remaining_ == 0) { 00457 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00458 "We are expecting a transport packet header.\n")); 00459 00460 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00461 "this->buffer_index_ == %d.\n",this->buffer_index_)); 00462 00463 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00464 "this->receive_buffers_[this->buffer_index_]->rd_ptr() " 00465 "== %u.\n", 00466 this->receive_buffers_[this->buffer_index_]->rd_ptr())); 00467 00468 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00469 "this->receive_buffers_[this->buffer_index_]->wr_ptr() " 00470 "== %u.\n", 00471 this->receive_buffers_[this->buffer_index_]->wr_ptr())); 00472 00473 if (this->receive_buffers_[this->buffer_index_]->total_length() 00474 < this->receive_transport_header_.max_marshaled_size()) { 00475 // 00476 // Not enough room in the buffer for the entire Transport 00477 // header that we need to read, so relinquish control until 00478 // we get more data. 00479 // 00480 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00481 "Not enough bytes read to account for a transport " 00482 "packet header. We are done here - we need to " 00483 "receive more bytes.\n")); 00484 00485 this->receive_transport_header_.incomplete( 00486 *this->receive_buffers_[this->buffer_index_]); 00487 00488 return 0; 00489 00490 } else { 00491 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00492 "We have enough bytes to demarshall the transport " 00493 "packet header.\n")); 00494 00495 // only do the hexdump if it will be printed - to not impact performance. 00496 if (Transport_debug_level > 5) { 00497 ACE_TCHAR xbuffer[4096]; 00498 const ACE_Message_Block& mb = 00499 *this->receive_buffers_[this->buffer_index_]; 00500 size_t xbytes = mb.length(); 00501 00502 xbytes = (std::min)(xbytes, TH::max_marshaled_size()); 00503 00504 ACE::format_hexdump(mb.rd_ptr(), xbytes, xbuffer, sizeof(xbuffer)); 00505 00506 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00507 "Hex Dump of transport header block " 00508 "(%d bytes):\n%s\n", xbytes, xbuffer)); 00509 } 00510 00511 // 00512 // Demarshal the transport header. 00513 // 00514 this->receive_transport_header_ = 00515 *this->receive_buffers_[this->buffer_index_]; 00516 00517 // 00518 // Check the TransportHeader. 00519 // 00520 if (!this->receive_transport_header_.valid()) { 00521 ACE_ERROR_RETURN((LM_ERROR, 00522 ACE_TEXT 00523 ("(%P|%t) ERROR: TransportHeader invalid.\n")), 00524 -1); 00525 } 00526 00527 this->good_pdu_ = check_header(this->receive_transport_header_); 00528 this->pdu_remaining_ = this->receive_transport_header_.length_; 00529 00530 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00531 "Amount of transport packet bytes (remaining): %d.\n", 00532 this->pdu_remaining_)); 00533 } 00534 } 00535 00536 // 00537 // Ignore bad PDUs by skipping over them. We do this out here 00538 // in case we did not skip over the entire bad PDU last time. 00539 // 00540 { 00541 int rtn_code = skip_bad_pdus(); 00542 if (rtn_code <= 0) return rtn_code; 00543 } 00544 00545 // 00546 // Keep processing samples while we have data to read. 00547 // 00548 while (this->pdu_remaining_ > 0) { 00549 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00550 "We have a transport packet now. There are more sample " 00551 "bytes to parse in order to complete the packet.\n")); 00552 00553 // 00554 // Manage the current sample header. 00555 // 00556 // This involves checking to see if we have remaining data in the 00557 // current sample. If not, or if we have not read a complete 00558 // header, then we read a sample header and check it for validity, 00559 // which currently involves checking the message type only. As we 00560 // have a valid sample header with data remaining to read in the 00561 // sample that it represents, we move on. 00562 // 00563 // As new sample headers are read, the are read into a message 00564 // buffer member variable and demarshaled directly. The values are 00565 // retained for the lifetime of the sample and are passed as part 00566 // of the recieve data sample itself. The member message buffer 00567 // allows us to retain partially read sample headers until we can 00568 // read more data. 00569 // 00570 if (this->receive_sample_remaining_ == 0) { 00571 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00572 "We are not working on some remaining sample data. " 00573 "We are expecting to parse a sample header now.\n")); 00574 00575 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00576 "this->buffer_index_ == %d.\n",this->buffer_index_)); 00577 00578 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00579 "this->receive_buffers_[this->buffer_index_]->rd_ptr() " 00580 "== %u.\n", 00581 this->receive_buffers_[this->buffer_index_]->rd_ptr())); 00582 00583 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00584 "this->receive_buffers_[this->buffer_index_]->wr_ptr() " 00585 "== %u.\n", 00586 this->receive_buffers_[this->buffer_index_]->wr_ptr())); 00587 00588 if (DSH::partial(*this->receive_buffers_[this->buffer_index_])) { 00589 // 00590 // Not enough room in the buffer for the entire Sample 00591 // header that we need to read, so relinquish control until 00592 // we get more data. 00593 // 00594 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00595 "Not enough bytes have been received to account " 00596 "for a complete sample header. We are done for " 00597 "now - we need to receive more data before we " 00598 "can go on.\n")); 00599 if (Transport_debug_level > 2) { 00600 ACE_TCHAR ebuffer[350]; 00601 ACE_Message_Block& mb = *this->receive_buffers_[this->buffer_index_]; 00602 const size_t sz = (std::min)(DSH::max_marshaled_size(), mb.length()); 00603 ACE::format_hexdump(mb.rd_ptr(), sz, ebuffer, sizeof(ebuffer)); 00604 ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: " 00605 "Partial DataSampleHeader:\n%s\n", ebuffer)); 00606 } 00607 00608 return 0; 00609 00610 } else { 00611 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00612 "We have received enough bytes for the sample " 00613 "header. Demarshall the sample header now.\n")); 00614 00615 // only do the hexdump if it will be printed - to not impact performance. 00616 if (Transport_debug_level > 5) { 00617 ACE_TCHAR ebuffer[4096]; 00618 ACE::format_hexdump 00619 (this->receive_buffers_[this->buffer_index_]->rd_ptr(), 00620 this->data_sample_header_.max_marshaled_size(), 00621 ebuffer, sizeof(ebuffer)); 00622 00623 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00624 "Hex Dump:\n%s\n", ebuffer)); 00625 } 00626 00627 this->data_sample_header_.pdu_remaining(this->pdu_remaining_); 00628 00629 // 00630 // Demarshal the sample header. 00631 // 00632 this->data_sample_header_ = 00633 *this->receive_buffers_[this->buffer_index_]; 00634 00635 // 00636 // Check the DataSampleHeader. 00637 // 00638 00639 this->good_pdu_ = check_header(data_sample_header_); 00640 00641 // 00642 // Set the amount to parse into the message buffer. We 00643 // can't just use the header value to keep track of 00644 // where we are since downstream processing will expect 00645 // the value to be correct (unadjusted by us). 00646 // 00647 this->receive_sample_remaining_ = 00648 this->data_sample_header_.message_length(); 00649 00650 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00651 "The demarshalled sample header says that we " 00652 "have %d bytes to read for the data portion of " 00653 "the sample.\n", 00654 this->receive_sample_remaining_)); 00655 00656 // 00657 // Decrement packet size. 00658 // 00659 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00660 "this->data_sample_header_.marshaled_size() " 00661 "== %d.\n", 00662 this->data_sample_header_.marshaled_size())); 00663 00664 this->pdu_remaining_ 00665 -= this->data_sample_header_.marshaled_size(); 00666 00667 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00668 "Amount of transport packet remaining: %d.\n", 00669 this->pdu_remaining_)); 00670 00671 int rtn_code = skip_bad_pdus(); 00672 if (rtn_code <= 0) return rtn_code; 00673 } 00674 } 00675 00676 bool last_buffer = false; 00677 update_buffer_index(last_buffer); 00678 00679 if (this->receive_sample_remaining_ > 0) { 00680 // 00681 // Manage the current sample data. 00682 // 00683 // This involves reading data to complete the current sample. As 00684 // samples are completed, they are dispatched via the 00685 // data_received() mechanism. This data is read into message 00686 // blocks that are obtained from the pool of message blocks since 00687 // the lifetime of this data will last until the DataReader 00688 // components demarshal the sample data. A reference to the 00689 // current sample being built is retained as a member to allow us 00690 // to hold partialy read samples until they are completed. 00691 // 00692 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00693 "Determine amount of data for the next block in the chain\n")); 00694 00695 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00696 "this->buffer_index_ == %d.\n",this->buffer_index_)); 00697 00698 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00699 "this->receive_buffers_[this->buffer_index_]->rd_ptr() " 00700 "== %u.\n", 00701 this->receive_buffers_[this->buffer_index_]->rd_ptr())); 00702 00703 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00704 "this->receive_buffers_[this->buffer_index_]->wr_ptr() " 00705 "== %u.\n", 00706 this->receive_buffers_[this->buffer_index_]->wr_ptr())); 00707 // 00708 // Determine the amount of data for the next block in the chain. 00709 // 00710 const size_t amount = ace_min<size_t>( 00711 this->receive_sample_remaining_, 00712 this->receive_buffers_[this->buffer_index_]->length()); 00713 00714 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00715 "amount of data for the next block in the chain is %d\n", 00716 amount)); 00717 // 00718 // Now create a message block for the data in the current buffer 00719 // and chain it if we are starting a new sample. 00720 // 00721 ACE_Message_Block* current_sample_block = 0; 00722 ACE_NEW_MALLOC_RETURN( 00723 current_sample_block, 00724 (ACE_Message_Block*) this->mb_allocator_.malloc( 00725 sizeof(ACE_Message_Block)), 00726 ACE_Message_Block( 00727 this->receive_buffers_[this->buffer_index_] 00728 ->data_block()->duplicate(), 00729 0, 00730 &this->mb_allocator_), 00731 -1); 00732 00733 // 00734 // Chain it to the end of the current sample. 00735 // 00736 if (this->payload_ == 0) { 00737 this->payload_ = current_sample_block; 00738 00739 } else { 00740 ACE_Message_Block* block = this->payload_; 00741 00742 while (block->cont() != 0) { 00743 block = block->cont(); 00744 } 00745 00746 block->cont(current_sample_block); 00747 } 00748 00749 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00750 "Before adjustment of the pointers and byte counters\n")); 00751 00752 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00753 "this->payload_->rd_ptr() " 00754 "== %u.\n", 00755 this->payload_->rd_ptr())); 00756 00757 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00758 "this->payload_->wr_ptr() " 00759 "== %u.\n", 00760 this->payload_->wr_ptr())); 00761 00762 // 00763 // Adjust the pointers and byte counters. 00764 // 00765 current_sample_block->rd_ptr( 00766 this->receive_buffers_[this->buffer_index_]->rd_ptr()); 00767 current_sample_block->wr_ptr(current_sample_block->rd_ptr() + amount); 00768 this->receive_buffers_[this->buffer_index_]->rd_ptr(amount); 00769 this->receive_sample_remaining_ -= amount; 00770 this->pdu_remaining_ -= amount; 00771 00772 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00773 "After adjustment of the pointers and byte counters\n")); 00774 00775 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00776 "this->payload_->rd_ptr() " 00777 "== %u.\n", 00778 this->payload_->rd_ptr())); 00779 00780 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00781 "this->payload_->wr_ptr() " 00782 "== %u.\n", 00783 this->payload_->wr_ptr())); 00784 00785 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00786 "this->receive_buffers_[this->buffer_index_]->rd_ptr() " 00787 "== %u.\n", 00788 this->receive_buffers_[this->buffer_index_]->rd_ptr())); 00789 00790 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00791 "this->receive_buffers_[this->buffer_index_]->wr_ptr() " 00792 "== %u.\n", 00793 this->receive_buffers_[this->buffer_index_]->wr_ptr())); 00794 00795 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00796 "After adjustment, remaining sample bytes == %d\n", 00797 this->receive_sample_remaining_)); 00798 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00799 "After adjustment, remaining transport packet bytes == %d\n", 00800 this->pdu_remaining_)); 00801 } 00802 00803 // 00804 // Dispatch the received message if we have received it all. 00805 // 00806 // NB: Since we are doing this synchronously and without passing 00807 // ownership of the sample, we can use NULL mutex lock for 00808 // the allocators. Since only one thread can be in this 00809 // routine at a time, that is. 00810 // 00811 if (this->receive_sample_remaining_ == 0) { 00812 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00813 "Now dispatch the sample to the DataLink\n")); 00814 00815 ReceivedDataSample rds(this->payload_); 00816 this->payload_ = 0; // rds takes ownership of payload_ 00817 if (this->data_sample_header_.into_received_data_sample(rds)) { 00818 00819 if (this->data_sample_header_.more_fragments() 00820 || this->receive_transport_header_.last_fragment()) { 00821 VDBG((LM_DEBUG,"(%P|%t) DBG: Attempt reassembly of fragments\n")); 00822 00823 if (this->reassemble(rds)) { 00824 VDBG((LM_DEBUG,"(%P|%t) DBG: Reassembled complete message\n")); 00825 this->deliver_sample(rds, remote_address); 00826 } 00827 // If reassemble() returned false, it takes ownership of the data 00828 // just like deliver_sample() does. 00829 00830 } else { 00831 this->deliver_sample(rds, remote_address); 00832 } 00833 } 00834 00835 // For the reassembly algorithm, the 'last_fragment_' header bit only 00836 // applies to the first DataSampleHeader in the TransportHeader 00837 this->receive_transport_header_.last_fragment(false); 00838 00839 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00840 "Release the sample that we just sent.\n")); 00841 // ~ReceivedDataSample() releases the payload_ message block 00842 } 00843 00844 update_buffer_index(last_buffer); 00845 00846 if (last_buffer) { 00847 // Relinquish control if there is no more data to process. 00848 VDBG((LM_DEBUG,"(%P|%t) DBG: We are done - no more data.\n")); 00849 return 0; 00850 } 00851 00852 } // End of while (this->pdu_remaining_ > 0) 00853 00854 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00855 "Let's try to do some more.\n")); 00856 } // End of while (true) 00857 00858 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00859 "It looks like we are done - the done loop has finished.\n")); 00860 // 00861 // Relinquish control. 00862 // 00863 // This involves ensuring that when we reenter this method, we will 00864 // pick up from where we left off correctly. 00865 // 00866 return 0; 00867 }
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining | ( | ) | const [inline, protected] |
Definition at line 89 of file TransportReceiveStrategy_T.h.
00089 { return this->pdu_remaining_; }
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reassemble | ( | ReceivedDataSample & | data | ) | [inline, private, virtual] |
Reimplemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.
Definition at line 871 of file TransportReceiveStrategy_T.cpp.
References LM_WARNING.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
00872 { 00873 ACE_DEBUG((LM_WARNING, "(%P|%t) TransportReceiveStrategy::reassemble() " 00874 "WARNING: derived class must override if specific transport type uses " 00875 "fragmentation and reassembly\n")); 00876 return false; 00877 }
virtual ssize_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_bytes | ( | iovec | iov[], | |
int | n, | |||
ACE_INET_Addr & | remote_address, | |||
ACE_HANDLE | fd | |||
) | [protected, pure virtual] |
Only our subclass knows how to do this.
Implemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::ShmemReceiveStrategy, OpenDDS::DCPS::TcpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
ACE_INLINE TH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header | ( | ) | [inline] |
Definition at line 36 of file TransportReceiveStrategy_T.inl.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_.
00037 { 00038 return this->receive_transport_header_; 00039 }
ACE_INLINE const TH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header | ( | ) | const [inline] |
Provides access to the received transport header for subclasses.
Definition at line 29 of file TransportReceiveStrategy_T.inl.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_.
Referenced by OpenDDS::DCPS::ReliableSession::nak_received(), OpenDDS::DCPS::ReliableSession::nakack_received(), OpenDDS::DCPS::MulticastDataLink::ready_to_deliver(), OpenDDS::DCPS::MulticastDataLink::sample_received(), OpenDDS::DCPS::MulticastSession::syn_received(), and OpenDDS::DCPS::MulticastSession::synack_received().
00030 { 00031 return this->receive_transport_header_; 00032 }
ACE_INLINE DSH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header | ( | ) | [inline] |
Definition at line 50 of file TransportReceiveStrategy_T.inl.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_.
00051 { 00052 return this->data_sample_header_; 00053 }
ACE_INLINE const DSH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header | ( | ) | const [inline] |
Provides access to the received sample header for subclasses.
Definition at line 43 of file TransportReceiveStrategy_T.inl.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_.
00044 { 00045 return this->data_sample_header_; 00046 }
ACE_INLINE void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::relink | ( | bool | do_suspend = true |
) | [inline, virtual] |
The subclass needs to provide the implementation for re-establishing the datalink. This is called when recv returns an error.
Reimplemented in OpenDDS::DCPS::TcpReceiveStrategy.
Definition at line 64 of file TransportReceiveStrategy_T.inl.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
00065 { 00066 // The subclass needs implement this function for re-establishing 00067 // the link upon recv failure. 00068 }
void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset | ( | void | ) | [inline, protected] |
For datagram-based derived classes, reset() can be called to clear any state that may be remaining from parsing the previous datagram.
Definition at line 881 of file TransportReceiveStrategy_T.cpp.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::payload_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::RECEIVE_BUFFERS, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_, ACE_Message_Block::release(), and ACE_Message_Block::wr_ptr().
00882 { 00883 this->receive_sample_remaining_ = 0; 00884 ACE_Message_Block::release(this->payload_); 00885 this->payload_ = 0; 00886 this->good_pdu_ = true; 00887 this->pdu_remaining_ = 0; 00888 for (int i = 0; i < RECEIVE_BUFFERS; ++i) { 00889 ACE_Message_Block& rb = *this->receive_buffers_[i]; 00890 rb.rd_ptr(rb.wr_ptr()); 00891 } 00892 }
int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus | ( | ) | [inline, protected] |
Ignore bad PDUs by skipping over them.
Definition at line 918 of file TransportReceiveStrategy_T.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_, ACE_Message_Block::length(), LM_ERROR, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index().
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
00919 { 00920 if (this->good_pdu_) return 1; 00921 00922 // 00923 // Adjust the message block chain pointers to account for the 00924 // skipped data. 00925 // 00926 for (size_t index = this->buffer_index_; 00927 this->pdu_remaining_ > 0; 00928 index = this->successor_index(index)) { 00929 size_t amount = 00930 ace_min<size_t>(this->pdu_remaining_, this->receive_buffers_[index]->length()); 00931 00932 this->receive_buffers_[index]->rd_ptr(amount); 00933 this->pdu_remaining_ -= amount; 00934 00935 if (this->pdu_remaining_ > 0 && this->successor_index(index) == this->buffer_index_) { 00936 ACE_ERROR_RETURN((LM_ERROR, 00937 ACE_TEXT("(%P|%t) ERROR: ") 00938 ACE_TEXT("TransportReceiveStrategy::skip_bad_pdus()") 00939 ACE_TEXT(" - Unrecoverably corrupted ") 00940 ACE_TEXT("receive buffer management detected: ") 00941 ACE_TEXT("read more bytes than available.\n")), 00942 -1); 00943 } 00944 } 00945 00946 this->receive_sample_remaining_ = 0; 00947 00948 this->receive_sample_remaining_ = 0; 00949 00950 bool done = false; 00951 update_buffer_index(done); 00952 return done ? 0 : 1; 00953 }
ACE_INLINE int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start | ( | void | ) | [inline, virtual] |
Implements OpenDDS::DCPS::TransportStrategy.
Definition at line 13 of file TransportReceiveStrategy_T.inl.
References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start_i().
00014 { 00015 DBG_ENTRY_LVL("TransportReceiveStrategy","start",6); 00016 return this->start_i(); 00017 }
virtual int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start_i | ( | ) | [protected, pure virtual] |
Let the subclass start.
Implemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::ShmemReceiveStrategy, OpenDDS::DCPS::TcpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start().
ACE_INLINE void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop | ( | void | ) | [inline, virtual] |
Implements OpenDDS::DCPS::TransportStrategy.
Definition at line 21 of file TransportReceiveStrategy_T.inl.
References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop_i().
00022 { 00023 DBG_ENTRY_LVL("TransportReceiveStrategy","stop",6); 00024 this->stop_i(); 00025 }
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop_i | ( | ) | [protected, pure virtual] |
Let the subclass stop.
Implemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::ShmemReceiveStrategy, OpenDDS::DCPS::TcpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop().
ACE_INLINE size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index | ( | size_t | index | ) | const [inline, private] |
Manage an index into the receive buffer array.
Definition at line 57 of file TransportReceiveStrategy_T.inl.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::RECEIVE_BUFFERS.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index().
00058 { 00059 return ++index % RECEIVE_BUFFERS; 00060 }
void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index | ( | bool & | done | ) | [inline, private] |
Definition at line 896 of file TransportReceiveStrategy_T.cpp.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, LM_DEBUG, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index(), and VDBG.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus().
00897 { 00898 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00899 "Adjust the buffer chain in case we crossed into the next " 00900 "buffer after the last read(s).\n")); 00901 const size_t initial = this->buffer_index_; 00902 while (this->receive_buffers_[this->buffer_index_]->length() == 0) { 00903 this->buffer_index_ = this->successor_index(this->buffer_index_); 00904 00905 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00906 "Set this->buffer_index_ = %d.\n", 00907 this->buffer_index_)); 00908 00909 if (initial == this->buffer_index_) { 00910 done = true; // no other buffers in receive_buffers_ have data 00911 return; 00912 } 00913 } 00914 }
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_ [private] |
Current receive buffer index in use.
Definition at line 137 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::~TransportReceiveStrategy().
TransportDataAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_allocator_ [private] |
Definition at line 128 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy().
DSH OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_ [private] |
Current data sample header.
Definition at line 140 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header().
TransportDataBlockAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::db_allocator_ [private] |
Definition at line 127 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy().
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_ [private] |
Flag indicating that the currently resident PDU is a good one (i.e. has not been received and processed previously). This is included in case we receive PDUs that were resent for reliability reasons and we receive one even if we have already processed it. This is a use case from multicast transports.
Definition at line 150 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus().
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::gracefully_disconnected_ [protected] |
Flag indicates if the GRACEFUL_DISCONNECT message is received.
Definition at line 92 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
TransportMessageBlockAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_ [private] |
Definition at line 126 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy().
ACE_Message_Block* OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::payload_ [private] |
Definition at line 142 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset().
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_ [private] |
Amount of the current PDU that has not been processed yet.
Definition at line 153 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy<>::pdu_remaining(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus().
ACE_Message_Block* OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_[RECEIVE_BUFFERS] [private] |
Set of receive buffers in use.
Definition at line 134 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::~TransportReceiveStrategy().
ACE_Lock_Adapter<ACE_SYNCH_MUTEX> OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_lock_ [private] |
Locking strategy for the allocators.
Definition at line 131 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_ [private] |
Bytes remaining in the current DataSample.
Definition at line 104 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus().
TH OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_ [private] |
Current receive TransportHeader.
Definition at line 107 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header().