00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "PacketRemoveVisitor.h"
00010 #include "TransportRetainedElement.h"
00011 #include "ace/Message_Block.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 #include "PacketRemoveVisitor.inl"
00015 #endif
00016
00017 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00018
00019 namespace OpenDDS {
00020 namespace DCPS {
00021
00022 PacketRemoveVisitor::PacketRemoveVisitor(
00023 const TransportQueueElement::MatchCriteria& match,
00024 ACE_Message_Block*& unsent_head_block,
00025 ACE_Message_Block* header_block,
00026 MessageBlockAllocator& mb_allocator,
00027 DataBlockAllocator& db_allocator)
00028 : match_(match)
00029 , head_(unsent_head_block)
00030 , header_block_(header_block)
00031 , status_(REMOVE_NOT_FOUND)
00032 , current_block_(0)
00033 , previous_block_(0)
00034 , replaced_element_mb_allocator_(mb_allocator)
00035 , replaced_element_db_allocator_(db_allocator)
00036 {
00037 DBG_ENTRY_LVL("PacketRemoveVisitor", "PacketRemoveVisitor", 6);
00038 }
00039
00040 PacketRemoveVisitor::~PacketRemoveVisitor()
00041 {
00042 DBG_ENTRY_LVL("PacketRemoveVisitor", "~PacketRemoveVisitor", 6);
00043 }
00044
00045 int
00046 PacketRemoveVisitor::visit_element_ref(TransportQueueElement*& element)
00047 {
00048 DBG_ENTRY_LVL("PacketRemoveVisitor", "visit_element_ref", 6);
00049
00050 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00051 "Obtain the element_blocks using element->msg()\n"));
00052
00053 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00054 "The element is [%0x]\n",
00055 element));
00056
00057
00058
00059 ACE_Message_Block* element_blocks =
00060 const_cast<ACE_Message_Block*>(element->msg());
00061
00062 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00063 "element_blocks == [%0x]\n", element_blocks));
00064
00065
00066
00067
00068
00069
00070
00071
00072 if (this->current_block_ == 0) {
00073 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00074 "this->current_block_ == 0. Visiting first element.\n"));
00075
00076
00077
00078 this->current_block_ = this->head_;
00079 this->previous_block_ = 0;
00080
00081
00082
00083 if (this->current_block_ == 0) {
00084 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00085 "No blocks to iterate through, ending visitation.\n"));
00086 return 0;
00087 }
00088
00089
00090
00091
00092 if (this->header_block_->base() == this->current_block_->base()) {
00093
00094 this->previous_block_ = this->current_block_;
00095 this->current_block_ = this->previous_block_->cont();
00096 }
00097
00098 } else {
00099 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00100 "this->current_block_ != 0. Visiting element other than "
00101 "the first element.\n"));
00102
00103
00104
00105
00106
00107
00108
00109
00110 this->previous_block_ = this->current_block_;
00111
00112 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00113 "Set previous_block_ to the current_block_ [%0x]\n",
00114 this->previous_block_));
00115
00116
00117
00118
00119 while (this->previous_block_->cont()->base() != element_blocks->base()) {
00120 this->previous_block_ = this->previous_block_->cont();
00121
00122 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00123 "Moved previous_block_ to its cont() block [%0x]\n",
00124 this->previous_block_));
00125 }
00126
00127
00128
00129
00130
00131 this->current_block_ = this->previous_block_->cont();
00132
00133 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00134 "Set current_block_ to the previous_block_->cont() [%0x]\n",
00135 this->current_block_));
00136 }
00137
00138 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00139 "Does the current element match the sample to be replaced?\n"));
00140
00141
00142
00143 if (this->match_.matches(*element)) {
00144 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00145 "YES - The element matches the sample\n"));
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169 ACE_Message_Block* original_blocks = this->current_block_;
00170
00171 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00172 "Set original_blocks to this->current_block_ [%0x]\n",
00173 original_blocks));
00174
00175
00176
00177
00178
00179
00180 ACE_Message_Block* remaining_chain = this->current_block_->cont();
00181
00182 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00183 "Set remaining_chain to this->current_block_->cont() [%0x]\n",
00184 remaining_chain));
00185
00186 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00187 "Set original_blocks->cont(0)\n"));
00188
00189
00190
00191 original_blocks->cont(0);
00192
00193 unsigned int num_elem_blocks_sent = 0;
00194
00195 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00196 "Set num_elem_blocks_sent to 0\n"));
00197
00198
00199
00200 ACE_Message_Block* original_blocks_tail = original_blocks;
00201
00202 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00203 "Set original_blocks_tail to original_blocks [%0x]\n",
00204 original_blocks_tail));
00205
00206
00207
00208 ACE_Message_Block* contrib_block = element_blocks;
00209
00210 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00211 "Set contrib_block to element_blocks [%0x]\n",
00212 contrib_block));
00213
00214
00215
00216
00217 while (contrib_block != 0) {
00218 if (contrib_block->base() == original_blocks->base()) {
00219 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00220 "contrib_block->base() == original_blocks->base()\n"));
00221
00222 break;
00223 }
00224
00225
00226
00227
00228 contrib_block = contrib_block->cont();
00229 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00230 "Move contrib_block to contrib_block->cont() [%0x]\n",
00231 contrib_block));
00232 ++num_elem_blocks_sent;
00233 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00234 "num_elem_blocks_sent incremented to %d\n",
00235 num_elem_blocks_sent));
00236 }
00237
00238
00239
00240
00241 if (contrib_block == 0) {
00242 ACE_ERROR((LM_ERROR,
00243 "(%P|%t) ERROR: Element queue and unsent message block "
00244 "chain is out-of-sync. source_block == 0.\n"));
00245
00246
00247 this->status_ = REMOVE_ERROR;
00248
00249
00250 return 0;
00251 }
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263 while (contrib_block->cont() != 0) {
00264
00265
00266
00267
00268
00269
00270
00271
00272 if (remaining_chain == 0) {
00273 ACE_ERROR((LM_ERROR,
00274 "(%P|%t) ERROR: Element queue and unsent message block "
00275 "chain is out-of-synch. remaining_chain == 0.\n"));
00276 this->status_ = REMOVE_ERROR;
00277 return 0;
00278 }
00279
00280
00281 ACE_Message_Block* additional_block = remaining_chain;
00282 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00283 "Extracted additional_block from remaining_chain [%0x]\n",
00284 additional_block));
00285
00286 remaining_chain = remaining_chain->cont();
00287 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00288 "Move remaining_chain to remaining_chain->cont() [%0x]\n",
00289 remaining_chain));
00290
00291 additional_block->cont(0);
00292 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00293 "Set additional_block->cont(0)\n"));
00294
00295
00296 original_blocks_tail->cont(additional_block);
00297 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00298 "original_blocks_tail->cont(additional_block)\n"));
00299
00300 original_blocks_tail = additional_block;
00301 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00302 "Set original_blocks_tail to additional_block [%0x]\n",
00303 original_blocks_tail));
00304
00305
00306 contrib_block = contrib_block->cont();
00307 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00308 "Move contrib_block to contrib_block->cont() [%0x]\n",
00309 contrib_block));
00310 }
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350 TransportQueueElement* orig_elem = element;
00351
00352 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00353 "Create the new TransportReplacedElement using the "
00354 "orig_elem [%0x]\n",
00355 orig_elem));
00356
00357
00358 element = new
00359 TransportReplacedElement(orig_elem,
00360 &this->replaced_element_mb_allocator_,
00361 &this->replaced_element_db_allocator_);
00362
00363 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00364 "The new TransportReplacedElement is [%0x]\n",
00365 element));
00366
00367
00368
00369
00370 ACE_Message_Block* replacement_element_blocks =
00371 const_cast<ACE_Message_Block*>(element->msg());
00372 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00373 "Set replacement_element_blocks to the replacement element's "
00374 "msg() [%0x]\n",
00375 replacement_element_blocks));
00376
00377
00378 for (unsigned int i = 0; i < num_elem_blocks_sent; i++) {
00379 replacement_element_blocks = replacement_element_blocks->cont();
00380
00381 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00382 "Moved replacement_element_blocks to its cont() block "
00383 "[%0x]\n",
00384 replacement_element_blocks));
00385 }
00386
00387
00388 ACE_Message_Block* replacement_blocks =
00389 replacement_element_blocks->duplicate();
00390
00391 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00392 "Set replacement_blocks to duplicate of "
00393 "replacement_element_blocks [%0x]\n",
00394 replacement_blocks));
00395
00396
00397
00398
00399
00400 size_t rd_offset = original_blocks->rd_ptr() - original_blocks->base();
00401
00402 if (rd_offset > 0) {
00403 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00404 "Call replacement_blocks->rd_ptr(rd_offset) with "
00405 "rd_offset == [%d]\n",
00406 rd_offset));
00407 replacement_blocks->rd_ptr(rd_offset);
00408 }
00409
00410
00411 ACE_Message_Block* replacement_blocks_tail = replacement_blocks;
00412
00413 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00414 "Set replacement_blocks_tail to replacement_blocks "
00415 "[%0x]\n",
00416 replacement_blocks_tail));
00417
00418 while (replacement_blocks_tail->cont() != 0) {
00419 replacement_blocks_tail = replacement_blocks_tail->cont();
00420 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00421 "Moved replacement_blocks_tail to its cont() block "
00422 "[%0x]\n",
00423 replacement_blocks_tail));
00424 }
00425
00426
00427
00428 replacement_blocks_tail->cont(remaining_chain);
00429
00430 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00431 "Stitched replacement_blocks_tail to remaining_chain.\n"));
00432
00433 if (this->previous_block_ == 0) {
00434
00435 this->head_ = replacement_blocks;
00436 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00437 "Replacing blocks at head of unsent packet chain.\n"));
00438
00439 } else {
00440
00441 this->previous_block_->cont(replacement_blocks);
00442 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00443 "Replacing blocks not at head of unsent packet chain.\n"));
00444 }
00445
00446 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00447 "Release the original_blocks.\n"));
00448
00449
00450 original_blocks->release();
00451
00452 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00453 "Tell original element that data_dropped().\n"));
00454
00455
00456
00457
00458
00459
00460
00461
00462 this->status_ = orig_elem->data_dropped() ? REMOVE_RELEASED : REMOVE_FOUND;
00463
00464 if (this->status_ == REMOVE_RELEASED || this->match_.unique()) {
00465 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00466 "Return 0 to halt visitation.\n"));
00467
00468
00469
00470 return 0;
00471 }
00472 }
00473
00474 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00475 "Return 1 to continue visitation.\n"));
00476
00477
00478 return 1;
00479 }
00480
00481 }
00482 }
00483
00484 OPENDDS_END_VERSIONED_NAMESPACE_DECL