LCOV - code coverage report
Current view: top level - DCPS/transport/framework - PacketRemoveVisitor.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 122 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 4 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       9             : #include "PacketRemoveVisitor.h"
      10             : #include "TransportRetainedElement.h"
      11             : 
      12             : #include <ace/Message_Block.h>
      13             : 
      14             : #if !defined (__ACE_INLINE__)
      15             : #include "PacketRemoveVisitor.inl"
      16             : #endif /* __ACE_INLINE__ */
      17             : 
      18             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      19             : 
      20             : namespace OpenDDS {
      21             : namespace DCPS {
      22             : 
      23           0 : PacketRemoveVisitor::PacketRemoveVisitor(
      24             :   const TransportQueueElement::MatchCriteria& match,
      25             :   ACE_Message_Block*& unsent_head_block,
      26             :   ACE_Message_Block* header_block,
      27             :   MessageBlockAllocator& mb_allocator,
      28             :   DataBlockAllocator& db_allocator,
      29           0 :   bool remove_all)
      30           0 :   : match_(match)
      31           0 :   , head_(unsent_head_block)
      32           0 :   , header_block_(header_block)
      33           0 :   , status_(REMOVE_NOT_FOUND)
      34           0 :   , current_block_(0)
      35           0 :   , previous_block_(0)
      36           0 :   , replaced_element_mb_allocator_(mb_allocator)
      37           0 :   , replaced_element_db_allocator_(db_allocator)
      38           0 :   , remove_all_(remove_all)
      39             : {
      40             :   DBG_ENTRY_LVL("PacketRemoveVisitor", "PacketRemoveVisitor", 6);
      41           0 : }
      42             : 
      43           0 : PacketRemoveVisitor::~PacketRemoveVisitor()
      44             : {
      45             :   DBG_ENTRY_LVL("PacketRemoveVisitor", "~PacketRemoveVisitor", 6);
      46           0 : }
      47             : 
      48             : int
      49           0 : PacketRemoveVisitor::visit_element_ref(TransportQueueElement*& element)
      50             : {
      51             :   DBG_ENTRY_LVL("PacketRemoveVisitor", "visit_element_ref", 6);
      52             : 
      53           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
      54             :         "Obtain the element_blocks using element->msg()\n"));
      55             : 
      56           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
      57             :         "The element is [%0x]\n",
      58             :         element));
      59             : 
      60           0 :   if (element->is_retained_replaced()) {
      61           0 :     status_ = REMOVE_FOUND;
      62           0 :     return 0;
      63             :   }
      64             : 
      65             :   // These is the head of the chain of "source" blocks from the element
      66             :   // currently being visited.
      67             :   ACE_Message_Block* element_blocks =
      68           0 :     const_cast<ACE_Message_Block*>(element->msg());
      69             : 
      70           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
      71             :         "element_blocks == [%0x]\n", element_blocks));
      72             : 
      73             :   // As we visit an element, we also adjust our current_block_ and
      74             :   // previous_block_ data members such that we can correlate the current
      75             :   // element (being visited) with the message blocks that it contributed
      76             :   // to the unsent packet blocks.  Our head_ data member was set to the
      77             :   // first unsent block in the chain of blocks that make up the remaining
      78             :   // portions of the "packet".
      79             : 
      80           0 :   if (this->current_block_ == 0) {
      81           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
      82             :           "this->current_block_ == 0.  Visiting first element.\n"));
      83             : 
      84             :     // This must be our first visit_element() call.  Set up the
      85             :     // current_block_ and previous_block_ data members appropriately.
      86           0 :     this->current_block_ = this->head_;
      87           0 :     this->previous_block_ = 0;
      88             : 
      89             :     // If the current_block_ is still zero, there is nothing to do here,
      90             :     // so cancel the visitation
      91           0 :     if (this->current_block_ == 0) {
      92           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
      93             :             "No blocks to iterate through, ending visitation.\n"));
      94           0 :       return 0;
      95             :     }
      96             : 
      97             :     // There is a chance that the head_ block (and thus the current_block_)
      98             :     // is actually a duplicate of the packet header_block_.  If so, we
      99             :     // need to adjust the current_block_ and previous_block_ appropriately.
     100           0 :     if (this->header_block_->base() == this->current_block_->base()) {
     101             :       // Yup.  Just what we thought may be the case.
     102           0 :       this->previous_block_ = this->current_block_;
     103           0 :       this->current_block_ = this->previous_block_->cont();
     104             :     }
     105             : 
     106             :   } else {
     107           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     108             :           "this->current_block_ != 0.  Visiting element other than "
     109             :           "the first element.\n"));
     110             : 
     111             :     // We are visiting an element that is not the very first element in
     112             :     // the packet.
     113             : 
     114             :     // Let's get the previous_block_ data member set to point to the
     115             :     // block in the packet chain that is the predecessor to the first
     116             :     // block in the packet chain that was contributed from the current
     117             :     // element.
     118           0 :     this->previous_block_ = this->current_block_;
     119             : 
     120           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     121             :           "Set previous_block_ to the current_block_ [%0x]\n",
     122             :           this->previous_block_));
     123             : 
     124             :     // Keep changing the previous_block_ to the next block in the chain
     125             :     // until we know that the next block in the chain is a duplicate of
     126             :     // the block at the front of the element_blocks chain.
     127           0 :     while (this->previous_block_->cont()->base() != element_blocks->base()) {
     128           0 :       this->previous_block_ = this->previous_block_->cont();
     129             : 
     130           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     131             :             "Moved previous_block_ to its cont() block [%0x]\n",
     132             :             this->previous_block_));
     133             :     }
     134             : 
     135             :     // At this point, we know that the previous_block_ is the block
     136             :     // that immediately precedes the first block contributed by the
     137             :     // element that we are currently visiting.  Set the current_block_
     138             :     // to point to the first block contributed by the element.
     139           0 :     this->current_block_ = this->previous_block_->cont();
     140             : 
     141           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     142             :           "Set current_block_ to the previous_block_->cont() [%0x]\n",
     143             :           this->current_block_));
     144             :   }
     145             : 
     146           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     147             :         "Does the current element match the sample to be replaced?\n"));
     148             : 
     149             :   // Does the current element (being visited) match the sample that we
     150             :   // need to remove?
     151           0 :   if (this->match_.matches(*element)) {
     152           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     153             :           "YES - The element matches the sample\n"));
     154             : 
     155             :     // We get inside here if the element we are currently visiting is
     156             :     // the element that "matches" the sample that needs to be removed.
     157             : 
     158             :     // At this point, the current_block_ points to the first block in
     159             :     // the packet that is known to have been contributed by the
     160             :     // element that we are currently visiting.
     161             : 
     162             :     // The previous_block_ is either pointing to the block in the packet
     163             :     // that immediately precedes the current_block_, or the previous_block_
     164             :     // is set to 0 indicating that the current_block_ also happens to
     165             :     // be the head_ block in the packet (and the element we are visiting
     166             :     // is the first element (remaining) in the packet).
     167             : 
     168             :     // Our goal now is to extract the blocks from the packet that have
     169             :     // been contributed by the element that we are currently visiting.
     170             :     // Then, we will need to replace those blocks in the packet with
     171             :     // our own blocks.  How the replacement blocks are created depends
     172             :     // upon whether or not we are visiting the first element in the packet.
     173             : 
     174             :     // The original_blocks will end up being a chain of blocks
     175             :     // extracted from the packet, all of which were contributed by
     176             :     // the current element being visited.
     177           0 :     ACE_Message_Block* original_blocks = this->current_block_;
     178             : 
     179           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     180             :           "Set original_blocks to this->current_block_ [%0x]\n",
     181             :           original_blocks));
     182             : 
     183             :     // The remaining_chain will end up being the chain of blocks that
     184             :     // followed the original blocks in the packet.  There is always
     185             :     // the possibility that the remaining chain will be 0, meaning that
     186             :     // we are currently visiting (and removing) the last element in the
     187             :     // packet.
     188           0 :     ACE_Message_Block* remaining_chain = this->current_block_->cont();
     189             : 
     190           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     191             :           "Set remaining_chain to this->current_block_->cont() [%0x]\n",
     192             :           remaining_chain));
     193             : 
     194           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     195             :           "Set original_blocks->cont(0)\n"));
     196             : 
     197             :     // At this point, we only know for sure that one block was
     198             :     // contributed by the element currently being visited.
     199           0 :     original_blocks->cont(0);
     200             : 
     201           0 :     unsigned int num_elem_blocks_sent = 0;
     202             : 
     203           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     204             :           "Set num_elem_blocks_sent to 0\n"));
     205             : 
     206             :     // The original_blocks_tail is a pointer to the last block in the
     207             :     // chain of blocks contributed by the element currently being visited.
     208           0 :     ACE_Message_Block* original_blocks_tail = original_blocks;
     209             : 
     210           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     211             :           "Set original_blocks_tail to original_blocks [%0x]\n",
     212             :           original_blocks_tail));
     213             : 
     214             :     // Find the block in the element_blocks that contributed the
     215             :     // block pointed to by the original_blocks_tail.
     216           0 :     ACE_Message_Block* contrib_block = element_blocks;
     217             : 
     218           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     219             :           "Set contrib_block to element_blocks [%0x]\n",
     220             :           contrib_block));
     221             : 
     222             :     // Loop through each block in the element_blocks until we either
     223             :     // find the contributing element block, or we have checked all of the
     224             :     // element_blocks, and never found the contributing element block.
     225           0 :     while (contrib_block != 0) {
     226           0 :       if (contrib_block->base() == original_blocks->base()) {
     227           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     228             :               "contrib_block->base() == original_blocks->base()\n"));
     229             :         // Ok.  We have found the source block.
     230           0 :         break;
     231             :       }
     232             : 
     233             :       // That wasn't a match.  Try the next contrib_block to see
     234             :       // if it is the contributing block for the block at the top of
     235             :       // the original_blocks chain (which is a chain of 1 at this point).
     236           0 :       contrib_block = contrib_block->cont();
     237           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     238             :             "Move contrib_block to contrib_block->cont() [%0x]\n",
     239             :             contrib_block));
     240           0 :       ++num_elem_blocks_sent;
     241           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     242             :             "num_elem_blocks_sent incremented to %d\n",
     243             :             num_elem_blocks_sent));
     244             :     }
     245             : 
     246             :     // Sanity check - make sure that we found the contributing block
     247             :     // in the current element (being visited) for the contributed block
     248             :     // that is the lone block in the original_blocks chain.
     249           0 :     if (contrib_block == 0) {
     250           0 :       ACE_ERROR((LM_ERROR,
     251             :                  "(%P|%t) ERROR: Element queue and unsent message block "
     252             :                  "chain is out-of-sync. source_block == 0.\n"));
     253             : 
     254             :       // Set the status to indicate a fatal error occurred.
     255           0 :       this->status_ = REMOVE_ERROR;
     256             : 
     257             :       // Stop vistation now.
     258           0 :       return 0;
     259             :     }
     260             : 
     261             :     // Now that we have identified the contributing block for the
     262             :     // single block in the original_blocks chain, we may need to add
     263             :     // more blocks to the original_blocks chain - one more block for
     264             :     // each additional block chained to the element's contributing block.
     265             :     // Note that this while loop doesn't do anything if the contrib_block
     266             :     // is the last contributing block in the element.  In this case, the
     267             :     // original_blocks contains the lone block that was contributed by
     268             :     // the lone (last) contributing block in the element - and the
     269             :     // remaining_chain properly points to the remaining blocks in the
     270             :     // packet.
     271           0 :     while (contrib_block->cont() != 0) {
     272             :       // The source element block indicates that it has a "next"
     273             :       // block that would have also contributed a block to the packet.
     274             : 
     275             :       // This means that there is a block at the front of the
     276             :       // remaining_chain of blocks that really should be part of the
     277             :       // original_blocks.
     278             : 
     279             :       // Sanity check - the remaining_chain better not be NULL (0).
     280           0 :       if (remaining_chain == 0) {
     281           0 :         ACE_ERROR((LM_ERROR,
     282             :                    "(%P|%t) ERROR: Element queue and unsent message block "
     283             :                    "chain is out-of-synch. remaining_chain == 0.\n"));
     284           0 :         this->status_ = REMOVE_ERROR;
     285           0 :         return 0;
     286             :       }
     287             : 
     288             :       // Extract/unchain the first block from the remaining_chain.
     289           0 :       ACE_Message_Block* additional_block = remaining_chain;
     290           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     291             :             "Extracted additional_block from remaining_chain [%0x]\n",
     292             :             additional_block));
     293             : 
     294           0 :       remaining_chain = remaining_chain->cont();
     295           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     296             :             "Move remaining_chain to remaining_chain->cont() [%0x]\n",
     297             :             remaining_chain));
     298             : 
     299           0 :       additional_block->cont(0);
     300           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     301             :             "Set additional_block->cont(0)\n"));
     302             : 
     303             :       // Attach the block to the end of the original_blocks chain.
     304           0 :       original_blocks_tail->cont(additional_block);
     305           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     306             :             "original_blocks_tail->cont(additional_block)\n"));
     307             : 
     308           0 :       original_blocks_tail = additional_block;
     309           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     310             :             "Set original_blocks_tail to additional_block [%0x]\n",
     311             :             original_blocks_tail));
     312             : 
     313             :       // Advance to the next contributing block.
     314           0 :       contrib_block = contrib_block->cont();
     315           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     316             :             "Move contrib_block to contrib_block->cont() [%0x]\n",
     317             :             contrib_block));
     318             :     }
     319             : 
     320             :     // Finally!At this point we have broken the unsent packet chain of
     321             :     // blocks into three separate chains:
     322             :     //
     323             :     //   (1) this->previous_block_ is either 0, or it points to the block
     324             :     //       (from the unsent packet chain) that immediately preceded the
     325             :     //       first block (from the unsent packet chain) that was contributed
     326             :     //       by the sample (that we need to replace).
     327             :     //       this->previous_block_ is 0 when the contributed blocks from
     328             :     //       the sample (that we are replacing) are the first blocks from
     329             :     //       the unsent packet chain.
     330             :     //
     331             :     //       Thus, sub-chain (1) is either an empty chain (when
     332             :     //       this->previous_block_ is 0), or it is a chain that starts
     333             :     //       with the head_ block (the first block from the unsent packet
     334             :     //       chain), and ends with the this->previous_block_.
     335             :     //
     336             :     //   (2) original_blocks points to the first block (from the unsent
     337             :     //       packet chain) that was contributed by the sample (that we
     338             :     //       need to replace).
     339             :     //
     340             :     //       Thus, sub-chain (2) is a chain that starts with the block
     341             :     //       pointed to by original_blocks.
     342             :     //
     343             :     //   (3) remaining_chain points to the first block (from the unsent
     344             :     //       packet chain) that followed the last block that was
     345             :     //       contributed by the sample (that we need to replace).
     346             :     //
     347             :     //       Thus, sub-chain (3) is a chain that starts with the block
     348             :     //       pointed to by remaining_chain.  Note that this may be 0 if
     349             :     //       the sample being replaced is the last sample in the packet.
     350             :     //
     351             :     // If sub-chains (1), (2), and (3) were chained together (in that
     352             :     // order), we would end up with the original unsent packet chain.
     353             :     // Whew.
     354             : 
     355             :     // Now we can perform our replacement duties.
     356             : 
     357             :     // Save off the pointer to the original element
     358           0 :     TransportQueueElement* orig_elem = element;
     359             : 
     360           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     361             :           "Create the new TransportReplacedElement using the "
     362             :           "orig_elem [%0x]\n",
     363             :           orig_elem));
     364             : 
     365             :     // Create the replacement element for the original element.
     366           0 :     element = new
     367             :       TransportReplacedElement(orig_elem,
     368           0 :                                &this->replaced_element_mb_allocator_,
     369           0 :                                &this->replaced_element_db_allocator_);
     370             : 
     371           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     372             :           "The new TransportReplacedElement is [%0x]\n",
     373             :           element));
     374             : 
     375             :     // Now we have to deal with replacing the original_blocks chain
     376             :     // with duplicates from the msg() chain of the replacement element.
     377             : 
     378             :     ACE_Message_Block* replacement_element_blocks =
     379           0 :       const_cast<ACE_Message_Block*>(element->msg());
     380           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     381             :           "Set replacement_element_blocks to the replacement element's "
     382             :           "msg() [%0x]\n",
     383             :           replacement_element_blocks));
     384             : 
     385             :     // Move through the chain to account for the num_elem_blocks_sent
     386           0 :     for (unsigned int i = 0; i < num_elem_blocks_sent; i++) {
     387           0 :       replacement_element_blocks = replacement_element_blocks->cont();
     388             : 
     389           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     390             :             "Moved replacement_element_blocks to its cont() block "
     391             :             "[%0x]\n",
     392             :             replacement_element_blocks));
     393             :     }
     394             : 
     395             :     // Make a duplicate of the replacement_element_blocks chain
     396             :     ACE_Message_Block* replacement_blocks =
     397           0 :       replacement_element_blocks->duplicate();
     398             : 
     399           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     400             :           "Set replacement_blocks to duplicate of "
     401             :           "replacement_element_blocks [%0x]\n",
     402             :           replacement_blocks));
     403             : 
     404             :     // Now adjust the block at the front of the replacement_blocks chain
     405             :     // to match the block at the front of the original_blocks chain -
     406             :     // with respect to the difference between the rd_ptr() setting and
     407             :     // the base() setting.
     408           0 :     size_t rd_offset = original_blocks->rd_ptr() - original_blocks->base();
     409             : 
     410           0 :     if (rd_offset > 0) {
     411           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     412             :             "Call replacement_blocks->rd_ptr(rd_offset) with "
     413             :             "rd_offset == [%d]\n",
     414             :             rd_offset));
     415           0 :       replacement_blocks->rd_ptr(rd_offset);
     416             :     }
     417             : 
     418             :     // Find the last block (the tail) in the replacement_blocks chain
     419           0 :     ACE_Message_Block* replacement_blocks_tail = replacement_blocks;
     420             : 
     421           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     422             :           "Set replacement_blocks_tail to replacement_blocks "
     423             :           "[%0x]\n",
     424             :           replacement_blocks_tail));
     425             : 
     426           0 :     while (replacement_blocks_tail->cont() != 0) {
     427           0 :       replacement_blocks_tail = replacement_blocks_tail->cont();
     428           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     429             :             "Moved replacement_blocks_tail to its cont() block "
     430             :             "[%0x]\n",
     431             :             replacement_blocks_tail));
     432             :     }
     433             : 
     434             :     // Now we can stitch the unsent packet chain back together using the
     435             :     // replacement blocks instead of the orig_blocks.
     436           0 :     replacement_blocks_tail->cont(remaining_chain);
     437             : 
     438           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     439             :           "Stitched replacement_blocks_tail to remaining_chain.\n"));
     440             : 
     441           0 :     if (this->previous_block_ == 0) {
     442             :       // Replacing blocks at the head of the unsent packet chain.
     443           0 :       this->head_ = replacement_blocks;
     444           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     445             :             "Replacing blocks at head of unsent packet chain.\n"));
     446             : 
     447             :     } else {
     448             :       // Replacing blocks not at the head of the unsent packet chain.
     449           0 :       this->previous_block_->cont(replacement_blocks);
     450           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     451             :             "Replacing blocks not at head of unsent packet chain.\n"));
     452             :     }
     453             : 
     454           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     455             :           "Release the original_blocks.\n"));
     456             : 
     457             :     // Release the chain of original blocks.
     458             :     Message_Block_Deleter deleter;
     459           0 :     deleter(original_blocks);
     460             : 
     461           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     462             :           "Tell original element that data_dropped().\n"));
     463             : 
     464             :     // Tell the original element (that we replaced), data_dropped()
     465             :     // by transport.
     466             :     // This visitor is used in TransportSendStrategy::do_remove_sample
     467             :     // and TransportSendBuffer::retain_all. In former case, the sample
     468             :     // is dropped as a result of writer's remove_sample call. In the
     469             :     // later case, the dropped_by_transport is not used as the sample
     470             :     // is retained sample and no callback is made to writer.
     471           0 :     this->status_ = orig_elem->data_dropped() ? REMOVE_RELEASED : REMOVE_FOUND;
     472             : 
     473           0 :     if ((!remove_all_ && status_ == REMOVE_RELEASED) || match_.unique()) {
     474           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     475             :             "Return 0 to halt visitation.\n"));
     476             :       // Replace a single sample if one is specified, otherwise visit the
     477             :       // entire queue replacing each sample with the specified
     478             :       // publication Id value.
     479           0 :       return 0;
     480             :     }
     481             :   }
     482             : 
     483           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     484             :         "Return 1 to continue visitation.\n"));
     485             : 
     486             :   // Continue visitation.
     487           0 :   return 1;
     488             : }
     489             : 
     490             : }
     491             : }
     492             : 
     493             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16