#include <PacketRemoveVisitor.h>
Inheritance diagram for OpenDDS::DCPS::PacketRemoveVisitor:
Public Member Functions | |
PacketRemoveVisitor (const TransportQueueElement::MatchCriteria &match, ACE_Message_Block *&unsent_head_block, ACE_Message_Block *header_block, TransportReplacedElementAllocator &allocator, MessageBlockAllocator &mb_allocator, DataBlockAllocator &db_allocator) | |
virtual | ~PacketRemoveVisitor () |
virtual int | visit_element_ref (TransportQueueElement *&element) |
RemoveResult | status () const |
Private Attributes | |
const TransportQueueElement::MatchCriteria & | match_ |
The sample that needs to be removed. | |
ACE_Message_Block *& | head_ |
The head block of the chain of unsent blocks in the packet. | |
ACE_Message_Block * | header_block_ |
RemoveResult | status_ |
Holds the status of our visit. | |
ACE_Message_Block * | current_block_ |
ACE_Message_Block * | previous_block_ |
TransportReplacedElementAllocator & | replaced_element_allocator_ |
Cached allocator for TransportReplaceElement. | |
MessageBlockAllocator & | replaced_element_mb_allocator_ |
Cached allocator for DataSampleHeader message block. | |
DataBlockAllocator & | replaced_element_db_allocator_ |
Cached allocator for DataSampleHeader data block. |
Definition at line 24 of file PacketRemoveVisitor.h.
OpenDDS::DCPS::PacketRemoveVisitor::PacketRemoveVisitor | ( | const TransportQueueElement::MatchCriteria & | match, | |
ACE_Message_Block *& | unsent_head_block, | |||
ACE_Message_Block * | header_block, | |||
TransportReplacedElementAllocator & | allocator, | |||
MessageBlockAllocator & | mb_allocator, | |||
DataBlockAllocator & | db_allocator | |||
) |
Definition at line 20 of file PacketRemoveVisitor.cpp.
References DBG_ENTRY_LVL.
00027 : match_(match) 00028 , head_(unsent_head_block) 00029 , header_block_(header_block) 00030 , status_(REMOVE_NOT_FOUND) 00031 , current_block_(0) 00032 , previous_block_(0) 00033 , replaced_element_allocator_(allocator) 00034 , replaced_element_mb_allocator_(mb_allocator) 00035 , replaced_element_db_allocator_(db_allocator) 00036 { 00037 DBG_ENTRY_LVL("PacketRemoveVisitor", "PacketRemoveVisitor", 6); 00038 }
OpenDDS::DCPS::PacketRemoveVisitor::~PacketRemoveVisitor | ( | ) | [virtual] |
Definition at line 40 of file PacketRemoveVisitor.cpp.
References DBG_ENTRY_LVL.
00041 { 00042 DBG_ENTRY_LVL("PacketRemoveVisitor", "~PacketRemoveVisitor", 6); 00043 }
ACE_INLINE OpenDDS::DCPS::RemoveResult OpenDDS::DCPS::PacketRemoveVisitor::status | ( | ) | const |
Definition at line 11 of file PacketRemoveVisitor.inl.
References DBG_ENTRY_LVL, and status_.
Referenced by OpenDDS::DCPS::TransportSendStrategy::do_remove_sample(), and OpenDDS::DCPS::SingleSendBuffer::retain_buffer().
00012 { 00013 DBG_ENTRY_LVL("PacketRemoveVisitor", "status", 6); 00014 return this->status_; 00015 }
int OpenDDS::DCPS::PacketRemoveVisitor::visit_element_ref | ( | TransportQueueElement *& | element | ) | [virtual] |
The BasicQueue<T>::accept_remove_visitor() method will call this visit_element() method for each element in the queue.
Definition at line 46 of file PacketRemoveVisitor.cpp.
References current_block_, OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, head_, OpenDDS::DCPS::TransportQueueElement::msg(), previous_block_, OpenDDS::DCPS::REMOVE_ERROR, OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, status_, and VDBG.
00047 { 00048 DBG_ENTRY_LVL("PacketRemoveVisitor", "visit_element_ref", 6); 00049 00050 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00051 "Obtain the element_blocks using element->msg()\n")); 00052 00053 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00054 "The element is [%0x]\n", 00055 element)); 00056 00057 // These is the head of the chain of "source" blocks from the element 00058 // currently being visited. 00059 ACE_Message_Block* element_blocks = 00060 const_cast<ACE_Message_Block*>(element->msg()); 00061 00062 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00063 "element_blocks == [%0x]\n", element_blocks)); 00064 00065 // As we visit an element, we also adjust our current_block_ and 00066 // previous_block_ data members such that we can correlate the current 00067 // element (being visited) with the message blocks that it contributed 00068 // to the unsent packet blocks. Our head_ data member was set to the 00069 // first unsent block in the chain of blocks that make up the remaining 00070 // portions of the "packet". 00071 00072 if (this->current_block_ == 0) { 00073 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00074 "this->current_block_ == 0. Visiting first element.\n")); 00075 00076 // This must be our first visit_element() call. Set up the 00077 // current_block_ and previous_block_ data members appropriately. 00078 this->current_block_ = this->head_; 00079 this->previous_block_ = 0; 00080 00081 // If the current_block_ is still zero, there is nothing to do here, 00082 // so cancel the visitation 00083 if (this->current_block_ == 0) { 00084 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00085 "No blocks to iterate through, ending visitation.\n")); 00086 return 0; 00087 } 00088 00089 // There is a chance that the head_ block (and thus the current_block_) 00090 // is actually a duplicate of the packet header_block_. If so, we 00091 // need to adust the current_block_ and previous_block_ appropriately. 00092 if (this->header_block_->base() == this->current_block_->base()) { 00093 // Yup. Just what we thought may be the case. 00094 this->previous_block_ = this->current_block_; 00095 this->current_block_ = this->previous_block_->cont(); 00096 } 00097 00098 } else { 00099 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00100 "this->current_block_ != 0. Visiting element other than " 00101 "the first element.\n")); 00102 00103 // We are visiting an element that is not the very first element in 00104 // the packet. 00105 00106 // Let's get the previous_block_ data member set to point to the 00107 // block in the packet chain that is the predecessor to the first 00108 // block in the packet chain that was contributed from the current 00109 // element. 00110 this->previous_block_ = this->current_block_; 00111 00112 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00113 "Set previous_block_ to the current_block_ [%0x]\n", 00114 this->previous_block_)); 00115 00116 // Keep changing the previous_block_ to the next block in the chain 00117 // until we know that the next block in the chain is a duplicate of 00118 // the block at the front of the element_blocks chain. 00119 while (this->previous_block_->cont()->base() != element_blocks->base()) { 00120 this->previous_block_ = this->previous_block_->cont(); 00121 00122 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00123 "Moved previous_block_ to its cont() block [%0x]\n", 00124 this->previous_block_)); 00125 } 00126 00127 // At this point, we know that the previous_block_ is the block 00128 // that immediately precedes the first block contributed by the 00129 // element that we are currently visiting. Set the current_block_ 00130 // to point to the first block contributed by the element. 00131 this->current_block_ = this->previous_block_->cont(); 00132 00133 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00134 "Set current_block_ to the previous_block_->cont() [%0x]\n", 00135 this->current_block_)); 00136 } 00137 00138 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00139 "Does the current element match the sample to be replaced?\n")); 00140 00141 // Does the current element (being visited) match the sample that we 00142 // need to remove? 00143 if (this->match_.matches(*element)) { 00144 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00145 "YES - The element matches the sample\n")); 00146 00147 // We get inside here if the element we are currently visiting is 00148 // the element that "matches" the sample that needs to be removed. 00149 00150 // At this point, the current_block_ points to the first block in 00151 // the packet that is known to have been contributed by the 00152 // element that we are currently visiting. 00153 00154 // The previous_block_ is either pointing to the block in the packet 00155 // that immediately precedes the current_block_, or the previous_block_ 00156 // is set to 0 indicating that the current_block_ also happens to 00157 // be the head_ block in the packet (and the element we are visiting 00158 // is the first element (remaining) in the packet). 00159 00160 // Our goal now is to extract the blocks from the packet that have 00161 // been contributed by the element that we are currently visiting. 00162 // Then, we will need to replace those blocks in the packet with 00163 // our own blocks. How the replacement blocks are created depends 00164 // upon whether or not we are visiting the first element in the packet. 00165 00166 // The original_blocks will end up being a chain of blocks 00167 // extracted from the packet, all of which were contributed by 00168 // the current element being visited. 00169 ACE_Message_Block* original_blocks = this->current_block_; 00170 00171 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00172 "Set original_blocks to this->current_block_ [%0x]\n", 00173 original_blocks)); 00174 00175 // The remaining_chain will end up being the chain of blocks that 00176 // followed the original blocks in the packet. There is always 00177 // the possibility that the remaining chain will be 0, meaning that 00178 // we are currently visiting (and removing) the last element in the 00179 // packet. 00180 ACE_Message_Block* remaining_chain = this->current_block_->cont(); 00181 00182 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00183 "Set remaining_chain to this->current_block_->cont() [%0x]\n", 00184 remaining_chain)); 00185 00186 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00187 "Set original_blocks->cont(0)\n")); 00188 00189 // At this point, we only know for sure that one block was 00190 // contributed by the element currently being visited. 00191 original_blocks->cont(0); 00192 00193 unsigned int num_elem_blocks_sent = 0; 00194 00195 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00196 "Set num_elem_blocks_sent to 0\n")); 00197 00198 // The original_blocks_tail is a pointer to the last block in the 00199 // chain of blocks contributed by the element currently being visited. 00200 ACE_Message_Block* original_blocks_tail = original_blocks; 00201 00202 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00203 "Set original_blocks_tail to original_blocks [%0x]\n", 00204 original_blocks_tail)); 00205 00206 // Find the block in the element_blocks that contributed the 00207 // block pointed to by the original_blocks_tail. 00208 ACE_Message_Block* contrib_block = element_blocks; 00209 00210 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00211 "Set contrib_block to element_blocks [%0x]\n", 00212 contrib_block)); 00213 00214 // Loop through each block in the element_blocks until we either 00215 // find the contributing element block, or we have checked all of the 00216 // element_blocks, and never found the contributing element block. 00217 while (contrib_block != 0) { 00218 if (contrib_block->base() == original_blocks->base()) { 00219 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00220 "contrib_block->base() == original_blocks->base()\n")); 00221 // Ok. We have found the source block. 00222 break; 00223 } 00224 00225 // That wasn't a match. Try the next contrib_block to see 00226 // if it is the contributing block for the block at the top of 00227 // the original_blocks chain (which is a chain of 1 at this point). 00228 contrib_block = contrib_block->cont(); 00229 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00230 "Move contrib_block to contrib_block->cont() [%0x]\n", 00231 contrib_block)); 00232 ++num_elem_blocks_sent; 00233 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00234 "num_elem_blocks_sent incremented to %d\n", 00235 num_elem_blocks_sent)); 00236 } 00237 00238 // Sanity check - make sure that we found the contributing block 00239 // in the current element (being visited) for the contributed block 00240 // that is the lone block in the original_blocks chain. 00241 if (contrib_block == 0) { 00242 ACE_ERROR((LM_ERROR, 00243 "(%P|%t) ERROR: Element queue and unsent message block " 00244 "chain is out-of-sync. source_block == 0.\n")); 00245 00246 // Set the status to indicate a fatal error occurred. 00247 this->status_ = REMOVE_ERROR; 00248 00249 // Stop vistation now. 00250 return 0; 00251 } 00252 00253 // Now that we have identified the contributing block for the 00254 // single block in the original_blocks chain, we may need to add 00255 // more blocks to the original_blocks chain - one more block for 00256 // each additional block chained to the element's contributing block. 00257 // Note that this while loop doesn't do anything if the contrib_block 00258 // is the last contributing block in the element. In this case, the 00259 // original_blocks contains the lone block that was contributed by 00260 // the lone (last) contributing block in the element - and the 00261 // remaining_chain properly points to the remaining blocks in the 00262 // packet. 00263 while (contrib_block->cont() != 0) { 00264 // The source element block indicates that it has a "next" 00265 // block that would have also contributed a block to the packet. 00266 00267 // This means that there is a block at the front of the 00268 // remaining_chain of blocks that really should be part of the 00269 // original_blocks. 00270 00271 // Sanity check - the remaining_chain better not be NULL (0). 00272 if (remaining_chain == 0) { 00273 ACE_ERROR((LM_ERROR, 00274 "(%P|%t) ERROR: Element queue and unsent message block " 00275 "chain is out-of-synch. remaining_chain == 0.\n")); 00276 this->status_ = REMOVE_ERROR; 00277 return 0; 00278 } 00279 00280 // Extract/unchain the first block from the remaining_chain. 00281 ACE_Message_Block* additional_block = remaining_chain; 00282 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00283 "Extracted additional_block from remaining_chain [%0x]\n", 00284 additional_block)); 00285 00286 remaining_chain = remaining_chain->cont(); 00287 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00288 "Move remaining_chain to remaining_chain->cont() [%0x]\n", 00289 remaining_chain)); 00290 00291 additional_block->cont(0); 00292 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00293 "Set additional_block->cont(0)\n")); 00294 00295 // Attach the block to the end of the original_blocks chain. 00296 original_blocks_tail->cont(additional_block); 00297 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00298 "original_blocks_tail->cont(additional_block)\n")); 00299 00300 original_blocks_tail = additional_block; 00301 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00302 "Set original_blocks_tail to additional_block [%0x]\n", 00303 original_blocks_tail)); 00304 00305 // Advance to the next contributing block. 00306 contrib_block = contrib_block->cont(); 00307 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00308 "Move contrib_block to contrib_block->cont() [%0x]\n", 00309 contrib_block)); 00310 } 00311 00312 // Finally!At this point we have broken the unsent packet chain of 00313 // blocks into three seperate chains: 00314 // 00315 // (1) this->previous_block_ is either 0, or it points to the block 00316 // (from the unsent packet chain) that immediately preceded the 00317 // first block (from the unsent packet chain) that was contributed 00318 // by the sample (that we need to replace). 00319 // this->previous_block_ is 0 when the contributed blocks from 00320 // the sample (that we are replacing) are the first blocks from 00321 // the unsent packet chain. 00322 // 00323 // Thus, sub-chain (1) is either an empty chain (when 00324 // this->previous_block_ is 0), or it is a chain that starts 00325 // with the head_ block (the first block from the unsent packet 00326 // chain), and ends with the this->previous_block_. 00327 // 00328 // (2) original_blocks points to the first block (from the unsent 00329 // packet chain) that was contributed by the sample (that we 00330 // need to replace). 00331 // 00332 // Thus, sub-chain (2) is a chain that starts with the block 00333 // pointed to by original_blocks. 00334 // 00335 // (3) remaining_chain points to the first block (from the unsent 00336 // packet chain) that followed the last block that was 00337 // contributed by the sample (that we need to replace). 00338 // 00339 // Thus, sub-chain (3) is a chain that starts with the block 00340 // pointed to by remaining_chain. Note that this may be 0 if 00341 // the sample being replaced is the last sample in the packet. 00342 // 00343 // If sub-chains (1), (2), and (3) were chained together (in that 00344 // order), we would end up with the original unsent packet chain. 00345 // Whew. 00346 00347 // Now we can perform our replacement duties. 00348 00349 // Save off the pointer to the original element 00350 TransportQueueElement* orig_elem = element; 00351 00352 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00353 "Create the new TransportReplacedElement using the " 00354 "orig_elem [%0x]\n", 00355 orig_elem)); 00356 00357 // Create the replacement element for the original element. 00358 ACE_NEW_MALLOC_NORETURN( 00359 element, 00360 (TransportQueueElement*)this->replaced_element_allocator_.malloc(), 00361 TransportReplacedElement(orig_elem, &this->replaced_element_allocator_, 00362 &this->replaced_element_mb_allocator_, 00363 &this->replaced_element_db_allocator_)); 00364 if (element == 0) { 00365 // Set fatal error and stop visitation. 00366 this->status_ = REMOVE_ERROR; 00367 return 0; 00368 } 00369 00370 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00371 "The new TransportReplacedElement is [%0x]\n", 00372 element)); 00373 00374 // Now we have to deal with replacing the original_blocks chain 00375 // with duplicates from the msg() chain of the replacement element. 00376 00377 ACE_Message_Block* replacement_element_blocks = 00378 const_cast<ACE_Message_Block*>(element->msg()); 00379 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00380 "Set replacement_element_blocks to the replacement element's " 00381 "msg() [%0x]\n", 00382 replacement_element_blocks)); 00383 00384 // Move through the chain to account for the num_elem_blocks_sent 00385 for (unsigned int i = 0; i < num_elem_blocks_sent; i++) { 00386 replacement_element_blocks = replacement_element_blocks->cont(); 00387 00388 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00389 "Moved replacement_element_blocks to its cont() block " 00390 "[%0x]\n", 00391 replacement_element_blocks)); 00392 } 00393 00394 // Make a duplicate of the replacement_element_blocks chain 00395 ACE_Message_Block* replacement_blocks = 00396 replacement_element_blocks->duplicate(); 00397 00398 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00399 "Set replacement_blocks to duplicate of " 00400 "replacement_element_blocks [%0x]\n", 00401 replacement_blocks)); 00402 00403 // Now adjust the block at the front of the replacement_blocks chain 00404 // to match the block at the front of the original_blocks chain - 00405 // with respect to the difference between the rd_ptr() setting and 00406 // the base() setting. 00407 size_t rd_offset = original_blocks->rd_ptr() - original_blocks->base(); 00408 00409 if (rd_offset > 0) { 00410 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00411 "Call replacement_blocks->rd_ptr(rd_offset) with " 00412 "rd_offset == [%d]\n", 00413 rd_offset)); 00414 replacement_blocks->rd_ptr(rd_offset); 00415 } 00416 00417 // Find the last block (the tail) in the replacement_blocks chain 00418 ACE_Message_Block* replacement_blocks_tail = replacement_blocks; 00419 00420 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00421 "Set replacement_blocks_tail to replacement_blocks " 00422 "[%0x]\n", 00423 replacement_blocks_tail)); 00424 00425 while (replacement_blocks_tail->cont() != 0) { 00426 replacement_blocks_tail = replacement_blocks_tail->cont(); 00427 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00428 "Moved replacement_blocks_tail to its cont() block " 00429 "[%0x]\n", 00430 replacement_blocks_tail)); 00431 } 00432 00433 // Now we can stitch the unsent packet chain back together using the 00434 // replacement blocks instead of the orig_blocks. 00435 replacement_blocks_tail->cont(remaining_chain); 00436 00437 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00438 "Stitched replacement_blocks_tail to remaining_chain.\n")); 00439 00440 if (this->previous_block_ == 0) { 00441 // Replacing blocks at the head of the unsent packet chain. 00442 this->head_ = replacement_blocks; 00443 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00444 "Replacing blocks at head of unsent packet chain.\n")); 00445 00446 } else { 00447 // Replacing blocks not at the head of the unsent packet chain. 00448 this->previous_block_->cont(replacement_blocks); 00449 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00450 "Replacing blocks not at head of unsent packet chain.\n")); 00451 } 00452 00453 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00454 "Release the original_blocks.\n")); 00455 00456 // Release the chain of original blocks. 00457 original_blocks->release(); 00458 00459 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00460 "Tell original element that data_dropped().\n")); 00461 00462 // Tell the original element (that we replaced), data_dropped() 00463 // by transport. 00464 // This visitor is used in TransportSendStrategy::do_remove_sample 00465 // and TransportSendBuffer::retain_all. In former case, the sample 00466 // is dropped as a result of writer's remove_sample call. In the 00467 // later case, the dropped_by_transport is not used as the sample 00468 // is retained sample and no callback is made to writer. 00469 this->status_ = orig_elem->data_dropped() ? REMOVE_RELEASED : REMOVE_FOUND; 00470 00471 if (this->status_ == REMOVE_RELEASED || this->match_.unique()) { 00472 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00473 "Return 0 to halt visitation.\n")); 00474 // Replace a single sample if one is specified, otherwise visit the 00475 // entire queue replacing each sample with the specified 00476 // publication Id value. 00477 return 0; 00478 } 00479 } 00480 00481 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00482 "Return 1 to continue visitation.\n")); 00483 00484 // Continue visitation. 00485 return 1; 00486 }
ACE_Message_Block* OpenDDS::DCPS::PacketRemoveVisitor::current_block_ [private] |
This is the message block in the chain that corresponds to the current (non-head) element being visited.
Definition at line 60 of file PacketRemoveVisitor.h.
Referenced by visit_element_ref().
ACE_Message_Block*& OpenDDS::DCPS::PacketRemoveVisitor::head_ [private] |
The head block of the chain of unsent blocks in the packet.
Definition at line 49 of file PacketRemoveVisitor.h.
Referenced by visit_element_ref().
ACE_Message_Block* OpenDDS::DCPS::PacketRemoveVisitor::header_block_ [private] |
The packet header block that was duplicate()'d to form the first block in the packet.
Definition at line 53 of file PacketRemoveVisitor.h.
ACE_Message_Block* OpenDDS::DCPS::PacketRemoveVisitor::previous_block_ [private] |
This is the message block in the chain that has its cont() set to the current_block_.
Definition at line 64 of file PacketRemoveVisitor.h.
Referenced by visit_element_ref().
Cached allocator for DataSampleHeader data block.
Definition at line 71 of file PacketRemoveVisitor.h.
Cached allocator for DataSampleHeader message block.
Definition at line 69 of file PacketRemoveVisitor.h.
Holds the status of our visit.
Definition at line 56 of file PacketRemoveVisitor.h.
Referenced by status(), and visit_element_ref().