#include <TransportReceiveStrategy_T.h>
Inheritance diagram for OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >:
Public Member Functions | |
virtual | ~TransportReceiveStrategy () |
int | start () |
void | stop () |
int | handle_dds_input (ACE_HANDLE fd) |
virtual void | relink (bool do_suspend=true) |
const TH & | received_header () const |
TH & | received_header () |
const DSH & | received_sample_header () const |
DSH & | received_sample_header () |
Protected Member Functions | |
TransportReceiveStrategy () | |
virtual ssize_t | receive_bytes (iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd)=0 |
Only our subclass knows how to do this. | |
virtual bool | check_header (const TH &header) |
Check the transport header for suitability. | |
virtual bool | check_header (const DSH &header) |
Check the data sample header for suitability. | |
virtual void | deliver_sample (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)=0 |
Called when there is a ReceivedDataSample to be delivered. | |
virtual int | start_i ()=0 |
Let the subclass start. | |
virtual void | stop_i ()=0 |
Let the subclass stop. | |
int | skip_bad_pdus () |
Ignore bad PDUs by skipping over them. | |
void | reset () |
size_t | pdu_remaining () const |
Protected Attributes | |
bool | gracefully_disconnected_ |
Flag indicates if the GRACEFUL_DISCONNECT message is received. | |
Private Types | |
RECEIVE_BUFFERS = 16 | |
BUFFER_LOW_WATER = 4096 | |
MESSAGE_BLOCKS = 1000 | |
DATA_BLOCKS = 100 | |
enum | { RECEIVE_BUFFERS = 16 } |
enum | { BUFFER_LOW_WATER = 4096 } |
enum | { MESSAGE_BLOCKS = 1000 } |
enum | { DATA_BLOCKS = 100 } |
Private Member Functions | |
size_t | successor_index (size_t index) const |
Manage an index into the receive buffer array. | |
void | update_buffer_index (bool &done) |
virtual bool | reassemble (ReceivedDataSample &data) |
Private Attributes | |
size_t | receive_sample_remaining_ |
Bytes remaining in the current DataSample. | |
TH | receive_transport_header_ |
Current receive TransportHeader. | |
TransportMessageBlockAllocator | mb_allocator_ |
TransportDataBlockAllocator | db_allocator_ |
TransportDataAllocator | data_allocator_ |
ACE_Lock_Adapter< ACE_SYNCH_MUTEX > | receive_lock_ |
Locking strategy for the allocators. | |
ACE_Message_Block * | receive_buffers_ [RECEIVE_BUFFERS] |
Set of receive buffers in use. | |
size_t | buffer_index_ |
Current receive buffer index in use. | |
DSH | data_sample_header_ |
Current data sample header. | |
ACE_Message_Block * | payload_ |
bool | good_pdu_ |
size_t | pdu_remaining_ |
Amount of the current PDU that has not been processed yet. |
Definition at line 29 of file TransportReceiveStrategy_T.h.
anonymous enum [private] |
anonymous enum [private] |
anonymous enum [private] |
anonymous enum [private] |
OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::~TransportReceiveStrategy | ( | ) | [virtual] |
Definition at line 48 of file TransportReceiveStrategy_T.cpp.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_.
00049 { 00050 DBG_ENTRY_LVL("TransportReceiveStrategy","~TransportReceiveStrategy",6); 00051 00052 if (this->receive_buffers_[this->buffer_index_] != 0) { 00053 size_t size = this->receive_buffers_[this->buffer_index_]->total_length(); 00054 00055 if (size > 0) { 00056 ACE_DEBUG((LM_WARNING, 00057 ACE_TEXT("(%P|%t) WARNING: TransportReceiveStrategy::~TransportReceiveStrategy() - ") 00058 ACE_TEXT("terminating with %d unprocessed bytes.\n"), 00059 size)); 00060 } 00061 } 00062 }
OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy | ( | ) | [protected] |
Definition at line 19 of file TransportReceiveStrategy_T.cpp.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_allocator_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::DATA_BLOCKS, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::db_allocator_, DBG_ENTRY_LVL, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::MESSAGE_BLOCKS, and OpenDDS::DCPS::Transport_debug_level.
00020 : gracefully_disconnected_(false), 00021 receive_sample_remaining_(0), 00022 mb_allocator_(MESSAGE_BLOCKS), 00023 db_allocator_(DATA_BLOCKS), 00024 data_allocator_(DATA_BLOCKS), 00025 buffer_index_(0), 00026 payload_(0), 00027 good_pdu_(true), 00028 pdu_remaining_(0) 00029 { 00030 DBG_ENTRY_LVL("TransportReceiveStrategy", "TransportReceiveStrategy" ,6); 00031 00032 if (Transport_debug_level >= 2) { 00033 ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-mb" 00034 " Cached_Allocator_With_Overflow %x with %d chunks\n", 00035 &mb_allocator_, MESSAGE_BLOCKS)); 00036 ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-db" 00037 " Cached_Allocator_With_Overflow %x with %d chunks\n", 00038 &db_allocator_, DATA_BLOCKS)); 00039 ACE_DEBUG((LM_DEBUG,"(%P|%t) TransportReceiveStrategy-data" 00040 " Cached_Allocator_With_Overflow %x with %d chunks\n", 00041 &data_allocator_, DATA_BLOCKS)); 00042 } 00043 00044 ACE_OS::memset(this->receive_buffers_, 0, sizeof(this->receive_buffers_)); 00045 }
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header | ( | const DSH & | header | ) | [protected, virtual] |
Check the data sample header for suitability.
Definition at line 73 of file TransportReceiveStrategy_T.cpp.
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header | ( | const TH & | header | ) | [protected, virtual] |
Check the transport header for suitability.
Definition at line 66 of file TransportReceiveStrategy_T.cpp.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::deliver_sample | ( | ReceivedDataSample & | sample, | |
const ACE_INET_Addr & | remote_address | |||
) | [protected, pure virtual] |
Called when there is a ReceivedDataSample to be delivered.
Implemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::ShmemReceiveStrategy, OpenDDS::DCPS::TcpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input | ( | ACE_HANDLE | fd | ) |
Note that this is only an initial implementation. It takes some shortcuts that will need to be revisited once a more robust implementation can be put in place.
Our handle_dds_input() method is called by the reactor when there is data to be pulled from our peer() ACE_SOCK_Stream.
Definition at line 86 of file TransportReceiveStrategy_T.cpp.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::BUFFER_LOW_WATER, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::check_header(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_, DBG_ENTRY_LVL, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::deliver_sample(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_, max_marshaled_size(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::payload_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::RECEIVE_BUFFERS, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_bytes(), OpenDDS::DCPS::RECEIVE_DATA_BUFFER_SIZE, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::relink(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index(), OpenDDS::DCPS::Transport_debug_level, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index(), VDBG, and VDBG_LVL.
Referenced by OpenDDS::DCPS::UdpReceiveStrategy::handle_input(), OpenDDS::DCPS::RtpsUdpReceiveStrategy::handle_input(), OpenDDS::DCPS::MulticastReceiveStrategy::handle_input(), and OpenDDS::DCPS::ShmemReceiveStrategy::read().
00087 { 00088 DBG_ENTRY_LVL("TransportReceiveStrategy", "handle_dds_input", 6); 00089 00090 // 00091 // What we will be doing here: 00092 // 00093 // 1) Read as much data as possible from the handle. 00094 // 2) Process each Transport layer packet 00095 // A) Parse the Transport header 00096 // B) Process each DataSample in the packet 00097 // a) Parse the DataSampleHeader 00098 // b) Parse the remainder of the sample 00099 // c) call data_received for complete samples 00100 // d) store any partial sample or header data 00101 // 3) return 00102 // 00103 // The state that we might need to maintain includes: 00104 // I) Current Transport layer header 00105 // II) Current DataSample header 00106 // III) Current DataSample data 00107 // 00108 // For each of these elements, they can be: 00109 // i) empty 00110 // ii) partial (not able to parse yet). 00111 // iii) complete (parsed values available). 00112 // 00113 // The read buffers will be a series of data buffers of a fixed size 00114 // that are each managed by an ACE_Data_Block, each of which is 00115 // managed by an ACE_Message_Block containing the read and write 00116 // pointers into the data. 00117 // 00118 // As messages are parsed, new ACE_Message_Blocks will be formed with 00119 // read and write pointers adjusted to reference the individual 00120 // DataSample buffer contents. 00121 // 00122 // The Underlying buffers, ACE_Data_Blocks, and ACE_Message_Blocks 00123 // will be acquired from a Cached_Allocater_with_overflow. The size 00124 // of the individual data buffers will be set initially to the 00125 // ethernet MTU to allow for full TCP messages to be received into 00126 // individual blocks. 00127 // 00128 // Reading will be performed with a two member struct iov, to allow 00129 // for the remainder of the current buffer to be used along with at 00130 // least one completely empty buffer. 
There will be a low water 00131 // parameter used to stop the use of the current block when the 00132 // available space in it is reduced. 00133 // 00134 00135 // 00136 // Establish the current receive buffer. 00137 // 00138 // This involves checking for any remainder from the previous trip 00139 // through the code. If there is no current buffer, or the buffer 00140 // has less than the low water mark space() left, then promote the 00141 // next receive buffer to the current. If none is present, create a 00142 // new one and use it. 00143 // 00144 // 00145 // Establish the next receive buffer. 00146 // 00147 // This involves checking for any previous complete buffers that 00148 // were not promoted in the previous step. If none are present, 00149 // create a new one and use that. 00150 // 00151 00152 // 00153 // Remove any buffers that have been completely read and have less 00154 // than the low water amount of space left. 00155 // 00156 size_t index; 00157 00158 for (index = 0; index < RECEIVE_BUFFERS; ++index) { 00159 if ((this->receive_buffers_[index] != 0) 00160 && (this->receive_buffers_[index]->length() == 0) 00161 && (this->receive_buffers_[index]->space() < BUFFER_LOW_WATER)) { 00162 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00163 "Remove a receive_buffer_[%d] from use.\n", 00164 index)); 00165 00166 // unlink any Message_Block that continues to this one 00167 // being removed. 00168 // This avoids a possible infinite ->cont() loop. 00169 for (size_t ii =0; ii < RECEIVE_BUFFERS; ii++) { 00170 if ((0 != this->receive_buffers_[ii]) && 00171 (this->receive_buffers_[ii]->cont() == 00172 this->receive_buffers_[index])) { 00173 this->receive_buffers_[ii]->cont(0); 00174 } 00175 } 00176 00177 // 00178 // Remove the receive buffer from use. 
00179 // 00180 ACE_DES_FREE( 00181 this->receive_buffers_[index], 00182 this->mb_allocator_.free, 00183 ACE_Message_Block); 00184 this->receive_buffers_[index] = 0; 00185 } 00186 } 00187 00188 // 00189 // Allocate buffers for any empty slots. We may have emptied one just 00190 // here, but others may have been emptied by a large read during the 00191 // last trip through the code as well. 00192 // 00193 size_t previous = this->buffer_index_; 00194 00195 for (index = this->buffer_index_; 00196 this->successor_index(previous) != this->buffer_index_; 00197 index = this->successor_index(index)) { 00198 if (this->receive_buffers_[index] == 0) { 00199 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00200 "Allocate a Message_Block for new receive_buffer_[%d].\n", 00201 index)); 00202 00203 ACE_NEW_MALLOC_RETURN( 00204 this->receive_buffers_[index], 00205 (ACE_Message_Block*) this->mb_allocator_.malloc( 00206 sizeof(ACE_Message_Block)), 00207 ACE_Message_Block( 00208 RECEIVE_DATA_BUFFER_SIZE, // Buffer size 00209 ACE_Message_Block::MB_DATA, // Default 00210 0, // Start with no continuation 00211 0, // Let the constructor allocate 00212 &this->data_allocator_, // Our buffer cache 00213 &this->receive_lock_, // Our locking strategy 00214 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, // Default 00215 ACE_Time_Value::zero, // Default 00216 ACE_Time_Value::max_time, // Default 00217 &this->db_allocator_, // Our data block cache 00218 &this->mb_allocator_ // Our message block cache 00219 ), 00220 -1); 00221 } 00222 00223 // 00224 // Chain the buffers. This allows us to have portions of parsed 00225 // data cross buffer boundaries without a problem. 00226 // 00227 if (previous != index) { 00228 this->receive_buffers_[previous]->cont( 00229 this->receive_buffers_[index]); 00230 } 00231 00232 previous = index; 00233 } 00234 00235 // 00236 // Read from handle. 00237 // 00238 // This involves a non-blocking recvv_n(). 
Any remaining space in 00239 // the buffers should be saved for the next pass through the code. 00240 // If more than one receive buffer has data, chain them. Promote 00241 // all receive buffers so that the next pass through, the first 00242 // buffer with space remaining will appear as the current receive 00243 // buffer. 00244 // 00245 // This is probably what should remain in the specific 00246 // implementation, with the rest of this factored back up into the 00247 // framework. 00248 // 00249 00250 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00251 "Form the iovec from the message block\n")); 00252 00253 // 00254 // Form the iovec from the message block chain of receive buffers. 00255 // 00256 iovec iov[RECEIVE_BUFFERS]; 00257 size_t vec_index = 0; 00258 size_t current = this->buffer_index_; 00259 00260 for (index = 0; 00261 index < RECEIVE_BUFFERS; 00262 ++index, current = this->successor_index(current)) { 00263 // Invariant. ASSERT? 00264 if (this->receive_buffers_[current] == 0) { 00265 ACE_ERROR_RETURN((LM_ERROR, 00266 ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ") 00267 ACE_TEXT("receive buffer management detected.\n")), 00268 -1); 00269 } 00270 00271 #ifdef _MSC_VER 00272 #pragma warning(push) 00273 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here 00274 // since on other platforms iov_len is 64-bit 00275 #pragma warning(disable : 4267) 00276 #endif 00277 // This check covers the case where we have unread data in 00278 // the first buffer, but no space to write any more data. 
00279 if (this->receive_buffers_[current]->space() > 0) { 00280 iov[vec_index].iov_len = this->receive_buffers_[current]->space(); 00281 iov[vec_index].iov_base = this->receive_buffers_[current]->wr_ptr(); 00282 00283 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00284 "index==%d, len==%d, base==%x\n", 00285 vec_index, iov[vec_index].iov_len, iov[vec_index].iov_base)); 00286 00287 vec_index++; 00288 } 00289 } 00290 00291 #ifdef _MSC_VER 00292 #pragma warning(pop) 00293 #endif 00294 00295 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00296 "Perform the recvv() call\n")); 00297 00298 // 00299 // Read into the buffers. 00300 // 00301 ACE_INET_Addr remote_address; 00302 ssize_t bytes_remaining = this->receive_bytes(iov, 00303 static_cast<int>(vec_index), 00304 remote_address, 00305 fd); 00306 00307 if (bytes_remaining < 0) { 00308 ACE_ERROR((LM_WARNING, ACE_TEXT("(%P|%t) WARNING: Problem ") 00309 ACE_TEXT("with data link detected: %p.\n"), 00310 ACE_TEXT("receive_bytes"))); 00311 00312 // The relink() will handle the connection to the ReconnectTask to do 00313 // the reconnect so this reactor thread will not be blocked. 00314 this->relink(); 00315 00316 // Close connection anyway. 00317 return -1; 00318 // Returning -1 takes the handle out of the reactor read mask. 00319 } 00320 00321 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00322 "recvv() return %d - we call this the bytes_remaining.\n", 00323 bytes_remaining), 5); 00324 00325 if (bytes_remaining == 0) { 00326 if (this->gracefully_disconnected_) { 00327 VDBG_LVL((LM_INFO, 00328 ACE_TEXT("(%P|%t) Peer has gracefully disconnected.\n")) 00329 ,1); 00330 return -1; 00331 00332 } else { 00333 VDBG_LVL((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: Unrecoverable problem ") 00334 ACE_TEXT("with data link detected\n")), 1); 00335 00336 // The relink() will handle the connection to the ReconnectTask to do 00337 // the reconnect so this reactor thread will not be blocked. 00338 this->relink(); 00339 00340 // Close connection anyway. 
00341 return -1; 00342 // Returning -1 takes the handle out of the reactor read mask. 00343 } 00344 } 00345 00346 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 00347 "START Adjust the message block chain pointers to account for the " 00348 "new data.\n"), 5); 00349 00350 // 00351 // Adjust the message block chain pointers to account for the new 00352 // data. 00353 // 00354 size_t bytes = bytes_remaining; 00355 00356 if (!this->pdu_remaining_) { 00357 this->receive_transport_header_.length_ = static_cast<ACE_UINT32>(bytes); 00358 } 00359 00360 for (index = this->buffer_index_; 00361 bytes > 0; 00362 index = this->successor_index(index)) { 00363 VDBG((LM_DEBUG,"(%P|%t) DBG: -> " 00364 "At top of for..loop block.\n")); 00365 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00366 "index == %d.\n", index)); 00367 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00368 "bytes == %d.\n", bytes)); 00369 00370 size_t amount 00371 = ace_min<size_t>(bytes, this->receive_buffers_[index]->space()); 00372 00373 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00374 "amount == %d.\n", amount)); 00375 00376 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00377 "this->receive_buffers_[index]->rd_ptr() == %u.\n", 00378 this->receive_buffers_[index]->rd_ptr())); 00379 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00380 "this->receive_buffers_[index]->wr_ptr() == %u.\n", 00381 this->receive_buffers_[index]->wr_ptr())); 00382 00383 this->receive_buffers_[index]->wr_ptr(amount); 00384 00385 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00386 "Now, this->receive_buffers_[index]->wr_ptr() == %u.\n", 00387 this->receive_buffers_[index]->wr_ptr())); 00388 00389 bytes -= amount; 00390 00391 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00392 "Now, bytes == %d .\n", bytes)); 00393 00394 // This is yukky to do here, but there is a fine line between 00395 // where things are moved and where they are checked. 00396 if (bytes > 0 && this->successor_index(index) == this->buffer_index_) { 00397 // Here we have read more data than we passed in buffer. Bad. 
00398 ACE_ERROR_RETURN((LM_ERROR, 00399 ACE_TEXT("(%P|%t) ERROR: Unrecoverably corrupted ") 00400 ACE_TEXT("receive buffer management detected: ") 00401 ACE_TEXT("read more bytes than available.\n")), 00402 -1); 00403 } 00404 } 00405 00406 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00407 "DONE Adjust the message block chain pointers to account " 00408 "for the new data.\n")); 00409 00410 // 00411 // While the receive buffer is not empty: 00412 // 00413 // This is the receive buffer(s) that was just read. The receive 00414 // buffer message block is used to manage our parsing through this 00415 // data. When we have parsed out all the data, then it will be 00416 // considered empty. This message block read pointer indicates 00417 // where we are in the parsing process. Other message blocks refer 00418 // to the parsed data and retain the data block references to 00419 // prevent the data from being released until we have completed all 00420 // uses for the data in the receive buffer. This will normally be 00421 // after the sample data is demarshaled in the DataReader 00422 // components. 00423 // 00424 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00425 "Start processing the data we just received.\n")); 00426 00427 // 00428 // Operate while we have more data to process. 00429 // 00430 while (true) { 00431 00432 // 00433 // Manage the current transport header. 00434 // 00435 // This involves checking to see if we have remaining data in the 00436 // current packet. If not, or if we have not read a complete 00437 // header, then we read a transport header and check it for 00438 // validity. As we have a valid transport header with data 00439 // remaining to read in the packet that it represents, we move on. 00440 // 00441 // As new transport headers are to be read, they are read into a 00442 // message buffer member variable and demarshaled directly. The 00443 // values are retained for the lifetime of the processing of the 00444 // instant transport packet. 
The member message buffer allows us 00445 // to retain partially read transport headers until we can read 00446 // more data. 00447 // 00448 00449 // 00450 // Check the remaining transport packet length to see if we are 00451 // expecting a new one. 00452 // 00453 if (this->pdu_remaining_ == 0) { 00454 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00455 "We are expecting a transport packet header.\n")); 00456 00457 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00458 "this->buffer_index_ == %d.\n",this->buffer_index_)); 00459 00460 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00461 "this->receive_buffers_[this->buffer_index_]->rd_ptr() " 00462 "== %u.\n", 00463 this->receive_buffers_[this->buffer_index_]->rd_ptr())); 00464 00465 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00466 "this->receive_buffers_[this->buffer_index_]->wr_ptr() " 00467 "== %u.\n", 00468 this->receive_buffers_[this->buffer_index_]->wr_ptr())); 00469 00470 if (this->receive_buffers_[this->buffer_index_]->total_length() 00471 < this->receive_transport_header_.max_marshaled_size()) { 00472 // 00473 // Not enough room in the buffer for the entire Transport 00474 // header that we need to read, so relinquish control until 00475 // we get more data. 00476 // 00477 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00478 "Not enough bytes read to account for a transport " 00479 "packet header. We are done here - we need to " 00480 "receive more bytes.\n")); 00481 00482 this->receive_transport_header_.incomplete( 00483 *this->receive_buffers_[this->buffer_index_]); 00484 00485 return 0; 00486 00487 } else { 00488 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00489 "We have enough bytes to demarshall the transport " 00490 "packet header.\n")); 00491 00492 // only do the hexdump if it will be printed - to not impact performance. 
00493 if (Transport_debug_level > 5) { 00494 ACE_TCHAR xbuffer[4096]; 00495 const ACE_Message_Block& mb = 00496 *this->receive_buffers_[this->buffer_index_]; 00497 size_t xbytes = mb.length(); 00498 00499 xbytes = (std::min)(xbytes, TH::max_marshaled_size()); 00500 00501 ACE::format_hexdump(mb.rd_ptr(), xbytes, xbuffer, sizeof(xbuffer)); 00502 00503 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00504 "Hex Dump of transport header block " 00505 "(%d bytes):\n%s\n", xbytes, xbuffer)); 00506 } 00507 00508 // 00509 // Demarshal the transport header. 00510 // 00511 this->receive_transport_header_ = 00512 *this->receive_buffers_[this->buffer_index_]; 00513 00514 // 00515 // Check the TransportHeader. 00516 // 00517 if (!this->receive_transport_header_.valid()) { 00518 ACE_ERROR_RETURN((LM_ERROR, 00519 ACE_TEXT 00520 ("(%P|%t) ERROR: TransportHeader invalid.\n")), 00521 -1); 00522 } 00523 00524 this->good_pdu_ = check_header(this->receive_transport_header_); 00525 this->pdu_remaining_ = this->receive_transport_header_.length_; 00526 00527 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00528 "Amount of transport packet bytes (remaining): %d.\n", 00529 this->pdu_remaining_)); 00530 } 00531 } 00532 00533 // 00534 // Ignore bad PDUs by skipping over them. We do this out here 00535 // in case we did not skip over the entire bad PDU last time. 00536 // 00537 { 00538 int rtn_code = skip_bad_pdus(); 00539 if (rtn_code <= 0) return rtn_code; 00540 } 00541 00542 // 00543 // Keep processing samples while we have data to read. 00544 // 00545 while (this->pdu_remaining_ > 0) { 00546 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00547 "We have a transport packet now. There are more sample " 00548 "bytes to parse in order to complete the packet.\n")); 00549 00550 // 00551 // Manage the current sample header. 00552 // 00553 // This involves checking to see if we have remaining data in the 00554 // current sample. 
If not, or if we have not read a complete 00555 // header, then we read a sample header and check it for validity, 00556 // which currently involves checking the message type only. As we 00557 // have a valid sample header with data remaining to read in the 00558 // sample that it represents, we move on. 00559 // 00560 // As new sample headers are read, they are read into a message 00561 // buffer member variable and demarshaled directly. The values are 00562 // retained for the lifetime of the sample and are passed as part 00563 // of the received data sample itself. The member message buffer 00564 // allows us to retain partially read sample headers until we can 00565 // read more data. 00566 // 00567 if (this->receive_sample_remaining_ == 0) { 00568 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00569 "We are not working on some remaining sample data. " 00570 "We are expecting to parse a sample header now.\n")); 00571 00572 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00573 "this->buffer_index_ == %d.\n",this->buffer_index_)); 00574 00575 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00576 "this->receive_buffers_[this->buffer_index_]->rd_ptr() " 00577 "== %u.\n", 00578 this->receive_buffers_[this->buffer_index_]->rd_ptr())); 00579 00580 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00581 "this->receive_buffers_[this->buffer_index_]->wr_ptr() " 00582 "== %u.\n", 00583 this->receive_buffers_[this->buffer_index_]->wr_ptr())); 00584 00585 if (DSH::partial(*this->receive_buffers_[this->buffer_index_])) { 00586 // 00587 // Not enough room in the buffer for the entire Sample 00588 // header that we need to read, so relinquish control until 00589 // we get more data. 00590 // 00591 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00592 "Not enough bytes have been received to account " 00593 "for a complete sample header. 
We are done for " 00594 "now - we need to receive more data before we " 00595 "can go on.\n")); 00596 if (Transport_debug_level > 2) { 00597 ACE_TCHAR ebuffer[350]; 00598 ACE_Message_Block& mb = *this->receive_buffers_[this->buffer_index_]; 00599 const size_t sz = (std::min)(DSH::max_marshaled_size(), mb.length()); 00600 ACE::format_hexdump(mb.rd_ptr(), sz, ebuffer, sizeof(ebuffer)); 00601 ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG: " 00602 "Partial DataSampleHeader:\n%s\n", ebuffer)); 00603 } 00604 00605 return 0; 00606 00607 } else { 00608 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00609 "We have received enough bytes for the sample " 00610 "header. Demarshall the sample header now.\n")); 00611 00612 // only do the hexdump if it will be printed - to not impact performance. 00613 if (Transport_debug_level > 5) { 00614 ACE_TCHAR ebuffer[4096]; 00615 ACE::format_hexdump 00616 (this->receive_buffers_[this->buffer_index_]->rd_ptr(), 00617 this->data_sample_header_.max_marshaled_size(), 00618 ebuffer, sizeof(ebuffer)); 00619 00620 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00621 "Hex Dump:\n%s\n", ebuffer)); 00622 } 00623 00624 this->data_sample_header_.pdu_remaining(this->pdu_remaining_); 00625 00626 // 00627 // Demarshal the sample header. 00628 // 00629 this->data_sample_header_ = 00630 *this->receive_buffers_[this->buffer_index_]; 00631 00632 // 00633 // Check the DataSampleHeader. 00634 // 00635 00636 this->good_pdu_ = check_header(data_sample_header_); 00637 00638 // 00639 // Set the amount to parse into the message buffer. We 00640 // can't just use the header value to keep track of 00641 // where we are since downstream processing will expect 00642 // the value to be correct (unadjusted by us). 
00643 // 00644 this->receive_sample_remaining_ = 00645 this->data_sample_header_.message_length(); 00646 00647 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00648 "The demarshalled sample header says that we " 00649 "have %d bytes to read for the data portion of " 00650 "the sample.\n", 00651 this->receive_sample_remaining_)); 00652 00653 // 00654 // Decrement packet size. 00655 // 00656 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00657 "this->data_sample_header_.marshaled_size() " 00658 "== %d.\n", 00659 this->data_sample_header_.marshaled_size())); 00660 00661 this->pdu_remaining_ 00662 -= this->data_sample_header_.marshaled_size(); 00663 00664 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00665 "Amount of transport packet remaining: %d.\n", 00666 this->pdu_remaining_)); 00667 00668 int rtn_code = skip_bad_pdus(); 00669 if (rtn_code <= 0) return rtn_code; 00670 } 00671 } 00672 00673 bool last_buffer = false; 00674 update_buffer_index(last_buffer); 00675 00676 if (this->receive_sample_remaining_ > 0) { 00677 // 00678 // Manage the current sample data. 00679 // 00680 // This involves reading data to complete the current sample. As 00681 // samples are completed, they are dispatched via the 00682 // data_received() mechanism. This data is read into message 00683 // blocks that are obtained from the pool of message blocks since 00684 // the lifetime of this data will last until the DataReader 00685 // components demarshal the sample data. A reference to the 00686 // current sample being built is retained as a member to allow us 00687 // to hold partially read samples until they are completed. 
00688 // 00689 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00690 "Determine amount of data for the next block in the chain\n")); 00691 00692 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00693 "this->buffer_index_ == %d.\n",this->buffer_index_)); 00694 00695 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00696 "this->receive_buffers_[this->buffer_index_]->rd_ptr() " 00697 "== %u.\n", 00698 this->receive_buffers_[this->buffer_index_]->rd_ptr())); 00699 00700 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00701 "this->receive_buffers_[this->buffer_index_]->wr_ptr() " 00702 "== %u.\n", 00703 this->receive_buffers_[this->buffer_index_]->wr_ptr())); 00704 // 00705 // Determine the amount of data for the next block in the chain. 00706 // 00707 const size_t amount = ace_min<size_t>( 00708 this->receive_sample_remaining_, 00709 this->receive_buffers_[this->buffer_index_]->length()); 00710 00711 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00712 "amount of data for the next block in the chain is %d\n", 00713 amount)); 00714 // 00715 // Now create a message block for the data in the current buffer 00716 // and chain it if we are starting a new sample. 00717 // 00718 ACE_Message_Block* current_sample_block = 0; 00719 ACE_NEW_MALLOC_RETURN( 00720 current_sample_block, 00721 (ACE_Message_Block*) this->mb_allocator_.malloc( 00722 sizeof(ACE_Message_Block)), 00723 ACE_Message_Block( 00724 this->receive_buffers_[this->buffer_index_] 00725 ->data_block()->duplicate(), 00726 0, 00727 &this->mb_allocator_), 00728 -1); 00729 00730 // 00731 // Chain it to the end of the current sample. 
00732 // 00733 if (this->payload_ == 0) { 00734 this->payload_ = current_sample_block; 00735 00736 } else { 00737 ACE_Message_Block* block = this->payload_; 00738 00739 while (block->cont() != 0) { 00740 block = block->cont(); 00741 } 00742 00743 block->cont(current_sample_block); 00744 } 00745 00746 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00747 "Before adjustment of the pointers and byte counters\n")); 00748 00749 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00750 "this->payload_->rd_ptr() " 00751 "== %u.\n", 00752 this->payload_->rd_ptr())); 00753 00754 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00755 "this->payload_->wr_ptr() " 00756 "== %u.\n", 00757 this->payload_->wr_ptr())); 00758 00759 // 00760 // Adjust the pointers and byte counters. 00761 // 00762 current_sample_block->rd_ptr( 00763 this->receive_buffers_[this->buffer_index_]->rd_ptr()); 00764 current_sample_block->wr_ptr(current_sample_block->rd_ptr() + amount); 00765 this->receive_buffers_[this->buffer_index_]->rd_ptr(amount); 00766 this->receive_sample_remaining_ -= amount; 00767 this->pdu_remaining_ -= amount; 00768 00769 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00770 "After adjustment of the pointers and byte counters\n")); 00771 00772 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00773 "this->payload_->rd_ptr() " 00774 "== %u.\n", 00775 this->payload_->rd_ptr())); 00776 00777 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00778 "this->payload_->wr_ptr() " 00779 "== %u.\n", 00780 this->payload_->wr_ptr())); 00781 00782 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00783 "this->receive_buffers_[this->buffer_index_]->rd_ptr() " 00784 "== %u.\n", 00785 this->receive_buffers_[this->buffer_index_]->rd_ptr())); 00786 00787 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00788 "this->receive_buffers_[this->buffer_index_]->wr_ptr() " 00789 "== %u.\n", 00790 this->receive_buffers_[this->buffer_index_]->wr_ptr())); 00791 00792 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00793 "After adjustment, remaining sample bytes == %d\n", 00794 this->receive_sample_remaining_)); 00795 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00796 "After adjustment, 
remaining transport packet bytes == %d\n", 00797 this->pdu_remaining_)); 00798 } 00799 00800 // 00801 // Dispatch the received message if we have received it all. 00802 // 00803 // NB: Since we are doing this synchronously and without passing 00804 // ownership of the sample, we can use NULL mutex lock for 00805 // the allocators. Since only one thread can be in this 00806 // routine at a time, that is. 00807 // 00808 if (this->receive_sample_remaining_ == 0) { 00809 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00810 "Now dispatch the sample to the DataLink\n")); 00811 00812 ReceivedDataSample rds(this->payload_); 00813 this->payload_ = 0; // rds takes ownership of payload_ 00814 if (this->data_sample_header_.into_received_data_sample(rds)) { 00815 00816 if (this->data_sample_header_.more_fragments() 00817 || this->receive_transport_header_.last_fragment()) { 00818 VDBG((LM_DEBUG,"(%P|%t) DBG: Attempt reassembly of fragments\n")); 00819 00820 if (this->reassemble(rds)) { 00821 VDBG((LM_DEBUG,"(%P|%t) DBG: Reassembled complete message\n")); 00822 this->deliver_sample(rds, remote_address); 00823 } 00824 // If reassemble() returned false, it takes ownership of the data 00825 // just like deliver_sample() does. 00826 00827 } else { 00828 this->deliver_sample(rds, remote_address); 00829 } 00830 } 00831 00832 // For the reassembly algorithm, the 'last_fragment_' header bit only 00833 // applies to the first DataSampleHeader in the TransportHeader 00834 this->receive_transport_header_.last_fragment(false); 00835 00836 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00837 "Release the sample that we just sent.\n")); 00838 // ~ReceivedDataSample() releases the payload_ message block 00839 } 00840 00841 update_buffer_index(last_buffer); 00842 00843 if (last_buffer) { 00844 // Relinquish control if there is no more data to process. 
00845 VDBG((LM_DEBUG,"(%P|%t) DBG: We are done - no more data.\n")); 00846 return 0; 00847 } 00848 00849 } // End of while (this->pdu_remaining_ > 0) 00850 00851 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00852 "Let's try to do some more.\n")); 00853 } // End of while (true) 00854 00855 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00856 "It looks like we are done - the done loop has finished.\n")); 00857 // 00858 // Relinquish control. 00859 // 00860 // This involves ensuring that when we reenter this method, we will 00861 // pick up from where we left off correctly. 00862 // 00863 return 0; 00864 }
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining | ( | ) | const [inline, protected] |
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reassemble | ( | ReceivedDataSample & | data | ) | [private, virtual] |
Reimplemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.
Definition at line 868 of file TransportReceiveStrategy_T.cpp.
00869 { 00870 ACE_DEBUG((LM_WARNING, "(%P|%t) TransportReceiveStrategy::reassemble() " 00871 "WARNING: derived class must override if specific transport type uses " 00872 "fragmentation and reassembly\n")); 00873 return false; 00874 }
virtual ssize_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_bytes | ( | iovec | iov[], | |
int | n, | |||
ACE_INET_Addr & | remote_address, | |||
ACE_HANDLE | fd | |||
) | [protected, pure virtual] |
Only our subclass knows how to do this.
Implemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::ShmemReceiveStrategy, OpenDDS::DCPS::TcpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
ACE_INLINE TH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header | ( | ) |
Definition at line 36 of file TransportReceiveStrategy_T.inl.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_.
00037 { 00038 return this->receive_transport_header_; 00039 }
ACE_INLINE const TH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header | ( | ) | const |
Provides access to the received transport header for subclasses.
Definition at line 29 of file TransportReceiveStrategy_T.inl.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_.
Referenced by OpenDDS::DCPS::MulticastDataLink::ready_to_deliver(), OpenDDS::DCPS::UdpReceiveStrategy::reassemble(), OpenDDS::DCPS::MulticastReceiveStrategy::reassemble(), and OpenDDS::DCPS::MulticastDataLink::sample_received().
00030 { 00031 return this->receive_transport_header_; 00032 }
ACE_INLINE DSH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header | ( | ) |
Definition at line 50 of file TransportReceiveStrategy_T.inl.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_.
00051 { 00052 return this->data_sample_header_; 00053 }
ACE_INLINE const DSH & OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header | ( | ) | const |
Provides access to the received sample header for subclasses.
Definition at line 43 of file TransportReceiveStrategy_T.inl.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_.
Referenced by OpenDDS::DCPS::RtpsUdpReceiveStrategy::deliver_sample(), and OpenDDS::DCPS::RtpsUdpReceiveStrategy::reassemble().
00044 { 00045 return this->data_sample_header_; 00046 }
ACE_INLINE void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::relink | ( | bool | do_suspend = true |
) | [virtual] |
The subclass needs to provide the implementation for re-establishing the datalink. This is called when recv returns an error.
Reimplemented in OpenDDS::DCPS::TcpReceiveStrategy.
Definition at line 64 of file TransportReceiveStrategy_T.inl.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
00065 { 00066 // The subclass needs to implement this function for re-establishing 00067 // the link upon recv failure. 00068 }
void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset | ( | ) | [protected] |
For datagram-based derived classes, reset() can be called to clear any state that may remain from parsing the previous datagram.
Definition at line 878 of file TransportReceiveStrategy_T.cpp.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::payload_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::RECEIVE_BUFFERS, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_.
Referenced by OpenDDS::DCPS::MulticastReceiveStrategy::handle_input().
00879 { 00880 this->receive_sample_remaining_ = 0; 00881 ACE_Message_Block::release(this->payload_); 00882 this->payload_ = 0; 00883 this->good_pdu_ = true; 00884 this->pdu_remaining_ = 0; 00885 for (int i = 0; i < RECEIVE_BUFFERS; ++i) { 00886 ACE_Message_Block& rb = *this->receive_buffers_[i]; 00887 rb.rd_ptr(rb.wr_ptr()); 00888 } 00889 }
int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus | ( | ) | [protected] |
Ignore bad PDUs by skipping over them.
Definition at line 915 of file TransportReceiveStrategy_T.cpp.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_, OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index().
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
00916 { 00917 if (this->good_pdu_) return 1; 00918 00919 // 00920 // Adjust the message block chain pointers to account for the 00921 // skipped data. 00922 // 00923 for (size_t index = this->buffer_index_; 00924 this->pdu_remaining_ > 0; 00925 index = this->successor_index(index)) { 00926 size_t amount = 00927 ace_min<size_t>(this->pdu_remaining_, this->receive_buffers_[index]->length()); 00928 00929 this->receive_buffers_[index]->rd_ptr(amount); 00930 this->pdu_remaining_ -= amount; 00931 00932 if (this->pdu_remaining_ > 0 && this->successor_index(index) == this->buffer_index_) { 00933 ACE_ERROR_RETURN((LM_ERROR, 00934 ACE_TEXT("(%P|%t) ERROR: ") 00935 ACE_TEXT("TransportReceiveStrategy::skip_bad_pdus()") 00936 ACE_TEXT(" - Unrecoverably corrupted ") 00937 ACE_TEXT("receive buffer management detected: ") 00938 ACE_TEXT("read more bytes than available.\n")), 00939 -1); 00940 } 00941 } 00942 00943 this->receive_sample_remaining_ = 0; 00944 00945 this->receive_sample_remaining_ = 0; 00946 00947 bool done = false; 00948 update_buffer_index(done); 00949 return done ? 0 : 1; 00950 }
ACE_INLINE int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start | ( | ) | [virtual] |
Implements OpenDDS::DCPS::TransportStrategy.
Definition at line 13 of file TransportReceiveStrategy_T.inl.
References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start_i().
Referenced by OpenDDS::DCPS::ShmemReceiveStrategy::read().
00014 { 00015 DBG_ENTRY_LVL("TransportReceiveStrategy","start",6); 00016 return this->start_i(); 00017 }
virtual int OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start_i | ( | ) | [protected, pure virtual] |
Let the subclass start.
Implemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::ShmemReceiveStrategy, OpenDDS::DCPS::TcpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::start().
ACE_INLINE void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop | ( | ) | [virtual] |
Implements OpenDDS::DCPS::TransportStrategy.
Definition at line 21 of file TransportReceiveStrategy_T.inl.
References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop_i().
00022 { 00023 DBG_ENTRY_LVL("TransportReceiveStrategy","stop",6); 00024 this->stop_i(); 00025 }
virtual void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop_i | ( | ) | [protected, pure virtual] |
Let the subclass stop.
Implemented in OpenDDS::DCPS::MulticastReceiveStrategy, OpenDDS::DCPS::RtpsUdpReceiveStrategy, OpenDDS::DCPS::ShmemReceiveStrategy, OpenDDS::DCPS::TcpReceiveStrategy, and OpenDDS::DCPS::UdpReceiveStrategy.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::stop().
ACE_INLINE size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::successor_index | ( | size_t | index | ) | const [private] |
Manage an index into the receive buffer array.
Definition at line 57 of file TransportReceiveStrategy_T.inl.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::RECEIVE_BUFFERS.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input().
00058 { 00059 return ++index % RECEIVE_BUFFERS; 00060 }
void OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index | ( | bool & | done | ) | [private] |
Definition at line 893 of file TransportReceiveStrategy_T.cpp.
References OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_, and VDBG.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus().
00894 { 00895 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00896 "Adjust the buffer chain in case we crossed into the next " 00897 "buffer after the last read(s).\n")); 00898 const size_t initial = this->buffer_index_; 00899 while (this->receive_buffers_[this->buffer_index_]->length() == 0) { 00900 this->buffer_index_ = this->successor_index(this->buffer_index_); 00901 00902 VDBG((LM_DEBUG,"(%P|%t) DBG: " 00903 "Set this->buffer_index_ = %d.\n", 00904 this->buffer_index_)); 00905 00906 if (initial == this->buffer_index_) { 00907 done = true; // no other buffers in receive_buffers_ have data 00908 return; 00909 } 00910 } 00911 }
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::buffer_index_ [private] |
Current receive buffer index in use.
Definition at line 135 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::update_buffer_index(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::~TransportReceiveStrategy().
TransportDataAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_allocator_ [private] |
Definition at line 126 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy().
DSH OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::data_sample_header_ [private] |
Current data sample header.
Definition at line 138 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_sample_header().
TransportDataBlockAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::db_allocator_ [private] |
Definition at line 125 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy().
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::good_pdu_ [private] |
Flag indicating that the currently resident PDU is a good one (i.e. has not been received and processed previously). It exists because a PDU that was resent for reliability reasons may arrive after we have already processed it. This is a use case from multicast transports.
Definition at line 148 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset().
bool OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::gracefully_disconnected_ [protected] |
Flag indicating whether the GRACEFUL_DISCONNECT message has been received.
Definition at line 90 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TcpReceiveStrategy::gracefully_disconnected(), and OpenDDS::DCPS::ShmemReceiveStrategy::receive_bytes().
TransportMessageBlockAllocator OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::mb_allocator_ [private] |
Definition at line 124 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::TransportReceiveStrategy().
ACE_Message_Block* OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::payload_ [private] |
Definition at line 140 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset().
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::pdu_remaining_ [private] |
Amount of the current PDU that has not been processed yet.
Definition at line 151 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< OpenDDS::DCPS::RtpsTransportHeader, OpenDDS::DCPS::RtpsSampleHeader >::pdu_remaining(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus().
ACE_Message_Block* OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_buffers_[RECEIVE_BUFFERS] [private] |
Set of receive buffers in use.
Definition at line 132 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::~TransportReceiveStrategy().
ACE_Lock_Adapter<ACE_SYNCH_MUTEX> OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_lock_ [private] |
size_t OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_sample_remaining_ [private] |
Bytes remaining in the current DataSample.
Definition at line 102 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::reset(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::skip_bad_pdus().
TH OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::receive_transport_header_ [private] |
Current receive TransportHeader.
Definition at line 105 of file TransportReceiveStrategy_T.h.
Referenced by OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::handle_dds_input(), and OpenDDS::DCPS::TransportReceiveStrategy< TH, DSH >::received_header().