00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "PacketRemoveVisitor.h"
00010 #include "TransportRetainedElement.h"
00011 #include "ace/Message_Block.h"
00012
00013 #if !defined (__ACE_INLINE__)
00014 #include "PacketRemoveVisitor.inl"
00015 #endif
00016
00017 namespace OpenDDS {
00018 namespace DCPS {
00019
00020 PacketRemoveVisitor::PacketRemoveVisitor(
00021 const TransportQueueElement::MatchCriteria& match,
00022 ACE_Message_Block*& unsent_head_block,
00023 ACE_Message_Block* header_block,
00024 TransportReplacedElementAllocator& allocator,
00025 MessageBlockAllocator& mb_allocator,
00026 DataBlockAllocator& db_allocator)
00027 : match_(match)
00028 , head_(unsent_head_block)
00029 , header_block_(header_block)
00030 , status_(REMOVE_NOT_FOUND)
00031 , current_block_(0)
00032 , previous_block_(0)
00033 , replaced_element_allocator_(allocator)
00034 , replaced_element_mb_allocator_(mb_allocator)
00035 , replaced_element_db_allocator_(db_allocator)
00036 {
00037 DBG_ENTRY_LVL("PacketRemoveVisitor", "PacketRemoveVisitor", 6);
00038 }
00039
00040 PacketRemoveVisitor::~PacketRemoveVisitor()
00041 {
00042 DBG_ENTRY_LVL("PacketRemoveVisitor", "~PacketRemoveVisitor", 6);
00043 }
00044
00045 int
00046 PacketRemoveVisitor::visit_element_ref(TransportQueueElement*& element)
00047 {
00048 DBG_ENTRY_LVL("PacketRemoveVisitor", "visit_element_ref", 6);
00049
00050 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00051 "Obtain the element_blocks using element->msg()\n"));
00052
00053 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00054 "The element is [%0x]\n",
00055 element));
00056
00057
00058
00059 ACE_Message_Block* element_blocks =
00060 const_cast<ACE_Message_Block*>(element->msg());
00061
00062 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00063 "element_blocks == [%0x]\n", element_blocks));
00064
00065
00066
00067
00068
00069
00070
00071
00072 if (this->current_block_ == 0) {
00073 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00074 "this->current_block_ == 0. Visiting first element.\n"));
00075
00076
00077
00078 this->current_block_ = this->head_;
00079 this->previous_block_ = 0;
00080
00081
00082
00083 if (this->current_block_ == 0) {
00084 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00085 "No blocks to iterate through, ending visitation.\n"));
00086 return 0;
00087 }
00088
00089
00090
00091
00092 if (this->header_block_->base() == this->current_block_->base()) {
00093
00094 this->previous_block_ = this->current_block_;
00095 this->current_block_ = this->previous_block_->cont();
00096 }
00097
00098 } else {
00099 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00100 "this->current_block_ != 0. Visiting element other than "
00101 "the first element.\n"));
00102
00103
00104
00105
00106
00107
00108
00109
00110 this->previous_block_ = this->current_block_;
00111
00112 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00113 "Set previous_block_ to the current_block_ [%0x]\n",
00114 this->previous_block_));
00115
00116
00117
00118
00119 while (this->previous_block_->cont()->base() != element_blocks->base()) {
00120 this->previous_block_ = this->previous_block_->cont();
00121
00122 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00123 "Moved previous_block_ to its cont() block [%0x]\n",
00124 this->previous_block_));
00125 }
00126
00127
00128
00129
00130
00131 this->current_block_ = this->previous_block_->cont();
00132
00133 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00134 "Set current_block_ to the previous_block_->cont() [%0x]\n",
00135 this->current_block_));
00136 }
00137
00138 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00139 "Does the current element match the sample to be replaced?\n"));
00140
00141
00142
00143 if (this->match_.matches(*element)) {
00144 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00145 "YES - The element matches the sample\n"));
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169 ACE_Message_Block* original_blocks = this->current_block_;
00170
00171 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00172 "Set original_blocks to this->current_block_ [%0x]\n",
00173 original_blocks));
00174
00175
00176
00177
00178
00179
00180 ACE_Message_Block* remaining_chain = this->current_block_->cont();
00181
00182 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00183 "Set remaining_chain to this->current_block_->cont() [%0x]\n",
00184 remaining_chain));
00185
00186 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00187 "Set original_blocks->cont(0)\n"));
00188
00189
00190
00191 original_blocks->cont(0);
00192
00193 unsigned int num_elem_blocks_sent = 0;
00194
00195 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00196 "Set num_elem_blocks_sent to 0\n"));
00197
00198
00199
00200 ACE_Message_Block* original_blocks_tail = original_blocks;
00201
00202 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00203 "Set original_blocks_tail to original_blocks [%0x]\n",
00204 original_blocks_tail));
00205
00206
00207
00208 ACE_Message_Block* contrib_block = element_blocks;
00209
00210 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00211 "Set contrib_block to element_blocks [%0x]\n",
00212 contrib_block));
00213
00214
00215
00216
00217 while (contrib_block != 0) {
00218 if (contrib_block->base() == original_blocks->base()) {
00219 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00220 "contrib_block->base() == original_blocks->base()\n"));
00221
00222 break;
00223 }
00224
00225
00226
00227
00228 contrib_block = contrib_block->cont();
00229 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00230 "Move contrib_block to contrib_block->cont() [%0x]\n",
00231 contrib_block));
00232 ++num_elem_blocks_sent;
00233 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00234 "num_elem_blocks_sent incremented to %d\n",
00235 num_elem_blocks_sent));
00236 }
00237
00238
00239
00240
00241 if (contrib_block == 0) {
00242 ACE_ERROR((LM_ERROR,
00243 "(%P|%t) ERROR: Element queue and unsent message block "
00244 "chain is out-of-sync. source_block == 0.\n"));
00245
00246
00247 this->status_ = REMOVE_ERROR;
00248
00249
00250 return 0;
00251 }
00252
00253
00254
00255
00256
00257
00258
00259
00260
00261
00262
00263 while (contrib_block->cont() != 0) {
00264
00265
00266
00267
00268
00269
00270
00271
00272 if (remaining_chain == 0) {
00273 ACE_ERROR((LM_ERROR,
00274 "(%P|%t) ERROR: Element queue and unsent message block "
00275 "chain is out-of-synch. remaining_chain == 0.\n"));
00276 this->status_ = REMOVE_ERROR;
00277 return 0;
00278 }
00279
00280
00281 ACE_Message_Block* additional_block = remaining_chain;
00282 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00283 "Extracted additional_block from remaining_chain [%0x]\n",
00284 additional_block));
00285
00286 remaining_chain = remaining_chain->cont();
00287 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00288 "Move remaining_chain to remaining_chain->cont() [%0x]\n",
00289 remaining_chain));
00290
00291 additional_block->cont(0);
00292 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00293 "Set additional_block->cont(0)\n"));
00294
00295
00296 original_blocks_tail->cont(additional_block);
00297 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00298 "original_blocks_tail->cont(additional_block)\n"));
00299
00300 original_blocks_tail = additional_block;
00301 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00302 "Set original_blocks_tail to additional_block [%0x]\n",
00303 original_blocks_tail));
00304
00305
00306 contrib_block = contrib_block->cont();
00307 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00308 "Move contrib_block to contrib_block->cont() [%0x]\n",
00309 contrib_block));
00310 }
00311
00312
00313
00314
00315
00316
00317
00318
00319
00320
00321
00322
00323
00324
00325
00326
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336
00337
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350 TransportQueueElement* orig_elem = element;
00351
00352 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00353 "Create the new TransportReplacedElement using the "
00354 "orig_elem [%0x]\n",
00355 orig_elem));
00356
00357
00358 ACE_NEW_MALLOC_NORETURN(
00359 element,
00360 (TransportQueueElement*)this->replaced_element_allocator_.malloc(),
00361 TransportReplacedElement(orig_elem, &this->replaced_element_allocator_,
00362 &this->replaced_element_mb_allocator_,
00363 &this->replaced_element_db_allocator_));
00364 if (element == 0) {
00365
00366 this->status_ = REMOVE_ERROR;
00367 return 0;
00368 }
00369
00370 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00371 "The new TransportReplacedElement is [%0x]\n",
00372 element));
00373
00374
00375
00376
00377 ACE_Message_Block* replacement_element_blocks =
00378 const_cast<ACE_Message_Block*>(element->msg());
00379 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00380 "Set replacement_element_blocks to the replacement element's "
00381 "msg() [%0x]\n",
00382 replacement_element_blocks));
00383
00384
00385 for (unsigned int i = 0; i < num_elem_blocks_sent; i++) {
00386 replacement_element_blocks = replacement_element_blocks->cont();
00387
00388 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00389 "Moved replacement_element_blocks to its cont() block "
00390 "[%0x]\n",
00391 replacement_element_blocks));
00392 }
00393
00394
00395 ACE_Message_Block* replacement_blocks =
00396 replacement_element_blocks->duplicate();
00397
00398 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00399 "Set replacement_blocks to duplicate of "
00400 "replacement_element_blocks [%0x]\n",
00401 replacement_blocks));
00402
00403
00404
00405
00406
00407 size_t rd_offset = original_blocks->rd_ptr() - original_blocks->base();
00408
00409 if (rd_offset > 0) {
00410 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00411 "Call replacement_blocks->rd_ptr(rd_offset) with "
00412 "rd_offset == [%d]\n",
00413 rd_offset));
00414 replacement_blocks->rd_ptr(rd_offset);
00415 }
00416
00417
00418 ACE_Message_Block* replacement_blocks_tail = replacement_blocks;
00419
00420 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00421 "Set replacement_blocks_tail to replacement_blocks "
00422 "[%0x]\n",
00423 replacement_blocks_tail));
00424
00425 while (replacement_blocks_tail->cont() != 0) {
00426 replacement_blocks_tail = replacement_blocks_tail->cont();
00427 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00428 "Moved replacement_blocks_tail to its cont() block "
00429 "[%0x]\n",
00430 replacement_blocks_tail));
00431 }
00432
00433
00434
00435 replacement_blocks_tail->cont(remaining_chain);
00436
00437 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00438 "Stitched replacement_blocks_tail to remaining_chain.\n"));
00439
00440 if (this->previous_block_ == 0) {
00441
00442 this->head_ = replacement_blocks;
00443 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00444 "Replacing blocks at head of unsent packet chain.\n"));
00445
00446 } else {
00447
00448 this->previous_block_->cont(replacement_blocks);
00449 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00450 "Replacing blocks not at head of unsent packet chain.\n"));
00451 }
00452
00453 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00454 "Release the original_blocks.\n"));
00455
00456
00457 original_blocks->release();
00458
00459 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00460 "Tell original element that data_dropped().\n"));
00461
00462
00463
00464
00465
00466
00467
00468
00469 this->status_ = orig_elem->data_dropped() ? REMOVE_RELEASED : REMOVE_FOUND;
00470
00471 if (this->status_ == REMOVE_RELEASED || this->match_.unique()) {
00472 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00473 "Return 0 to halt visitation.\n"));
00474
00475
00476
00477 return 0;
00478 }
00479 }
00480
00481 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00482 "Return 1 to continue visitation.\n"));
00483
00484
00485 return 1;
00486 }
00487
00488 }
00489 }