Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
9 : #include "PacketRemoveVisitor.h"
10 : #include "TransportRetainedElement.h"
11 :
12 : #include <ace/Message_Block.h>
13 :
14 : #if !defined (__ACE_INLINE__)
15 : #include "PacketRemoveVisitor.inl"
16 : #endif /* __ACE_INLINE__ */
17 :
18 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
19 :
20 : namespace OpenDDS {
21 : namespace DCPS {
22 :
23 0 : PacketRemoveVisitor::PacketRemoveVisitor(
24 : const TransportQueueElement::MatchCriteria& match,
25 : ACE_Message_Block*& unsent_head_block,
26 : ACE_Message_Block* header_block,
27 : MessageBlockAllocator& mb_allocator,
28 : DataBlockAllocator& db_allocator,
29 0 : bool remove_all)
30 0 : : match_(match)
31 0 : , head_(unsent_head_block)
32 0 : , header_block_(header_block)
33 0 : , status_(REMOVE_NOT_FOUND)
34 0 : , current_block_(0)
35 0 : , previous_block_(0)
36 0 : , replaced_element_mb_allocator_(mb_allocator)
37 0 : , replaced_element_db_allocator_(db_allocator)
38 0 : , remove_all_(remove_all)
39 : {
40 : DBG_ENTRY_LVL("PacketRemoveVisitor", "PacketRemoveVisitor", 6);
41 0 : }
42 :
43 0 : PacketRemoveVisitor::~PacketRemoveVisitor()
44 : {
45 : DBG_ENTRY_LVL("PacketRemoveVisitor", "~PacketRemoveVisitor", 6);
46 0 : }
47 :
48 : int
49 0 : PacketRemoveVisitor::visit_element_ref(TransportQueueElement*& element)
50 : {
51 : DBG_ENTRY_LVL("PacketRemoveVisitor", "visit_element_ref", 6);
52 :
53 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
54 : "Obtain the element_blocks using element->msg()\n"));
55 :
56 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
57 : "The element is [%0x]\n",
58 : element));
59 :
60 0 : if (element->is_retained_replaced()) {
61 0 : status_ = REMOVE_FOUND;
62 0 : return 0;
63 : }
64 :
65 : // These is the head of the chain of "source" blocks from the element
66 : // currently being visited.
67 : ACE_Message_Block* element_blocks =
68 0 : const_cast<ACE_Message_Block*>(element->msg());
69 :
70 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
71 : "element_blocks == [%0x]\n", element_blocks));
72 :
73 : // As we visit an element, we also adjust our current_block_ and
74 : // previous_block_ data members such that we can correlate the current
75 : // element (being visited) with the message blocks that it contributed
76 : // to the unsent packet blocks. Our head_ data member was set to the
77 : // first unsent block in the chain of blocks that make up the remaining
78 : // portions of the "packet".
79 :
80 0 : if (this->current_block_ == 0) {
81 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
82 : "this->current_block_ == 0. Visiting first element.\n"));
83 :
84 : // This must be our first visit_element() call. Set up the
85 : // current_block_ and previous_block_ data members appropriately.
86 0 : this->current_block_ = this->head_;
87 0 : this->previous_block_ = 0;
88 :
89 : // If the current_block_ is still zero, there is nothing to do here,
90 : // so cancel the visitation
91 0 : if (this->current_block_ == 0) {
92 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
93 : "No blocks to iterate through, ending visitation.\n"));
94 0 : return 0;
95 : }
96 :
97 : // There is a chance that the head_ block (and thus the current_block_)
98 : // is actually a duplicate of the packet header_block_. If so, we
99 : // need to adjust the current_block_ and previous_block_ appropriately.
100 0 : if (this->header_block_->base() == this->current_block_->base()) {
101 : // Yup. Just what we thought may be the case.
102 0 : this->previous_block_ = this->current_block_;
103 0 : this->current_block_ = this->previous_block_->cont();
104 : }
105 :
106 : } else {
107 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
108 : "this->current_block_ != 0. Visiting element other than "
109 : "the first element.\n"));
110 :
111 : // We are visiting an element that is not the very first element in
112 : // the packet.
113 :
114 : // Let's get the previous_block_ data member set to point to the
115 : // block in the packet chain that is the predecessor to the first
116 : // block in the packet chain that was contributed from the current
117 : // element.
118 0 : this->previous_block_ = this->current_block_;
119 :
120 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
121 : "Set previous_block_ to the current_block_ [%0x]\n",
122 : this->previous_block_));
123 :
124 : // Keep changing the previous_block_ to the next block in the chain
125 : // until we know that the next block in the chain is a duplicate of
126 : // the block at the front of the element_blocks chain.
127 0 : while (this->previous_block_->cont()->base() != element_blocks->base()) {
128 0 : this->previous_block_ = this->previous_block_->cont();
129 :
130 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
131 : "Moved previous_block_ to its cont() block [%0x]\n",
132 : this->previous_block_));
133 : }
134 :
135 : // At this point, we know that the previous_block_ is the block
136 : // that immediately precedes the first block contributed by the
137 : // element that we are currently visiting. Set the current_block_
138 : // to point to the first block contributed by the element.
139 0 : this->current_block_ = this->previous_block_->cont();
140 :
141 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
142 : "Set current_block_ to the previous_block_->cont() [%0x]\n",
143 : this->current_block_));
144 : }
145 :
146 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
147 : "Does the current element match the sample to be replaced?\n"));
148 :
149 : // Does the current element (being visited) match the sample that we
150 : // need to remove?
151 0 : if (this->match_.matches(*element)) {
152 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
153 : "YES - The element matches the sample\n"));
154 :
155 : // We get inside here if the element we are currently visiting is
156 : // the element that "matches" the sample that needs to be removed.
157 :
158 : // At this point, the current_block_ points to the first block in
159 : // the packet that is known to have been contributed by the
160 : // element that we are currently visiting.
161 :
162 : // The previous_block_ is either pointing to the block in the packet
163 : // that immediately precedes the current_block_, or the previous_block_
164 : // is set to 0 indicating that the current_block_ also happens to
165 : // be the head_ block in the packet (and the element we are visiting
166 : // is the first element (remaining) in the packet).
167 :
168 : // Our goal now is to extract the blocks from the packet that have
169 : // been contributed by the element that we are currently visiting.
170 : // Then, we will need to replace those blocks in the packet with
171 : // our own blocks. How the replacement blocks are created depends
172 : // upon whether or not we are visiting the first element in the packet.
173 :
174 : // The original_blocks will end up being a chain of blocks
175 : // extracted from the packet, all of which were contributed by
176 : // the current element being visited.
177 0 : ACE_Message_Block* original_blocks = this->current_block_;
178 :
179 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
180 : "Set original_blocks to this->current_block_ [%0x]\n",
181 : original_blocks));
182 :
183 : // The remaining_chain will end up being the chain of blocks that
184 : // followed the original blocks in the packet. There is always
185 : // the possibility that the remaining chain will be 0, meaning that
186 : // we are currently visiting (and removing) the last element in the
187 : // packet.
188 0 : ACE_Message_Block* remaining_chain = this->current_block_->cont();
189 :
190 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
191 : "Set remaining_chain to this->current_block_->cont() [%0x]\n",
192 : remaining_chain));
193 :
194 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
195 : "Set original_blocks->cont(0)\n"));
196 :
197 : // At this point, we only know for sure that one block was
198 : // contributed by the element currently being visited.
199 0 : original_blocks->cont(0);
200 :
201 0 : unsigned int num_elem_blocks_sent = 0;
202 :
203 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
204 : "Set num_elem_blocks_sent to 0\n"));
205 :
206 : // The original_blocks_tail is a pointer to the last block in the
207 : // chain of blocks contributed by the element currently being visited.
208 0 : ACE_Message_Block* original_blocks_tail = original_blocks;
209 :
210 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
211 : "Set original_blocks_tail to original_blocks [%0x]\n",
212 : original_blocks_tail));
213 :
214 : // Find the block in the element_blocks that contributed the
215 : // block pointed to by the original_blocks_tail.
216 0 : ACE_Message_Block* contrib_block = element_blocks;
217 :
218 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
219 : "Set contrib_block to element_blocks [%0x]\n",
220 : contrib_block));
221 :
222 : // Loop through each block in the element_blocks until we either
223 : // find the contributing element block, or we have checked all of the
224 : // element_blocks, and never found the contributing element block.
225 0 : while (contrib_block != 0) {
226 0 : if (contrib_block->base() == original_blocks->base()) {
227 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
228 : "contrib_block->base() == original_blocks->base()\n"));
229 : // Ok. We have found the source block.
230 0 : break;
231 : }
232 :
233 : // That wasn't a match. Try the next contrib_block to see
234 : // if it is the contributing block for the block at the top of
235 : // the original_blocks chain (which is a chain of 1 at this point).
236 0 : contrib_block = contrib_block->cont();
237 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
238 : "Move contrib_block to contrib_block->cont() [%0x]\n",
239 : contrib_block));
240 0 : ++num_elem_blocks_sent;
241 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
242 : "num_elem_blocks_sent incremented to %d\n",
243 : num_elem_blocks_sent));
244 : }
245 :
246 : // Sanity check - make sure that we found the contributing block
247 : // in the current element (being visited) for the contributed block
248 : // that is the lone block in the original_blocks chain.
249 0 : if (contrib_block == 0) {
250 0 : ACE_ERROR((LM_ERROR,
251 : "(%P|%t) ERROR: Element queue and unsent message block "
252 : "chain is out-of-sync. source_block == 0.\n"));
253 :
254 : // Set the status to indicate a fatal error occurred.
255 0 : this->status_ = REMOVE_ERROR;
256 :
257 : // Stop vistation now.
258 0 : return 0;
259 : }
260 :
261 : // Now that we have identified the contributing block for the
262 : // single block in the original_blocks chain, we may need to add
263 : // more blocks to the original_blocks chain - one more block for
264 : // each additional block chained to the element's contributing block.
265 : // Note that this while loop doesn't do anything if the contrib_block
266 : // is the last contributing block in the element. In this case, the
267 : // original_blocks contains the lone block that was contributed by
268 : // the lone (last) contributing block in the element - and the
269 : // remaining_chain properly points to the remaining blocks in the
270 : // packet.
271 0 : while (contrib_block->cont() != 0) {
272 : // The source element block indicates that it has a "next"
273 : // block that would have also contributed a block to the packet.
274 :
275 : // This means that there is a block at the front of the
276 : // remaining_chain of blocks that really should be part of the
277 : // original_blocks.
278 :
279 : // Sanity check - the remaining_chain better not be NULL (0).
280 0 : if (remaining_chain == 0) {
281 0 : ACE_ERROR((LM_ERROR,
282 : "(%P|%t) ERROR: Element queue and unsent message block "
283 : "chain is out-of-synch. remaining_chain == 0.\n"));
284 0 : this->status_ = REMOVE_ERROR;
285 0 : return 0;
286 : }
287 :
288 : // Extract/unchain the first block from the remaining_chain.
289 0 : ACE_Message_Block* additional_block = remaining_chain;
290 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
291 : "Extracted additional_block from remaining_chain [%0x]\n",
292 : additional_block));
293 :
294 0 : remaining_chain = remaining_chain->cont();
295 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
296 : "Move remaining_chain to remaining_chain->cont() [%0x]\n",
297 : remaining_chain));
298 :
299 0 : additional_block->cont(0);
300 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
301 : "Set additional_block->cont(0)\n"));
302 :
303 : // Attach the block to the end of the original_blocks chain.
304 0 : original_blocks_tail->cont(additional_block);
305 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
306 : "original_blocks_tail->cont(additional_block)\n"));
307 :
308 0 : original_blocks_tail = additional_block;
309 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
310 : "Set original_blocks_tail to additional_block [%0x]\n",
311 : original_blocks_tail));
312 :
313 : // Advance to the next contributing block.
314 0 : contrib_block = contrib_block->cont();
315 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
316 : "Move contrib_block to contrib_block->cont() [%0x]\n",
317 : contrib_block));
318 : }
319 :
320 : // Finally!At this point we have broken the unsent packet chain of
321 : // blocks into three separate chains:
322 : //
323 : // (1) this->previous_block_ is either 0, or it points to the block
324 : // (from the unsent packet chain) that immediately preceded the
325 : // first block (from the unsent packet chain) that was contributed
326 : // by the sample (that we need to replace).
327 : // this->previous_block_ is 0 when the contributed blocks from
328 : // the sample (that we are replacing) are the first blocks from
329 : // the unsent packet chain.
330 : //
331 : // Thus, sub-chain (1) is either an empty chain (when
332 : // this->previous_block_ is 0), or it is a chain that starts
333 : // with the head_ block (the first block from the unsent packet
334 : // chain), and ends with the this->previous_block_.
335 : //
336 : // (2) original_blocks points to the first block (from the unsent
337 : // packet chain) that was contributed by the sample (that we
338 : // need to replace).
339 : //
340 : // Thus, sub-chain (2) is a chain that starts with the block
341 : // pointed to by original_blocks.
342 : //
343 : // (3) remaining_chain points to the first block (from the unsent
344 : // packet chain) that followed the last block that was
345 : // contributed by the sample (that we need to replace).
346 : //
347 : // Thus, sub-chain (3) is a chain that starts with the block
348 : // pointed to by remaining_chain. Note that this may be 0 if
349 : // the sample being replaced is the last sample in the packet.
350 : //
351 : // If sub-chains (1), (2), and (3) were chained together (in that
352 : // order), we would end up with the original unsent packet chain.
353 : // Whew.
354 :
355 : // Now we can perform our replacement duties.
356 :
357 : // Save off the pointer to the original element
358 0 : TransportQueueElement* orig_elem = element;
359 :
360 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
361 : "Create the new TransportReplacedElement using the "
362 : "orig_elem [%0x]\n",
363 : orig_elem));
364 :
365 : // Create the replacement element for the original element.
366 0 : element = new
367 : TransportReplacedElement(orig_elem,
368 0 : &this->replaced_element_mb_allocator_,
369 0 : &this->replaced_element_db_allocator_);
370 :
371 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
372 : "The new TransportReplacedElement is [%0x]\n",
373 : element));
374 :
375 : // Now we have to deal with replacing the original_blocks chain
376 : // with duplicates from the msg() chain of the replacement element.
377 :
378 : ACE_Message_Block* replacement_element_blocks =
379 0 : const_cast<ACE_Message_Block*>(element->msg());
380 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
381 : "Set replacement_element_blocks to the replacement element's "
382 : "msg() [%0x]\n",
383 : replacement_element_blocks));
384 :
385 : // Move through the chain to account for the num_elem_blocks_sent
386 0 : for (unsigned int i = 0; i < num_elem_blocks_sent; i++) {
387 0 : replacement_element_blocks = replacement_element_blocks->cont();
388 :
389 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
390 : "Moved replacement_element_blocks to its cont() block "
391 : "[%0x]\n",
392 : replacement_element_blocks));
393 : }
394 :
395 : // Make a duplicate of the replacement_element_blocks chain
396 : ACE_Message_Block* replacement_blocks =
397 0 : replacement_element_blocks->duplicate();
398 :
399 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
400 : "Set replacement_blocks to duplicate of "
401 : "replacement_element_blocks [%0x]\n",
402 : replacement_blocks));
403 :
404 : // Now adjust the block at the front of the replacement_blocks chain
405 : // to match the block at the front of the original_blocks chain -
406 : // with respect to the difference between the rd_ptr() setting and
407 : // the base() setting.
408 0 : size_t rd_offset = original_blocks->rd_ptr() - original_blocks->base();
409 :
410 0 : if (rd_offset > 0) {
411 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
412 : "Call replacement_blocks->rd_ptr(rd_offset) with "
413 : "rd_offset == [%d]\n",
414 : rd_offset));
415 0 : replacement_blocks->rd_ptr(rd_offset);
416 : }
417 :
418 : // Find the last block (the tail) in the replacement_blocks chain
419 0 : ACE_Message_Block* replacement_blocks_tail = replacement_blocks;
420 :
421 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
422 : "Set replacement_blocks_tail to replacement_blocks "
423 : "[%0x]\n",
424 : replacement_blocks_tail));
425 :
426 0 : while (replacement_blocks_tail->cont() != 0) {
427 0 : replacement_blocks_tail = replacement_blocks_tail->cont();
428 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
429 : "Moved replacement_blocks_tail to its cont() block "
430 : "[%0x]\n",
431 : replacement_blocks_tail));
432 : }
433 :
434 : // Now we can stitch the unsent packet chain back together using the
435 : // replacement blocks instead of the orig_blocks.
436 0 : replacement_blocks_tail->cont(remaining_chain);
437 :
438 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
439 : "Stitched replacement_blocks_tail to remaining_chain.\n"));
440 :
441 0 : if (this->previous_block_ == 0) {
442 : // Replacing blocks at the head of the unsent packet chain.
443 0 : this->head_ = replacement_blocks;
444 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
445 : "Replacing blocks at head of unsent packet chain.\n"));
446 :
447 : } else {
448 : // Replacing blocks not at the head of the unsent packet chain.
449 0 : this->previous_block_->cont(replacement_blocks);
450 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
451 : "Replacing blocks not at head of unsent packet chain.\n"));
452 : }
453 :
454 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
455 : "Release the original_blocks.\n"));
456 :
457 : // Release the chain of original blocks.
458 : Message_Block_Deleter deleter;
459 0 : deleter(original_blocks);
460 :
461 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
462 : "Tell original element that data_dropped().\n"));
463 :
464 : // Tell the original element (that we replaced), data_dropped()
465 : // by transport.
466 : // This visitor is used in TransportSendStrategy::do_remove_sample
467 : // and TransportSendBuffer::retain_all. In former case, the sample
468 : // is dropped as a result of writer's remove_sample call. In the
469 : // later case, the dropped_by_transport is not used as the sample
470 : // is retained sample and no callback is made to writer.
471 0 : this->status_ = orig_elem->data_dropped() ? REMOVE_RELEASED : REMOVE_FOUND;
472 :
473 0 : if ((!remove_all_ && status_ == REMOVE_RELEASED) || match_.unique()) {
474 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
475 : "Return 0 to halt visitation.\n"));
476 : // Replace a single sample if one is specified, otherwise visit the
477 : // entire queue replacing each sample with the specified
478 : // publication Id value.
479 0 : return 0;
480 : }
481 : }
482 :
483 0 : VDBG((LM_DEBUG, "(%P|%t) DBG: "
484 : "Return 1 to continue visitation.\n"));
485 :
486 : // Continue visitation.
487 0 : return 1;
488 : }
489 :
490 : }
491 : }
492 :
493 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
|