PacketRemoveVisitor.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "PacketRemoveVisitor.h"
00010 #include "TransportRetainedElement.h"
00011 #include "ace/Message_Block.h"
00012 
00013 #if !defined (__ACE_INLINE__)
00014 #include "PacketRemoveVisitor.inl"
00015 #endif /* __ACE_INLINE__ */
00016 
00017 namespace OpenDDS {
00018 namespace DCPS {
00019 
00020 PacketRemoveVisitor::PacketRemoveVisitor(
00021   const TransportQueueElement::MatchCriteria& match,
00022   ACE_Message_Block*& unsent_head_block,
00023   ACE_Message_Block* header_block,
00024   TransportReplacedElementAllocator& allocator,
00025   MessageBlockAllocator& mb_allocator,
00026   DataBlockAllocator& db_allocator)
00027   : match_(match)
00028   , head_(unsent_head_block)
00029   , header_block_(header_block)
00030   , status_(REMOVE_NOT_FOUND)
00031   , current_block_(0)
00032   , previous_block_(0)
00033   , replaced_element_allocator_(allocator)
00034   , replaced_element_mb_allocator_(mb_allocator)
00035   , replaced_element_db_allocator_(db_allocator)
00036 {
00037   DBG_ENTRY_LVL("PacketRemoveVisitor", "PacketRemoveVisitor", 6);
00038 }
00039 
00040 PacketRemoveVisitor::~PacketRemoveVisitor()
00041 {
00042   DBG_ENTRY_LVL("PacketRemoveVisitor", "~PacketRemoveVisitor", 6);
00043 }
00044 
00045 int
00046 PacketRemoveVisitor::visit_element_ref(TransportQueueElement*& element)
00047 {
00048   DBG_ENTRY_LVL("PacketRemoveVisitor", "visit_element_ref", 6);
00049 
00050   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00051         "Obtain the element_blocks using element->msg()\n"));
00052 
00053   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00054         "The element is [%0x]\n",
00055         element));
00056 
00057   // These is the head of the chain of "source" blocks from the element
00058   // currently being visited.
00059   ACE_Message_Block* element_blocks =
00060     const_cast<ACE_Message_Block*>(element->msg());
00061 
00062   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00063         "element_blocks == [%0x]\n", element_blocks));
00064 
00065   // As we visit an element, we also adjust our current_block_ and
00066   // previous_block_ data members such that we can correlate the current
00067   // element (being visited) with the message blocks that it contributed
00068   // to the unsent packet blocks.  Our head_ data member was set to the
00069   // first unsent block in the chain of blocks that make up the remaining
00070   // portions of the "packet".
00071 
00072   if (this->current_block_ == 0) {
00073     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00074           "this->current_block_ == 0.  Visiting first element.\n"));
00075 
00076     // This must be our first visit_element() call.  Set up the
00077     // current_block_ and previous_block_ data members appropriately.
00078     this->current_block_ = this->head_;
00079     this->previous_block_ = 0;
00080 
00081     // If the current_block_ is still zero, there is nothing to do here,
00082     // so cancel the visitation
00083     if (this->current_block_ == 0) {
00084       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00085             "No blocks to iterate through, ending visitation.\n"));
00086       return 0;
00087     }
00088 
00089     // There is a chance that the head_ block (and thus the current_block_)
00090     // is actually a duplicate of the packet header_block_.  If so, we
00091     // need to adust the current_block_ and previous_block_ appropriately.
00092     if (this->header_block_->base() == this->current_block_->base()) {
00093       // Yup.  Just what we thought may be the case.
00094       this->previous_block_ = this->current_block_;
00095       this->current_block_ = this->previous_block_->cont();
00096     }
00097 
00098   } else {
00099     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00100           "this->current_block_ != 0.  Visiting element other than "
00101           "the first element.\n"));
00102 
00103     // We are visiting an element that is not the very first element in
00104     // the packet.
00105 
00106     // Let's get the previous_block_ data member set to point to the
00107     // block in the packet chain that is the predecessor to the first
00108     // block in the packet chain that was contributed from the current
00109     // element.
00110     this->previous_block_ = this->current_block_;
00111 
00112     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00113           "Set previous_block_ to the current_block_ [%0x]\n",
00114           this->previous_block_));
00115 
00116     // Keep changing the previous_block_ to the next block in the chain
00117     // until we know that the next block in the chain is a duplicate of
00118     // the block at the front of the element_blocks chain.
00119     while (this->previous_block_->cont()->base() != element_blocks->base()) {
00120       this->previous_block_ = this->previous_block_->cont();
00121 
00122       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00123             "Moved previous_block_ to its cont() block [%0x]\n",
00124             this->previous_block_));
00125     }
00126 
00127     // At this point, we know that the previous_block_ is the block
00128     // that immediately precedes the first block contributed by the
00129     // element that we are currently visiting.  Set the current_block_
00130     // to point to the first block contributed by the element.
00131     this->current_block_ = this->previous_block_->cont();
00132 
00133     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00134           "Set current_block_ to the previous_block_->cont() [%0x]\n",
00135           this->current_block_));
00136   }
00137 
00138   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00139         "Does the current element match the sample to be replaced?\n"));
00140 
00141   // Does the current element (being visited) match the sample that we
00142   // need to remove?
00143   if (this->match_.matches(*element)) {
00144     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00145           "YES - The element matches the sample\n"));
00146 
00147     // We get inside here if the element we are currently visiting is
00148     // the element that "matches" the sample that needs to be removed.
00149 
00150     // At this point, the current_block_ points to the first block in
00151     // the packet that is known to have been contributed by the
00152     // element that we are currently visiting.
00153 
00154     // The previous_block_ is either pointing to the block in the packet
00155     // that immediately precedes the current_block_, or the previous_block_
00156     // is set to 0 indicating that the current_block_ also happens to
00157     // be the head_ block in the packet (and the element we are visiting
00158     // is the first element (remaining) in the packet).
00159 
00160     // Our goal now is to extract the blocks from the packet that have
00161     // been contributed by the element that we are currently visiting.
00162     // Then, we will need to replace those blocks in the packet with
00163     // our own blocks.  How the replacement blocks are created depends
00164     // upon whether or not we are visiting the first element in the packet.
00165 
00166     // The original_blocks will end up being a chain of blocks
00167     // extracted from the packet, all of which were contributed by
00168     // the current element being visited.
00169     ACE_Message_Block* original_blocks = this->current_block_;
00170 
00171     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00172           "Set original_blocks to this->current_block_ [%0x]\n",
00173           original_blocks));
00174 
00175     // The remaining_chain will end up being the chain of blocks that
00176     // followed the original blocks in the packet.  There is always
00177     // the possibility that the remaining chain will be 0, meaning that
00178     // we are currently visiting (and removing) the last element in the
00179     // packet.
00180     ACE_Message_Block* remaining_chain = this->current_block_->cont();
00181 
00182     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00183           "Set remaining_chain to this->current_block_->cont() [%0x]\n",
00184           remaining_chain));
00185 
00186     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00187           "Set original_blocks->cont(0)\n"));
00188 
00189     // At this point, we only know for sure that one block was
00190     // contributed by the element currently being visited.
00191     original_blocks->cont(0);
00192 
00193     unsigned int num_elem_blocks_sent = 0;
00194 
00195     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00196           "Set num_elem_blocks_sent to 0\n"));
00197 
00198     // The original_blocks_tail is a pointer to the last block in the
00199     // chain of blocks contributed by the element currently being visited.
00200     ACE_Message_Block* original_blocks_tail = original_blocks;
00201 
00202     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00203           "Set original_blocks_tail to original_blocks [%0x]\n",
00204           original_blocks_tail));
00205 
00206     // Find the block in the element_blocks that contributed the
00207     // block pointed to by the original_blocks_tail.
00208     ACE_Message_Block* contrib_block = element_blocks;
00209 
00210     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00211           "Set contrib_block to element_blocks [%0x]\n",
00212           contrib_block));
00213 
00214     // Loop through each block in the element_blocks until we either
00215     // find the contributing element block, or we have checked all of the
00216     // element_blocks, and never found the contributing element block.
00217     while (contrib_block != 0) {
00218       if (contrib_block->base() == original_blocks->base()) {
00219         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00220               "contrib_block->base() == original_blocks->base()\n"));
00221         // Ok.  We have found the source block.
00222         break;
00223       }
00224 
00225       // That wasn't a match.  Try the next contrib_block to see
00226       // if it is the contributing block for the block at the top of
00227       // the original_blocks chain (which is a chain of 1 at this point).
00228       contrib_block = contrib_block->cont();
00229       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00230             "Move contrib_block to contrib_block->cont() [%0x]\n",
00231             contrib_block));
00232       ++num_elem_blocks_sent;
00233       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00234             "num_elem_blocks_sent incremented to %d\n",
00235             num_elem_blocks_sent));
00236     }
00237 
00238     // Sanity check - make sure that we found the contributing block
00239     // in the current element (being visited) for the contributed block
00240     // that is the lone block in the original_blocks chain.
00241     if (contrib_block == 0) {
00242       ACE_ERROR((LM_ERROR,
00243                  "(%P|%t) ERROR: Element queue and unsent message block "
00244                  "chain is out-of-sync. source_block == 0.\n"));
00245 
00246       // Set the status to indicate a fatal error occurred.
00247       this->status_ = REMOVE_ERROR;
00248 
00249       // Stop vistation now.
00250       return 0;
00251     }
00252 
00253     // Now that we have identified the contributing block for the
00254     // single block in the original_blocks chain, we may need to add
00255     // more blocks to the original_blocks chain - one more block for
00256     // each additional block chained to the element's contributing block.
00257     // Note that this while loop doesn't do anything if the contrib_block
00258     // is the last contributing block in the element.  In this case, the
00259     // original_blocks contains the lone block that was contributed by
00260     // the lone (last) contributing block in the element - and the
00261     // remaining_chain properly points to the remaining blocks in the
00262     // packet.
00263     while (contrib_block->cont() != 0) {
00264       // The source element block indicates that it has a "next"
00265       // block that would have also contributed a block to the packet.
00266 
00267       // This means that there is a block at the front of the
00268       // remaining_chain of blocks that really should be part of the
00269       // original_blocks.
00270 
00271       // Sanity check - the remaining_chain better not be NULL (0).
00272       if (remaining_chain == 0) {
00273         ACE_ERROR((LM_ERROR,
00274                    "(%P|%t) ERROR: Element queue and unsent message block "
00275                    "chain is out-of-synch. remaining_chain == 0.\n"));
00276         this->status_ = REMOVE_ERROR;
00277         return 0;
00278       }
00279 
00280       // Extract/unchain the first block from the remaining_chain.
00281       ACE_Message_Block* additional_block = remaining_chain;
00282       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00283             "Extracted additional_block from remaining_chain [%0x]\n",
00284             additional_block));
00285 
00286       remaining_chain = remaining_chain->cont();
00287       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00288             "Move remaining_chain to remaining_chain->cont() [%0x]\n",
00289             remaining_chain));
00290 
00291       additional_block->cont(0);
00292       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00293             "Set additional_block->cont(0)\n"));
00294 
00295       // Attach the block to the end of the original_blocks chain.
00296       original_blocks_tail->cont(additional_block);
00297       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00298             "original_blocks_tail->cont(additional_block)\n"));
00299 
00300       original_blocks_tail = additional_block;
00301       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00302             "Set original_blocks_tail to additional_block [%0x]\n",
00303             original_blocks_tail));
00304 
00305       // Advance to the next contributing block.
00306       contrib_block = contrib_block->cont();
00307       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00308             "Move contrib_block to contrib_block->cont() [%0x]\n",
00309             contrib_block));
00310     }
00311 
00312     // Finally!At this point we have broken the unsent packet chain of
00313     // blocks into three seperate chains:
00314     //
00315     //   (1) this->previous_block_ is either 0, or it points to the block
00316     //       (from the unsent packet chain) that immediately preceded the
00317     //       first block (from the unsent packet chain) that was contributed
00318     //       by the sample (that we need to replace).
00319     //       this->previous_block_ is 0 when the contributed blocks from
00320     //       the sample (that we are replacing) are the first blocks from
00321     //       the unsent packet chain.
00322     //
00323     //       Thus, sub-chain (1) is either an empty chain (when
00324     //       this->previous_block_ is 0), or it is a chain that starts
00325     //       with the head_ block (the first block from the unsent packet
00326     //       chain), and ends with the this->previous_block_.
00327     //
00328     //   (2) original_blocks points to the first block (from the unsent
00329     //       packet chain) that was contributed by the sample (that we
00330     //       need to replace).
00331     //
00332     //       Thus, sub-chain (2) is a chain that starts with the block
00333     //       pointed to by original_blocks.
00334     //
00335     //   (3) remaining_chain points to the first block (from the unsent
00336     //       packet chain) that followed the last block that was
00337     //       contributed by the sample (that we need to replace).
00338     //
00339     //       Thus, sub-chain (3) is a chain that starts with the block
00340     //       pointed to by remaining_chain.  Note that this may be 0 if
00341     //       the sample being replaced is the last sample in the packet.
00342     //
00343     // If sub-chains (1), (2), and (3) were chained together (in that
00344     // order), we would end up with the original unsent packet chain.
00345     // Whew.
00346 
00347     // Now we can perform our replacement duties.
00348 
00349     // Save off the pointer to the original element
00350     TransportQueueElement* orig_elem = element;
00351 
00352     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00353           "Create the new TransportReplacedElement using the "
00354           "orig_elem [%0x]\n",
00355           orig_elem));
00356 
00357     // Create the replacement element for the original element.
00358     ACE_NEW_MALLOC_NORETURN(
00359       element,
00360       (TransportQueueElement*)this->replaced_element_allocator_.malloc(),
00361       TransportReplacedElement(orig_elem, &this->replaced_element_allocator_,
00362                                &this->replaced_element_mb_allocator_,
00363                                &this->replaced_element_db_allocator_));
00364     if (element == 0) {
00365       // Set fatal error and stop visitation.
00366       this->status_ = REMOVE_ERROR;
00367       return 0;
00368     }
00369 
00370     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00371           "The new TransportReplacedElement is [%0x]\n",
00372           element));
00373 
00374     // Now we have to deal with replacing the original_blocks chain
00375     // with duplicates from the msg() chain of the replacement element.
00376 
00377     ACE_Message_Block* replacement_element_blocks =
00378       const_cast<ACE_Message_Block*>(element->msg());
00379     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00380           "Set replacement_element_blocks to the replacement element's "
00381           "msg() [%0x]\n",
00382           replacement_element_blocks));
00383 
00384     // Move through the chain to account for the num_elem_blocks_sent
00385     for (unsigned int i = 0; i < num_elem_blocks_sent; i++) {
00386       replacement_element_blocks = replacement_element_blocks->cont();
00387 
00388       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00389             "Moved replacement_element_blocks to its cont() block "
00390             "[%0x]\n",
00391             replacement_element_blocks));
00392     }
00393 
00394     // Make a duplicate of the replacement_element_blocks chain
00395     ACE_Message_Block* replacement_blocks =
00396       replacement_element_blocks->duplicate();
00397 
00398     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00399           "Set replacement_blocks to duplicate of "
00400           "replacement_element_blocks [%0x]\n",
00401           replacement_blocks));
00402 
00403     // Now adjust the block at the front of the replacement_blocks chain
00404     // to match the block at the front of the original_blocks chain -
00405     // with respect to the difference between the rd_ptr() setting and
00406     // the base() setting.
00407     size_t rd_offset = original_blocks->rd_ptr() - original_blocks->base();
00408 
00409     if (rd_offset > 0) {
00410       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00411             "Call replacement_blocks->rd_ptr(rd_offset) with "
00412             "rd_offset == [%d]\n",
00413             rd_offset));
00414       replacement_blocks->rd_ptr(rd_offset);
00415     }
00416 
00417     // Find the last block (the tail) in the replacement_blocks chain
00418     ACE_Message_Block* replacement_blocks_tail = replacement_blocks;
00419 
00420     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00421           "Set replacement_blocks_tail to replacement_blocks "
00422           "[%0x]\n",
00423           replacement_blocks_tail));
00424 
00425     while (replacement_blocks_tail->cont() != 0) {
00426       replacement_blocks_tail = replacement_blocks_tail->cont();
00427       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00428             "Moved replacement_blocks_tail to its cont() block "
00429             "[%0x]\n",
00430             replacement_blocks_tail));
00431     }
00432 
00433     // Now we can stitch the unsent packet chain back together using the
00434     // replacement blocks instead of the orig_blocks.
00435     replacement_blocks_tail->cont(remaining_chain);
00436 
00437     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00438           "Stitched replacement_blocks_tail to remaining_chain.\n"));
00439 
00440     if (this->previous_block_ == 0) {
00441       // Replacing blocks at the head of the unsent packet chain.
00442       this->head_ = replacement_blocks;
00443       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00444             "Replacing blocks at head of unsent packet chain.\n"));
00445 
00446     } else {
00447       // Replacing blocks not at the head of the unsent packet chain.
00448       this->previous_block_->cont(replacement_blocks);
00449       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00450             "Replacing blocks not at head of unsent packet chain.\n"));
00451     }
00452 
00453     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00454           "Release the original_blocks.\n"));
00455 
00456     // Release the chain of original blocks.
00457     original_blocks->release();
00458 
00459     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00460           "Tell original element that data_dropped().\n"));
00461 
00462     // Tell the original element (that we replaced), data_dropped()
00463     // by transport.
00464     // This visitor is used in TransportSendStrategy::do_remove_sample
00465     // and TransportSendBuffer::retain_all. In former case, the sample
00466     // is dropped as a result of writer's remove_sample call. In the
00467     // later case, the dropped_by_transport is not used as the sample
00468     // is retained sample and no callback is made to writer.
00469     this->status_ = orig_elem->data_dropped() ? REMOVE_RELEASED : REMOVE_FOUND;
00470 
00471     if (this->status_ == REMOVE_RELEASED || this->match_.unique()) {
00472       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00473             "Return 0 to halt visitation.\n"));
00474       // Replace a single sample if one is specified, otherwise visit the
00475       // entire queue replacing each sample with the specified
00476       // publication Id value.
00477       return 0;
00478     }
00479   }
00480 
00481   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00482         "Return 1 to continue visitation.\n"));
00483 
00484   // Continue visitation.
00485   return 1;
00486 }
00487 
00488 }
00489 }

Generated on Fri Feb 12 20:05:24 2016 for OpenDDS by  doxygen 1.4.7