#include <PacketRemoveVisitor.h>
Public Member Functions | |
PacketRemoveVisitor (const TransportQueueElement::MatchCriteria &match, ACE_Message_Block *&unsent_head_block, ACE_Message_Block *header_block, MessageBlockAllocator &mb_allocator, DataBlockAllocator &db_allocator) | |
virtual | ~PacketRemoveVisitor () |
virtual int | visit_element_ref (TransportQueueElement *&element) |
RemoveResult | status () const |
Private Attributes | |
const TransportQueueElement::MatchCriteria & | match_ |
The sample that needs to be removed. | |
ACE_Message_Block *& | head_ |
The head block of the chain of unsent blocks in the packet. | |
ACE_Message_Block * | header_block_ |
RemoveResult | status_ |
Holds the status of our visit. | |
ACE_Message_Block * | current_block_ |
ACE_Message_Block * | previous_block_ |
MessageBlockAllocator & | replaced_element_mb_allocator_ |
Cached allocator for DataSampleHeader message block. | |
DataBlockAllocator & | replaced_element_db_allocator_ |
Cached allocator for DataSampleHeader data block. |
Definition at line 26 of file PacketRemoveVisitor.h.
OpenDDS::DCPS::PacketRemoveVisitor::PacketRemoveVisitor | ( | const TransportQueueElement::MatchCriteria & | match, | |
ACE_Message_Block *& | unsent_head_block, | |||
ACE_Message_Block * | header_block, | |||
MessageBlockAllocator & | mb_allocator, | |||
DataBlockAllocator & | db_allocator | |||
) |
Definition at line 22 of file PacketRemoveVisitor.cpp.
References DBG_ENTRY_LVL.
00028 : match_(match) 00029 , head_(unsent_head_block) 00030 , header_block_(header_block) 00031 , status_(REMOVE_NOT_FOUND) 00032 , current_block_(0) 00033 , previous_block_(0) 00034 , replaced_element_mb_allocator_(mb_allocator) 00035 , replaced_element_db_allocator_(db_allocator) 00036 { 00037 DBG_ENTRY_LVL("PacketRemoveVisitor", "PacketRemoveVisitor", 6); 00038 }
OpenDDS::DCPS::PacketRemoveVisitor::~PacketRemoveVisitor | ( | ) | [virtual] |
Definition at line 40 of file PacketRemoveVisitor.cpp.
References DBG_ENTRY_LVL.
00041 { 00042 DBG_ENTRY_LVL("PacketRemoveVisitor", "~PacketRemoveVisitor", 6); 00043 }
ACE_INLINE OpenDDS::DCPS::RemoveResult OpenDDS::DCPS::PacketRemoveVisitor::status | ( | void | ) | const |
Definition at line 11 of file PacketRemoveVisitor.inl.
References DBG_ENTRY_LVL, and status_.
Referenced by OpenDDS::DCPS::TransportSendStrategy::do_remove_sample(), and OpenDDS::DCPS::SingleSendBuffer::retain_buffer().
00012 { 00013 DBG_ENTRY_LVL("PacketRemoveVisitor", "status", 6); 00014 return this->status_; 00015 }
int OpenDDS::DCPS::PacketRemoveVisitor::visit_element_ref | ( | TransportQueueElement *& | element | ) | [virtual] |
The BasicQueue<T>::accept_remove_visitor() method will call this visit_element_ref() method for each element in the queue.
Reimplemented from OpenDDS::DCPS::BasicQueueVisitor< TransportQueueElement >.
Definition at line 46 of file PacketRemoveVisitor.cpp.
References ACE_Message_Block::base(), ACE_Message_Block::cont(), current_block_, OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, ACE_Message_Block::duplicate(), head_, header_block_, LM_DEBUG, LM_ERROR, match_, OpenDDS::DCPS::TransportQueueElement::MatchCriteria::matches(), OpenDDS::DCPS::TransportQueueElement::msg(), previous_block_, ACE_Message_Block::rd_ptr(), ACE_Message_Block::release(), OpenDDS::DCPS::REMOVE_ERROR, OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, replaced_element_db_allocator_, replaced_element_mb_allocator_, status_, OpenDDS::DCPS::TransportQueueElement::MatchCriteria::unique(), and VDBG.
00047 { 00048 DBG_ENTRY_LVL("PacketRemoveVisitor", "visit_element_ref", 6); 00049 00050 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00051 "Obtain the element_blocks using element->msg()\n")); 00052 00053 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00054 "The element is [%0x]\n", 00055 element)); 00056 00057 // These is the head of the chain of "source" blocks from the element 00058 // currently being visited. 00059 ACE_Message_Block* element_blocks = 00060 const_cast<ACE_Message_Block*>(element->msg()); 00061 00062 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00063 "element_blocks == [%0x]\n", element_blocks)); 00064 00065 // As we visit an element, we also adjust our current_block_ and 00066 // previous_block_ data members such that we can correlate the current 00067 // element (being visited) with the message blocks that it contributed 00068 // to the unsent packet blocks. Our head_ data member was set to the 00069 // first unsent block in the chain of blocks that make up the remaining 00070 // portions of the "packet". 00071 00072 if (this->current_block_ == 0) { 00073 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00074 "this->current_block_ == 0. Visiting first element.\n")); 00075 00076 // This must be our first visit_element() call. Set up the 00077 // current_block_ and previous_block_ data members appropriately. 00078 this->current_block_ = this->head_; 00079 this->previous_block_ = 0; 00080 00081 // If the current_block_ is still zero, there is nothing to do here, 00082 // so cancel the visitation 00083 if (this->current_block_ == 0) { 00084 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00085 "No blocks to iterate through, ending visitation.\n")); 00086 return 0; 00087 } 00088 00089 // There is a chance that the head_ block (and thus the current_block_) 00090 // is actually a duplicate of the packet header_block_. If so, we 00091 // need to adjust the current_block_ and previous_block_ appropriately. 00092 if (this->header_block_->base() == this->current_block_->base()) { 00093 // Yup. 
Just what we thought may be the case. 00094 this->previous_block_ = this->current_block_; 00095 this->current_block_ = this->previous_block_->cont(); 00096 } 00097 00098 } else { 00099 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00100 "this->current_block_ != 0. Visiting element other than " 00101 "the first element.\n")); 00102 00103 // We are visiting an element that is not the very first element in 00104 // the packet. 00105 00106 // Let's get the previous_block_ data member set to point to the 00107 // block in the packet chain that is the predecessor to the first 00108 // block in the packet chain that was contributed from the current 00109 // element. 00110 this->previous_block_ = this->current_block_; 00111 00112 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00113 "Set previous_block_ to the current_block_ [%0x]\n", 00114 this->previous_block_)); 00115 00116 // Keep changing the previous_block_ to the next block in the chain 00117 // until we know that the next block in the chain is a duplicate of 00118 // the block at the front of the element_blocks chain. 00119 while (this->previous_block_->cont()->base() != element_blocks->base()) { 00120 this->previous_block_ = this->previous_block_->cont(); 00121 00122 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00123 "Moved previous_block_ to its cont() block [%0x]\n", 00124 this->previous_block_)); 00125 } 00126 00127 // At this point, we know that the previous_block_ is the block 00128 // that immediately precedes the first block contributed by the 00129 // element that we are currently visiting. Set the current_block_ 00130 // to point to the first block contributed by the element. 
00131 this->current_block_ = this->previous_block_->cont(); 00132 00133 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00134 "Set current_block_ to the previous_block_->cont() [%0x]\n", 00135 this->current_block_)); 00136 } 00137 00138 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00139 "Does the current element match the sample to be replaced?\n")); 00140 00141 // Does the current element (being visited) match the sample that we 00142 // need to remove? 00143 if (this->match_.matches(*element)) { 00144 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00145 "YES - The element matches the sample\n")); 00146 00147 // We get inside here if the element we are currently visiting is 00148 // the element that "matches" the sample that needs to be removed. 00149 00150 // At this point, the current_block_ points to the first block in 00151 // the packet that is known to have been contributed by the 00152 // element that we are currently visiting. 00153 00154 // The previous_block_ is either pointing to the block in the packet 00155 // that immediately precedes the current_block_, or the previous_block_ 00156 // is set to 0 indicating that the current_block_ also happens to 00157 // be the head_ block in the packet (and the element we are visiting 00158 // is the first element (remaining) in the packet). 00159 00160 // Our goal now is to extract the blocks from the packet that have 00161 // been contributed by the element that we are currently visiting. 00162 // Then, we will need to replace those blocks in the packet with 00163 // our own blocks. How the replacement blocks are created depends 00164 // upon whether or not we are visiting the first element in the packet. 00165 00166 // The original_blocks will end up being a chain of blocks 00167 // extracted from the packet, all of which were contributed by 00168 // the current element being visited. 
00169 ACE_Message_Block* original_blocks = this->current_block_; 00170 00171 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00172 "Set original_blocks to this->current_block_ [%0x]\n", 00173 original_blocks)); 00174 00175 // The remaining_chain will end up being the chain of blocks that 00176 // followed the original blocks in the packet. There is always 00177 // the possibility that the remaining chain will be 0, meaning that 00178 // we are currently visiting (and removing) the last element in the 00179 // packet. 00180 ACE_Message_Block* remaining_chain = this->current_block_->cont(); 00181 00182 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00183 "Set remaining_chain to this->current_block_->cont() [%0x]\n", 00184 remaining_chain)); 00185 00186 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00187 "Set original_blocks->cont(0)\n")); 00188 00189 // At this point, we only know for sure that one block was 00190 // contributed by the element currently being visited. 00191 original_blocks->cont(0); 00192 00193 unsigned int num_elem_blocks_sent = 0; 00194 00195 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00196 "Set num_elem_blocks_sent to 0\n")); 00197 00198 // The original_blocks_tail is a pointer to the last block in the 00199 // chain of blocks contributed by the element currently being visited. 00200 ACE_Message_Block* original_blocks_tail = original_blocks; 00201 00202 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00203 "Set original_blocks_tail to original_blocks [%0x]\n", 00204 original_blocks_tail)); 00205 00206 // Find the block in the element_blocks that contributed the 00207 // block pointed to by the original_blocks_tail. 00208 ACE_Message_Block* contrib_block = element_blocks; 00209 00210 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00211 "Set contrib_block to element_blocks [%0x]\n", 00212 contrib_block)); 00213 00214 // Loop through each block in the element_blocks until we either 00215 // find the contributing element block, or we have checked all of the 00216 // element_blocks, and never found the contributing element block. 
00217 while (contrib_block != 0) { 00218 if (contrib_block->base() == original_blocks->base()) { 00219 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00220 "contrib_block->base() == original_blocks->base()\n")); 00221 // Ok. We have found the source block. 00222 break; 00223 } 00224 00225 // That wasn't a match. Try the next contrib_block to see 00226 // if it is the contributing block for the block at the top of 00227 // the original_blocks chain (which is a chain of 1 at this point). 00228 contrib_block = contrib_block->cont(); 00229 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00230 "Move contrib_block to contrib_block->cont() [%0x]\n", 00231 contrib_block)); 00232 ++num_elem_blocks_sent; 00233 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00234 "num_elem_blocks_sent incremented to %d\n", 00235 num_elem_blocks_sent)); 00236 } 00237 00238 // Sanity check - make sure that we found the contributing block 00239 // in the current element (being visited) for the contributed block 00240 // that is the lone block in the original_blocks chain. 00241 if (contrib_block == 0) { 00242 ACE_ERROR((LM_ERROR, 00243 "(%P|%t) ERROR: Element queue and unsent message block " 00244 "chain is out-of-sync. source_block == 0.\n")); 00245 00246 // Set the status to indicate a fatal error occurred. 00247 this->status_ = REMOVE_ERROR; 00248 00249 // Stop vistation now. 00250 return 0; 00251 } 00252 00253 // Now that we have identified the contributing block for the 00254 // single block in the original_blocks chain, we may need to add 00255 // more blocks to the original_blocks chain - one more block for 00256 // each additional block chained to the element's contributing block. 00257 // Note that this while loop doesn't do anything if the contrib_block 00258 // is the last contributing block in the element. 
In this case, the 00259 // original_blocks contains the lone block that was contributed by 00260 // the lone (last) contributing block in the element - and the 00261 // remaining_chain properly points to the remaining blocks in the 00262 // packet. 00263 while (contrib_block->cont() != 0) { 00264 // The source element block indicates that it has a "next" 00265 // block that would have also contributed a block to the packet. 00266 00267 // This means that there is a block at the front of the 00268 // remaining_chain of blocks that really should be part of the 00269 // original_blocks. 00270 00271 // Sanity check - the remaining_chain better not be NULL (0). 00272 if (remaining_chain == 0) { 00273 ACE_ERROR((LM_ERROR, 00274 "(%P|%t) ERROR: Element queue and unsent message block " 00275 "chain is out-of-synch. remaining_chain == 0.\n")); 00276 this->status_ = REMOVE_ERROR; 00277 return 0; 00278 } 00279 00280 // Extract/unchain the first block from the remaining_chain. 00281 ACE_Message_Block* additional_block = remaining_chain; 00282 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00283 "Extracted additional_block from remaining_chain [%0x]\n", 00284 additional_block)); 00285 00286 remaining_chain = remaining_chain->cont(); 00287 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00288 "Move remaining_chain to remaining_chain->cont() [%0x]\n", 00289 remaining_chain)); 00290 00291 additional_block->cont(0); 00292 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00293 "Set additional_block->cont(0)\n")); 00294 00295 // Attach the block to the end of the original_blocks chain. 00296 original_blocks_tail->cont(additional_block); 00297 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00298 "original_blocks_tail->cont(additional_block)\n")); 00299 00300 original_blocks_tail = additional_block; 00301 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00302 "Set original_blocks_tail to additional_block [%0x]\n", 00303 original_blocks_tail)); 00304 00305 // Advance to the next contributing block. 
00306 contrib_block = contrib_block->cont(); 00307 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00308 "Move contrib_block to contrib_block->cont() [%0x]\n", 00309 contrib_block)); 00310 } 00311 00312 // Finally!At this point we have broken the unsent packet chain of 00313 // blocks into three seperate chains: 00314 // 00315 // (1) this->previous_block_ is either 0, or it points to the block 00316 // (from the unsent packet chain) that immediately preceded the 00317 // first block (from the unsent packet chain) that was contributed 00318 // by the sample (that we need to replace). 00319 // this->previous_block_ is 0 when the contributed blocks from 00320 // the sample (that we are replacing) are the first blocks from 00321 // the unsent packet chain. 00322 // 00323 // Thus, sub-chain (1) is either an empty chain (when 00324 // this->previous_block_ is 0), or it is a chain that starts 00325 // with the head_ block (the first block from the unsent packet 00326 // chain), and ends with the this->previous_block_. 00327 // 00328 // (2) original_blocks points to the first block (from the unsent 00329 // packet chain) that was contributed by the sample (that we 00330 // need to replace). 00331 // 00332 // Thus, sub-chain (2) is a chain that starts with the block 00333 // pointed to by original_blocks. 00334 // 00335 // (3) remaining_chain points to the first block (from the unsent 00336 // packet chain) that followed the last block that was 00337 // contributed by the sample (that we need to replace). 00338 // 00339 // Thus, sub-chain (3) is a chain that starts with the block 00340 // pointed to by remaining_chain. Note that this may be 0 if 00341 // the sample being replaced is the last sample in the packet. 00342 // 00343 // If sub-chains (1), (2), and (3) were chained together (in that 00344 // order), we would end up with the original unsent packet chain. 00345 // Whew. 00346 00347 // Now we can perform our replacement duties. 
00348 00349 // Save off the pointer to the original element 00350 TransportQueueElement* orig_elem = element; 00351 00352 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00353 "Create the new TransportReplacedElement using the " 00354 "orig_elem [%0x]\n", 00355 orig_elem)); 00356 00357 // Create the replacement element for the original element. 00358 element = new 00359 TransportReplacedElement(orig_elem, 00360 &this->replaced_element_mb_allocator_, 00361 &this->replaced_element_db_allocator_); 00362 00363 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00364 "The new TransportReplacedElement is [%0x]\n", 00365 element)); 00366 00367 // Now we have to deal with replacing the original_blocks chain 00368 // with duplicates from the msg() chain of the replacement element. 00369 00370 ACE_Message_Block* replacement_element_blocks = 00371 const_cast<ACE_Message_Block*>(element->msg()); 00372 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00373 "Set replacement_element_blocks to the replacement element's " 00374 "msg() [%0x]\n", 00375 replacement_element_blocks)); 00376 00377 // Move through the chain to account for the num_elem_blocks_sent 00378 for (unsigned int i = 0; i < num_elem_blocks_sent; i++) { 00379 replacement_element_blocks = replacement_element_blocks->cont(); 00380 00381 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00382 "Moved replacement_element_blocks to its cont() block " 00383 "[%0x]\n", 00384 replacement_element_blocks)); 00385 } 00386 00387 // Make a duplicate of the replacement_element_blocks chain 00388 ACE_Message_Block* replacement_blocks = 00389 replacement_element_blocks->duplicate(); 00390 00391 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00392 "Set replacement_blocks to duplicate of " 00393 "replacement_element_blocks [%0x]\n", 00394 replacement_blocks)); 00395 00396 // Now adjust the block at the front of the replacement_blocks chain 00397 // to match the block at the front of the original_blocks chain - 00398 // with respect to the difference between the rd_ptr() setting and 00399 // the base() setting. 
00400 size_t rd_offset = original_blocks->rd_ptr() - original_blocks->base(); 00401 00402 if (rd_offset > 0) { 00403 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00404 "Call replacement_blocks->rd_ptr(rd_offset) with " 00405 "rd_offset == [%d]\n", 00406 rd_offset)); 00407 replacement_blocks->rd_ptr(rd_offset); 00408 } 00409 00410 // Find the last block (the tail) in the replacement_blocks chain 00411 ACE_Message_Block* replacement_blocks_tail = replacement_blocks; 00412 00413 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00414 "Set replacement_blocks_tail to replacement_blocks " 00415 "[%0x]\n", 00416 replacement_blocks_tail)); 00417 00418 while (replacement_blocks_tail->cont() != 0) { 00419 replacement_blocks_tail = replacement_blocks_tail->cont(); 00420 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00421 "Moved replacement_blocks_tail to its cont() block " 00422 "[%0x]\n", 00423 replacement_blocks_tail)); 00424 } 00425 00426 // Now we can stitch the unsent packet chain back together using the 00427 // replacement blocks instead of the orig_blocks. 00428 replacement_blocks_tail->cont(remaining_chain); 00429 00430 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00431 "Stitched replacement_blocks_tail to remaining_chain.\n")); 00432 00433 if (this->previous_block_ == 0) { 00434 // Replacing blocks at the head of the unsent packet chain. 00435 this->head_ = replacement_blocks; 00436 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00437 "Replacing blocks at head of unsent packet chain.\n")); 00438 00439 } else { 00440 // Replacing blocks not at the head of the unsent packet chain. 00441 this->previous_block_->cont(replacement_blocks); 00442 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00443 "Replacing blocks not at head of unsent packet chain.\n")); 00444 } 00445 00446 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00447 "Release the original_blocks.\n")); 00448 00449 // Release the chain of original blocks. 
00450 original_blocks->release(); 00451 00452 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00453 "Tell original element that data_dropped().\n")); 00454 00455 // Tell the original element (that we replaced), data_dropped() 00456 // by transport. 00457 // This visitor is used in TransportSendStrategy::do_remove_sample 00458 // and TransportSendBuffer::retain_all. In former case, the sample 00459 // is dropped as a result of writer's remove_sample call. In the 00460 // later case, the dropped_by_transport is not used as the sample 00461 // is retained sample and no callback is made to writer. 00462 this->status_ = orig_elem->data_dropped() ? REMOVE_RELEASED : REMOVE_FOUND; 00463 00464 if (this->status_ == REMOVE_RELEASED || this->match_.unique()) { 00465 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00466 "Return 0 to halt visitation.\n")); 00467 // Replace a single sample if one is specified, otherwise visit the 00468 // entire queue replacing each sample with the specified 00469 // publication Id value. 00470 return 0; 00471 } 00472 } 00473 00474 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00475 "Return 1 to continue visitation.\n")); 00476 00477 // Continue visitation. 00478 return 1; 00479 }
current_block_: This is the message block in the chain that corresponds to the current (non-head) element being visited.
Definition at line 61 of file PacketRemoveVisitor.h.
Referenced by visit_element_ref().
head_: The head block of the chain of unsent blocks in the packet.
Definition at line 50 of file PacketRemoveVisitor.h.
Referenced by visit_element_ref().
header_block_: The packet header block that was duplicate()'d to form the first block in the packet.
Definition at line 54 of file PacketRemoveVisitor.h.
Referenced by visit_element_ref().
match_: The sample that needs to be removed.
Definition at line 47 of file PacketRemoveVisitor.h.
Referenced by visit_element_ref().
previous_block_: This is the message block in the chain that has its cont() set to the current_block_.
Definition at line 65 of file PacketRemoveVisitor.h.
Referenced by visit_element_ref().
replaced_element_db_allocator_: Cached allocator for DataSampleHeader data block.
Definition at line 70 of file PacketRemoveVisitor.h.
Referenced by visit_element_ref().
replaced_element_mb_allocator_: Cached allocator for DataSampleHeader message block.
Definition at line 68 of file PacketRemoveVisitor.h.
Referenced by visit_element_ref().
status_: Holds the status of our visit.
Definition at line 57 of file PacketRemoveVisitor.h.
Referenced by status(), and visit_element_ref().