00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TransportSendStrategy.h"
00010 #include "RemoveAllVisitor.h"
00011 #include "TransportInst.h"
00012 #include "ThreadSynchStrategy.h"
00013 #include "ThreadSynchResource.h"
00014 #include "TransportQueueElement.h"
00015 #include "TransportSendElement.h"
00016 #include "TransportSendBuffer.h"
00017 #include "BuildChainVisitor.h"
00018 #include "QueueRemoveVisitor.h"
00019 #include "PacketRemoveVisitor.h"
00020 #include "TransportDefs.h"
00021 #include "DirectPriorityMapper.h"
00022 #include "dds/DCPS/DataSampleHeader.h"
00023 #include "dds/DCPS/DataSampleElement.h"
00024 #include "dds/DCPS/Service_Participant.h"
00025 #include "EntryExit.h"
00026
00027 #include "ace/Reverse_Lock_T.h"
00028
00029 #if !defined (__ACE_INLINE__)
00030 #include "TransportSendStrategy.inl"
00031 #endif
00032
00033 namespace OpenDDS {
00034 namespace DCPS {
00035
00036
00037
00038
00039
00040
// Chunk count used to size replaced_element_allocator_ in the constructor;
// the companion message-block and data-block allocators get twice as many.
#define NUM_REPLACED_ELEMENT_CHUNKS 20

namespace {

// Slack that send() adds on top of DataSampleHeader::max_marshaled_size()
// when a maximum message size is in effect, to decide whether the current
// packet still has room to start (a fragment of) another element.
const size_t MIN_FRAG = 68;

} // anonymous namespace
00050
00051
00052
00053
00054
00055
00056 TransportSendStrategy::TransportSendStrategy(
00057 std::size_t id,
00058 const TransportInst_rch& transport_inst,
00059 ThreadSynchResource* synch_resource,
00060 Priority priority,
00061 const ThreadSynchStrategy_rch& thread_sync_strategy)
00062 : ThreadSynchWorker(id),
00063 max_samples_(transport_inst->max_samples_per_packet_),
00064 optimum_size_(transport_inst->optimum_packet_size_),
00065 max_size_(transport_inst->max_packet_size_),
00066 queue_(new QueueType(transport_inst->queue_messages_per_pool_,
00067 transport_inst->queue_initial_pools_)),
00068 max_header_size_(0),
00069 header_block_(0),
00070 elems_(new QueueType(1, transport_inst->max_samples_per_packet_)),
00071 pkt_chain_(0),
00072 header_complete_(false),
00073 start_counter_(0),
00074 mode_(MODE_DIRECT),
00075 mode_before_suspend_(MODE_NOT_SET),
00076 header_mb_allocator_(0),
00077 header_db_allocator_(0),
00078 synch_(0),
00079 lock_(),
00080 replaced_element_allocator_(NUM_REPLACED_ELEMENT_CHUNKS),
00081 replaced_element_mb_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
00082 replaced_element_db_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
00083 retained_element_allocator_(0),
00084 transport_inst_(transport_inst),
00085 graceful_disconnecting_(false),
00086 link_released_(true),
00087 send_buffer_(0),
00088 transport_shutdown_(false)
00089 {
00090 DBG_ENTRY_LVL("TransportSendStrategy","TransportSendStrategy",6);
00091
00092
00093 DirectPriorityMapper mapper(priority);
00094 this->synch_ = thread_sync_strategy->create_synch_object(
00095 synch_resource,
00096 #ifdef ACE_WIN32
00097 ACE_DEFAULT_THREAD_PRIORITY,
00098 #else
00099 mapper.thread_priority(),
00100 #endif
00101 TheServiceParticipant->scheduler());
00102
00103
00104
00105 this->max_header_size_ = TransportHeader::max_marshaled_size();
00106
00107 if (Transport_debug_level >= 2) {
00108 ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportSendStrategy replaced_element_allocator %x with %d chunks\n",
00109 &replaced_element_allocator_, NUM_REPLACED_ELEMENT_CHUNKS));
00110 }
00111
00112 delayed_delivered_notification_queue_.reserve(this->max_samples_);
00113 }
00114
00115 TransportSendStrategy::~TransportSendStrategy()
00116 {
00117 DBG_ENTRY_LVL("TransportSendStrategy","~TransportSendStrategy",6);
00118
00119 delete this->synch_;
00120
00121 this->delayed_delivered_notification_queue_.clear();
00122
00123 delete this->elems_;
00124 delete this->queue_;
00125 }
00126
00127 void
00128 TransportSendStrategy::send_buffer(TransportSendBuffer* send_buffer)
00129 {
00130 this->send_buffer_ = send_buffer;
00131
00132 if (this->send_buffer_ != 0) {
00133 this->send_buffer_->bind(this);
00134 }
00135 }
00136
00137 ThreadSynchWorker::WorkOutcome
00138 TransportSendStrategy::perform_work()
00139 {
00140 DBG_ENTRY_LVL("TransportSendStrategy","perform_work",6);
00141
00142 SendPacketOutcome outcome;
00143 bool no_more_work = false;
00144
00145 {
00146 GuardType guard(this->lock_);
00147
00148 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: perform_work mode: %C\n", mode_as_str(this->mode_)), 5);
00149
00150 if (this->mode_ == MODE_TERMINATED) {
00151 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00152 "Entered perform_work() and mode_ is MODE_TERMINATED - "
00153 "we lost connection and could not reconnect, just return "
00154 "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
00155 return WORK_OUTCOME_BROKEN_RESOURCE;
00156 }
00157
00158
00159
00160
00161
00162
00163
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173
00174 if (this->mode_ != MODE_QUEUE && this->mode_ != MODE_SUSPEND) {
00175 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00176 "Entered perform_work() and mode_ is %C - just return "
00177 "WORK_OUTCOME_NO_MORE_TO_DO.\n", mode_as_str(this->mode_)), 5);
00178 return WORK_OUTCOME_NO_MORE_TO_DO;
00179 }
00180
00181
00182
00183
00184
00185
00186
00187
00188
00189 const size_t header_length = this->header_.length_;
00190
00191 if (header_length == 0) {
00192 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00193 "The current packet doesn't have any unsent bytes - we "
00194 "need to 'populate' the current packet with elems from "
00195 "the queue.\n"), 5);
00196
00197
00198
00199
00200
00201
00202 if (this->queue_->size() == 0) {
00203 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00204 "But the queue is empty. We have cleared the "
00205 "backpressure situation.\n"),5);
00206
00207
00208
00209
00210
00211 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00212 "Flip mode to MODE_DIRECT, and return "
00213 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
00214
00215
00216 this->mode_ = MODE_DIRECT;
00217
00218
00219
00220 return WORK_OUTCOME_NO_MORE_TO_DO;
00221 }
00222
00223 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00224 "There is at least one elem in the queue - get the packet "
00225 "elems from the queue.\n"), 5);
00226
00227
00228
00229 this->get_packet_elems_from_queue();
00230
00231 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00232 "Prepare the packet from the packet elems_.\n"), 5);
00233
00234
00235 this->prepare_packet();
00236
00237 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00238 "Packet has been prepared from packet elems_.\n"), 5);
00239
00240 } else {
00241 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00242 "We have a current packet that still has unsent bytes.\n"), 5);
00243 }
00244
00245 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00246 "Attempt to send the current packet.\n"), 5);
00247
00248
00249
00250
00251
00252
00253 outcome = this->send_packet();
00254
00255
00256
00257 if ((outcome == OUTCOME_COMPLETE_SEND) && (this->queue_->size() == 0)) {
00258 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00259 "Flip the mode to MODE_DIRECT, and then return "
00260 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
00261
00262
00263 this->mode_ = MODE_DIRECT;
00264 no_more_work = true;
00265 }
00266 }
00267
00268 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00269 "The outcome of the send_packet() was %d.\n", outcome), 5);
00270
00271 send_delayed_notifications();
00272
00273
00274
00275 if (no_more_work) {
00276 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00277 "We sent the whole packet, and there is nothing left on "
00278 "the queue now.\n"), 5);
00279
00280
00281
00282 return WORK_OUTCOME_NO_MORE_TO_DO;
00283 }
00284
00285 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00286 "We still have unsent bytes in the current packet AND/OR there "
00287 "are still elements in the queue.\n"), 5);
00288
00289 if ((outcome == OUTCOME_PEER_LOST) || (outcome == OUTCOME_SEND_ERROR)) {
00290 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00291 "We lost our connection, or had some fatal connection "
00292 "error. Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
00293
00294 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00295 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
00296
00297 bool do_suspend = true;
00298 this->relink(do_suspend);
00299
00300 if (this->mode_ == MODE_SUSPEND) {
00301 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00302 "The reconnect has not done yet and we are still in MODE_SUSPEND. "
00303 "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
00304
00305
00306 return WORK_OUTCOME_NO_MORE_TO_DO;
00307
00308 } else if (this->mode_ == MODE_TERMINATED) {
00309 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00310 "Reconnect failed, now we are in MODE_TERMINATED\n"), 5);
00311 return WORK_OUTCOME_BROKEN_RESOURCE;
00312
00313 } else {
00314 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00315 "Reconnect succeeded, Notify synch thread of work "
00316 "availability.\n"), 5);
00317
00318
00319
00320 this->synch_->work_available();
00321 }
00322 }
00323
00324 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00325 "We still have an 'unbroken' connection.\n"), 5);
00326
00327 if (outcome == OUTCOME_BACKPRESSURE) {
00328 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00329 "We experienced backpressure on our attempt to send the "
00330 "packet. Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
00331
00332 return WORK_OUTCOME_CLOGGED_RESOURCE;
00333 }
00334
00335 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00336 "We may have sent the whole current packet, but still have "
00337 "elements on the queue.\n"), 5);
00338 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00339 "Or, we may have only partially sent the current packet.\n"), 5);
00340
00341 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00342 "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5);
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352 return WORK_OUTCOME_MORE_TO_DO;
00353 }
00354
00355
00356
00357
00358
00359
00360
00361
00362
00363
00364
00365
00366
00367
00368 int
00369 TransportSendStrategy::adjust_packet_after_send(ssize_t num_bytes_sent)
00370 {
00371 DBG_ENTRY_LVL("TransportSendStrategy", "adjust_packet_after_send", 6);
00372
00373 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00374 "Adjusting the current packet because %d bytes of the packet "
00375 "have been sent.\n", num_bytes_sent));
00376
00377 ssize_t num_bytes_left = num_bytes_sent;
00378 ssize_t num_non_header_bytes_sent = 0;
00379
00380 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00381 "Set num_bytes_left to %d.\n", num_bytes_left));
00382 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00383 "Set num_non_header_bytes_sent to %d.\n",
00384 num_non_header_bytes_sent));
00385
00386 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00387 "Peek at the element at the front of the packet elems_.\n"));
00388
00389
00390 TransportQueueElement* element = this->elems_->peek();
00391
00392 if(!element){
00393 ACE_DEBUG((LM_INFO, "(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n"));
00394 } else {
00395 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00396 "Use the element's msg() to find the last block in "
00397 "the msg() chain.\n"));
00398
00399
00400 const ACE_Message_Block* elem_tail_block = element->msg();
00401
00402 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00403 "Start with tail block == element->msg().\n"));
00404
00405 while (elem_tail_block->cont() != 0) {
00406 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00407 "Set tail block to its cont() block (next in chain).\n"));
00408 elem_tail_block = elem_tail_block->cont();
00409 }
00410
00411 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00412 "Tail block now set (because tail block's cont() is 0).\n"));
00413
00414 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00415 "Start the 'while (num_bytes_left > 0)' loop.\n"));
00416
00417 while (num_bytes_left > 0) {
00418 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00419 "At top of 'num bytes left' loop. num_bytes_left == [%d].\n",
00420 num_bytes_left));
00421
00422 const int block_length = static_cast<int>(this->pkt_chain_->length());
00423
00424 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00425 "Length of block at front of pkt_chain_ is [%d].\n",
00426 block_length));
00427
00428 if (block_length <= num_bytes_left) {
00429 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00430 "The whole block at the front of pkt_chain_ was sent.\n"));
00431
00432
00433
00434
00435
00436 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00437 "Extract the fully sent block from the pkt_chain_.\n"));
00438
00439 ACE_Message_Block* fully_sent_block = this->pkt_chain_;
00440
00441 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00442 "Set pkt_chain_ to pkt_chain_->cont().\n"));
00443
00444 this->pkt_chain_ = this->pkt_chain_->cont();
00445
00446 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00447 "Set the fully sent block's cont() to 0.\n"));
00448
00449 fully_sent_block->cont(0);
00450
00451
00452
00453 num_bytes_left -= block_length;
00454
00455 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00456 "Updated num_bytes_left to account for fully sent "
00457 "block (block_length == [%d]).\n", block_length));
00458 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00459 "Now, num_bytes_left == [%d].\n", num_bytes_left));
00460
00461 if (!this->header_complete_) {
00462 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00463 "Since the header_complete_ flag is false, it means "
00464 "that the packet header block was still in the "
00465 "pkt_chain_.\n"));
00466
00467 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00468 "Not anymore... Set the header_complete_ flag "
00469 "to true.\n"));
00470
00471
00472
00473 this->header_complete_ = true;
00474
00475 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00476 "Release the fully sent block.\n"));
00477
00478
00479 fully_sent_block->release();
00480
00481 } else {
00482 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00483 "Since the header_complete_ flag is true, it means "
00484 "that the packet header block was not in the "
00485 "pkt_chain_.\n"));
00486 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00487 "So, the fully sent block was part of an element.\n"));
00488
00489
00490
00491
00492
00493
00494 num_non_header_bytes_sent += block_length;
00495
00496 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00497 "Updated num_non_header_bytes_sent to account for "
00498 "fully sent block (block_length == [%d]).\n",
00499 block_length));
00500
00501 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00502 "Now, num_non_header_bytes_sent == [%d].\n",
00503 num_non_header_bytes_sent));
00504
00505 if (fully_sent_block->base() == elem_tail_block->base()) {
00506 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00507 "Ok. The fully sent block was a duplicate of "
00508 "the tail block of the element that is at the "
00509 "front of the packet elems_.\n"));
00510
00511 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00512 "This means that we have completely sent the "
00513 "element at the front of the packet elems_.\n"));
00514
00515
00516
00517
00518 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00519 "We can release the fully sent block now.\n"));
00520
00521
00522 fully_sent_block->release();
00523
00524 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00525 "We can extract the element from the front of "
00526 "the packet elems_ (we were just peeking).\n"));
00527
00528
00529 element = this->elems_->get();
00530
00531 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00532 "Tell the element that a decision has been made "
00533 "regarding its fate - data_delivered().\n"));
00534
00535
00536 this->add_delayed_notification(element);
00537
00538 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00539 "Peek at the next element in the packet "
00540 "elems_.\n"));
00541
00542
00543 element = this->elems_->peek();
00544
00545 if (element != 0) {
00546 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00547 "The is an element still in the packet "
00548 "elems_ (we are peeking at it now).\n"));
00549
00550 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00551 "We are going to find the tail block for the "
00552 "current element (we are peeking at).\n"));
00553
00554
00555
00556 elem_tail_block = element->msg();
00557
00558 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00559 "Start w/tail block == element->msg().\n"));
00560
00561 while (elem_tail_block->cont() != 0) {
00562 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00563 "Set tail block to next in chain.\n"));
00564 elem_tail_block = elem_tail_block->cont();
00565 }
00566
00567 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00568 "Done finding tail block.\n"));
00569 }
00570
00571 } else {
00572 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00573 "Ok. The fully sent block is *not* a "
00574 "duplicate of the tail block of the element "
00575 "at the front of the packet elems_.\n"));
00576
00577 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00578 "Thus, we have not completely sent the "
00579 "element yet.\n"));
00580
00581
00582
00583
00584 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00585 "We can release the fully_sent_block now.\n"));
00586
00587
00588 fully_sent_block->release();
00589 }
00590 }
00591
00592 } else {
00593 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00594 "Only part of the block at the front of pkt_chain_ "
00595 "was sent.\n"));
00596
00597 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00598 "Advance the rd_ptr() of the front block (of pkt_chain_) "
00599 "by the num_bytes_left (%d).\n", num_bytes_left));
00600
00601
00602 this->pkt_chain_->rd_ptr(num_bytes_left);
00603
00604 if (this->header_complete_) {
00605 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00606 "And since the packet header block has already been "
00607 "completely sent, add num_bytes_left to the "
00608 "num_non_header_bytes_sent.\n"));
00609
00610 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00611 "Before, num_non_header_bytes_sent == %d.\n",
00612 num_non_header_bytes_sent));
00613
00614
00615
00616
00617
00618 num_non_header_bytes_sent += num_bytes_left;
00619
00620 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00621 "After, num_non_header_bytes_sent == %d.\n",
00622 num_non_header_bytes_sent));
00623 }
00624
00625 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00626 "Set the num_bytes_left to 0 now.\n"));
00627
00628 num_bytes_left = 0;
00629 }
00630 }
00631 }
00632
00633 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00634 "The 'num_bytes_left' loop has completed.\n"));
00635
00636 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00637 "Adjust the header_.length_ to account for the "
00638 "num_non_header_bytes_sent.\n"));
00639 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00640 "Before, header_.length_ == %d.\n",
00641 this->header_.length_));
00642
00643
00644
00645 this->header_.length_ -= static_cast<ACE_UINT32>(num_non_header_bytes_sent);
00646
00647 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00648 "After, header_.length_ == %d.\n",
00649 this->header_.length_));
00650
00651
00652 int rc = (this->header_.length_ == 0) ? 0 : 1;
00653
00654 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00655 "Adjustments all done. Returning [%d]. 0 means entire packet "
00656 "has been sent. 1 means otherwise.\n",
00657 rc));
00658
00659 return rc;
00660 }
00661
00662 bool
00663 TransportSendStrategy::send_delayed_notifications(const TransportQueueElement::MatchCriteria* match)
00664 {
00665 DBG_ENTRY_LVL("TransportSendStrategy","send_delayed_notifications",6);
00666 TransportQueueElement* sample = 0;
00667 SendMode mode = MODE_NOT_SET;
00668
00669 OPENDDS_VECTOR(TQESendModePair) samples;
00670
00671 size_t num_delayed_notifications = 0;
00672 bool found_element = false;
00673
00674 {
00675 GuardType guard(lock_);
00676
00677 num_delayed_notifications = delayed_delivered_notification_queue_.size();
00678
00679 if (num_delayed_notifications == 0) {
00680 return false;
00681
00682 } else if (num_delayed_notifications == 1) {
00683
00684
00685 if (!match || match->matches(*delayed_delivered_notification_queue_[0].first)) {
00686 found_element = true;
00687 sample = delayed_delivered_notification_queue_[0].first;
00688 mode = delayed_delivered_notification_queue_[0].second;
00689
00690 delayed_delivered_notification_queue_.clear();
00691 }
00692
00693 } else {
00694 OPENDDS_VECTOR(TQESendModePair)::iterator iter;
00695 for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) {
00696 sample = iter->first;
00697 mode = iter->second;
00698 if (!match || match->matches(*sample)) {
00699 found_element = true;
00700 samples.push_back(*iter);
00701 iter = delayed_delivered_notification_queue_.erase(iter);
00702 } else {
00703 ++iter;
00704 }
00705 }
00706 }
00707 }
00708
00709 if (!found_element)
00710 return false;
00711
00712 if (num_delayed_notifications == 1) {
00713
00714 if (mode == MODE_TERMINATED) {
00715 if (!transport_shutdown_ || sample->owned_by_transport()) {
00716 sample->data_dropped(true);
00717 }
00718 } else {
00719 if (!transport_shutdown_ || sample->owned_by_transport()) {
00720 sample->data_delivered();
00721 }
00722 }
00723
00724 } else {
00725 for (size_t i = 0; i < samples.size(); ++i) {
00726 if (samples[i].second == MODE_TERMINATED) {
00727 if (!transport_shutdown_ || samples[i].first->owned_by_transport()) {
00728 samples[i].first->data_dropped(true);
00729 }
00730 } else {
00731 if (!transport_shutdown_ || samples[i].first->owned_by_transport()) {
00732 samples[i].first->data_delivered();
00733 }
00734 }
00735 }
00736 }
00737 return true;
00738 }
00739
00740
00741 void
00742 TransportSendStrategy::terminate_send(bool graceful_disconnecting)
00743 {
00744 DBG_ENTRY_LVL("TransportSendStrategy","terminate_send",6);
00745
00746 bool reset_flag = true;
00747
00748 {
00749 GuardType guard(this->lock_);
00750
00751
00752
00753
00754 if ((this->mode_ == MODE_TERMINATED || this->mode_ == MODE_SUSPEND)
00755 && !this->graceful_disconnecting_) {
00756 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00757 "It was already terminated non gracefully, will not set to graceful disconnecting \n"));
00758 reset_flag = false;
00759 }
00760 }
00761
00762 VDBG((LM_DEBUG, "(%P|%t) DBG: Now flip to MODE_TERMINATED \n"));
00763
00764 this->clear(MODE_TERMINATED);
00765
00766 if (reset_flag) {
00767 GuardType guard(this->lock_);
00768 this->graceful_disconnecting_ = graceful_disconnecting;
00769 }
00770 }
00771
00772 void
00773 TransportSendStrategy::clear(SendMode mode)
00774 {
00775 DBG_ENTRY_LVL("TransportSendStrategy","clear",6);
00776
00777 send_delayed_notifications();
00778 QueueType* elems = 0;
00779 QueueType* queue = 0;
00780 {
00781 GuardType guard(this->lock_);
00782
00783 if (this->header_.length_ > 0) {
00784
00785
00786 int num_bytes_left = static_cast<int>(this->pkt_chain_->total_length());
00787 int result = this->adjust_packet_after_send(num_bytes_left);
00788
00789 if (result == 0) {
00790 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00791 "The adjustment logic says that the packet is cleared.\n"));
00792
00793 } else {
00794 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00795 "The adjustment returned partial sent.\n"));
00796 }
00797 }
00798
00799 elems = this->elems_;
00800 this->elems_ = new QueueType(1, this->transport_inst_->max_samples_per_packet_);
00801 queue = this->queue_;
00802 this->queue_ = new QueueType(this->transport_inst_->queue_messages_per_pool_,
00803 this->transport_inst_->queue_initial_pools_);
00804
00805 this->header_.length_ = 0;
00806 this->pkt_chain_ = 0;
00807 this->header_complete_ = false;
00808 this->start_counter_ = 0;
00809 this->mode_ = mode;
00810 this->mode_before_suspend_ = MODE_NOT_SET;
00811 }
00812
00813
00814
00815
00816
00817
00818 RemoveAllVisitor remove_all_visitor;
00819
00820 elems->accept_remove_visitor(remove_all_visitor);
00821 queue->accept_remove_visitor(remove_all_visitor);
00822
00823 delete elems;
00824 delete queue;
00825 }
00826
00827 int
00828 TransportSendStrategy::start()
00829 {
00830 DBG_ENTRY_LVL("TransportSendStrategy","start",6);
00831
00832 {
00833 GuardType guard(this->lock_);
00834
00835 if (!this->start_i()) {
00836 return -1;
00837 }
00838 }
00839
00840 size_t header_chunks(1);
00841
00842
00843
00844 if (this->send_buffer_ != 0) {
00845 header_chunks += this->send_buffer_->capacity();
00846
00847 } else {
00848 header_chunks += 1;
00849 }
00850
00851 ACE_NEW_RETURN(this->header_db_allocator_,
00852 TransportDataBlockAllocator(header_chunks),
00853 -1);
00854
00855 ACE_NEW_RETURN(this->header_mb_allocator_,
00856 TransportMessageBlockAllocator(header_chunks),
00857 -1);
00858
00859
00860
00861
00862
00863
00864
00865 this->_add_ref();
00866
00867 if (this->synch_->register_worker(this) == -1) {
00868
00869 this->_remove_ref();
00870 ACE_ERROR_RETURN((LM_ERROR,
00871 "(%P|%t) ERROR: TransportSendStrategy failed to register "
00872 "as a worker with the ThreadSynch object.\n"),
00873 -1);
00874 }
00875
00876 return 0;
00877 }
00878
00879 void
00880 TransportSendStrategy::stop()
00881 {
00882 DBG_ENTRY_LVL("TransportSendStrategy","stop",6);
00883
00884 if (this->header_block_ != 0) {
00885 this->header_block_->release ();
00886 this->header_block_ = 0;
00887 }
00888
00889 this->synch_->unregister_worker();
00890
00891
00892
00893 this->_remove_ref();
00894
00895 {
00896 GuardType guard(this->lock_);
00897
00898 if (this->pkt_chain_ != 0) {
00899 size_t size = this->pkt_chain_->total_length();
00900
00901 if (size > 0) {
00902 this->pkt_chain_->release();
00903 ACE_DEBUG((LM_WARNING,
00904 ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
00905 ACE_TEXT("terminating with %d unsent bytes.\n"),
00906 size));
00907 }
00908 }
00909 }
00910
00911 delete this->header_mb_allocator_;
00912 delete this->header_db_allocator_;
00913
00914 {
00915 GuardType guard(this->lock_);
00916
00917 this->stop_i();
00918 }
00919
00920
00921
00922 }
00923
00924 void
00925 TransportSendStrategy::send(TransportQueueElement* element, bool relink)
00926 {
00927 if (Transport_debug_level > 9) {
00928 ACE_DEBUG((LM_DEBUG,
00929 ACE_TEXT("(%P|%t) TransportSendStrategy::send() [%d] - ")
00930 ACE_TEXT("sending data at 0x%x.\n"),
00931 id(), element));
00932 }
00933
00934 DBG_ENTRY_LVL("TransportSendStrategy", "send", 6);
00935
00936 {
00937 GuardType guard(this->lock_);
00938
00939 if (this->link_released_) {
00940 this->add_delayed_notification(element);
00941
00942 } else {
00943 if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) {
00944 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00945 "TransportSendStrategy::send: mode is MODE_TERMINATED and not in "
00946 "graceful disconnecting, so discard message.\n"));
00947 element->data_dropped(true);
00948 return;
00949 }
00950
00951 size_t element_length = element->msg()->total_length();
00952
00953 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00954 "Send element msg() has total_length() == [%d].\n",
00955 element_length));
00956
00957 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00958 "this->max_header_size_ == [%d].\n",
00959 this->max_header_size_));
00960
00961 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00962 "this->max_size_ == [%d].\n",
00963 this->max_size_));
00964
00965 const size_t max_message_size = this->max_message_size();
00966
00967
00968
00969
00970
00971
00972 if (max_message_size == 0 &&
00973 this->max_header_size_ + element_length > this->max_size_) {
00974 ACE_ERROR((LM_ERROR,
00975 "(%P|%t) ERROR: Element too large (%Q) "
00976 "- won't fit into packet.\n", ACE_UINT64(element_length)));
00977 return;
00978 }
00979
00980
00981 if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) {
00982 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00983 "this->mode_ == %C, so queue elem and leave.\n",
00984 mode_as_str(this->mode_)), 5);
00985
00986 this->queue_->put(element);
00987
00988 if (this->mode_ != MODE_SUSPEND) {
00989 this->synch_->work_available();
00990 }
00991
00992 return;
00993 }
00994
00995 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00996 "this->mode_ == MODE_DIRECT.\n"));
00997
00998
00999
01000
01001
01002
01003
01004
01005
01006
01007
01008
01009
01010
01011
01012
01013
01014
01015
01016
01017
01018
01019
01020 const bool exclusive = element->requires_exclusive_packet();
01021
01022 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01023 "The element %C require an exclusive packet.\n",
01024 (exclusive ? "DOES" : "does NOT")
01025 ));
01026
01027 const size_t space_needed =
01028 (max_message_size > 0)
01029 ? DataSampleHeader::max_marshaled_size() + MIN_FRAG
01030 : element_length;
01031
01032 if ((exclusive && (this->elems_->size() != 0))
01033 || (this->space_available() < space_needed)) {
01034
01035 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01036 "Element won't fit in current packet or requires exclusive"
01037 " - send current packet (directly) now.\n"));
01038
01039 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01040 "max_header_size_: %d, header_.length_: %d, element_length: %d\n"
01041 , this->max_header_size_, this->header_.length_, element_length));
01042
01043 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01044 "Tot possible length: %d, max_len: %d\n"
01045 , this->max_header_size_ + this->header_.length_ + element_length
01046 , this->max_size_));
01047 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01048 "current elem size: %d\n"
01049 , this->elems_->size()));
01050
01051
01052
01053
01054
01055
01056 this->direct_send(relink);
01057
01058
01059
01060
01061
01062
01063
01064
01065
01066 if (this->mode_ == MODE_QUEUE) {
01067 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01068 "We experienced backpressure on that direct send, as "
01069 "the mode_ is now MODE_QUEUE or MODE_SUSPEND. "
01070 "Queue elem and leave.\n"), 5);
01071 this->queue_->put(element);
01072 this->synch_->work_available();
01073
01074 return;
01075 }
01076 }
01077
01078
01079 bool first_pkt = true;
01080 for (TransportQueueElement* next_fragment = 0;
01081 (first_pkt || next_fragment)
01082 && (this->mode_ == MODE_DIRECT || this->mode_ == MODE_TERMINATED);) {
01083
01084
01085 if (next_fragment) {
01086 element = next_fragment;
01087 element_length = next_fragment->msg()->total_length();
01088 this->header_.first_fragment_ = false;
01089 }
01090
01091 this->header_.last_fragment_ = false;
01092 if (max_message_size) {
01093 const size_t avail = this->space_available();
01094 if (element_length > avail) {
01095 VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting\n"), 0);
01096 ElementPair ep = element->fragment(avail);
01097 element = ep.first;
01098 element_length = element->msg()->total_length();
01099 next_fragment = ep.second;
01100 this->header_.first_fragment_ = first_pkt;
01101 } else if (next_fragment) {
01102
01103
01104 this->header_.last_fragment_ = true;
01105 next_fragment = 0;
01106 }
01107 }
01108 first_pkt = false;
01109
01110 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01111 "Start the 'append elem' to current packet logic.\n"));
01112
01113 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01114 "Put element into current packet elems_.\n"));
01115
01116
01117
01118
01119
01120
01121 this->elems_->put(element);
01122
01123 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01124 "Before, the header_.length_ == [%d].\n",
01125 this->header_.length_));
01126
01127
01128 this->header_.length_ += static_cast<ACE_UINT32>(element_length);
01129 const size_t message_length = this->header_.length_;
01130
01131 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01132 "After adding element's length, the header_.length_ == [%d].\n",
01133 message_length));
01134
01135
01136
01137
01138
01139
01140
01141
01142
01143
01144
01145
01146
01147
01148
01149 if (next_fragment || (this->elems_->size() >= this->max_samples_)
01150 || (this->max_header_size_ + message_length > this->optimum_size_)
01151 || exclusive) {
01152 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01153 "Now the current packet looks full - send it (directly).\n"));
01154
01155 this->direct_send(relink);
01156
01157 if (next_fragment && this->mode_ != MODE_DIRECT) {
01158 if (this->mode_ == MODE_QUEUE) {
01159 this->queue_->put(next_fragment);
01160 this->synch_->work_available();
01161
01162 } else {
01163 next_fragment->data_dropped(true );
01164 }
01165 } else if (mode_ == MODE_QUEUE) {
01166
01167 this->synch_->work_available();
01168 }
01169
01170 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01171 "Back from the direct_send() attempt.\n"));
01172
01173 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01174 "And we %C as a result of the direct_send() call.\n",
01175 ((this->mode_ == MODE_QUEUE) ? "flipped into MODE_QUEUE"
01176 : "stayed in MODE_DIRECT")));
01177
01178 } else {
01179 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01180 "Packet not sent. Send conditions weren't satisfied.\n"));
01181 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01182 "elems_->size(): %d, max_samples_: %d\n",
01183 int(this->elems_->size()), int(this->max_samples_)));
01184 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01185 "header_size_: %d, optimum_size_: %d\n",
01186 int(this->max_header_size_ + message_length),
01187 int(this->optimum_size_)));
01188 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01189 "element_requires_exclusive_packet: %d\n", int(exclusive)));
01190
01191 if (this->mode_ == MODE_QUEUE) {
01192 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01193 "We flipped into MODE_QUEUE.\n"));
01194
01195 } else {
01196 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01197 "We stayed in MODE_DIRECT.\n"));
01198 }
01199 }
01200 }
01201 }
01202 }
01203
01204 send_delayed_notifications();
01205 }
01206
01207 void
01208 TransportSendStrategy::send_stop(RepoId )
01209 {
01210 DBG_ENTRY_LVL("TransportSendStrategy","send_stop",6);
01211 {
01212 GuardType guard(this->lock_);
01213
01214 if (this->link_released_)
01215 return;
01216
01217 if (this->start_counter_ == 0) {
01218
01219 VDBG_LVL((LM_ERROR,
01220 "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5);
01221 return;
01222 }
01223
01224 --this->start_counter_;
01225
01226 if (this->start_counter_ != 0) {
01227
01228
01229
01230 return;
01231 }
01232
01233 if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) {
01234 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01235 "TransportSendStrategy::send_stop: dont try to send current packet "
01236 "since mode is MODE_TERMINATED and not in graceful disconnecting.\n"));
01237 return;
01238 }
01239
01240 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01241 "This is an 'important' send_stop() event since our "
01242 "start_counter_ is 0.\n"));
01243
01244
01245
01246
01247
01248
01249
01250
01251
01252
01253
01254
01255
01256 if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) {
01257 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01258 "But since we are in %C, we don't have to do "
01259 "anything more in this important send_stop().\n",
01260 mode_as_str(this->mode_)));
01261
01262 return;
01263 }
01264
01265 size_t header_length = this->header_.length_;
01266 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01267 "We are in MODE_DIRECT in an important send_stop() - "
01268 "header_.length_ == [%d].\n", header_length));
01269
01270
01271
01272 if ((header_length > 0) &&
01273
01274 (this->elems_->size() > 0)) {
01275 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01276 "There is something in the current packet - attempt to send "
01277 "it (directly) now.\n"));
01278
01279 this->direct_send(true);
01280 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01281 "Back from the attempt to send leftover packet directly.\n"));
01282
01283 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01284 "But we %C as a result.\n",
01285 ((this->mode_ == MODE_QUEUE)? "flipped into MODE_QUEUE":
01286 "stayed in MODE_DIRECT" )));
01287 if (this->mode_ == MODE_QUEUE && this->mode_ != MODE_SUSPEND) {
01288 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01289 "Notify Synch thread of work availability\n"));
01290 this->synch_->work_available();
01291 }
01292 }
01293 }
01294
01295 send_delayed_notifications();
01296 }
01297
01298 void
01299 TransportSendStrategy::remove_all_msgs(RepoId pub_id)
01300 {
01301 DBG_ENTRY_LVL("TransportSendStrategy","remove_all_msgs",6);
01302
01303 const TransportQueueElement::MatchOnPubId match(pub_id);
01304 send_delayed_notifications(&match);
01305
01306 GuardType guard(this->lock_);
01307
01308 if (this->send_buffer_ != 0) {
01309
01310
01311 this->send_buffer_->retain_all(pub_id);
01312 }
01313
01314 do_remove_sample(pub_id, match);
01315 }
01316
01317 RemoveResult
01318 TransportSendStrategy::remove_sample(const DataSampleElement* sample)
01319 {
01320 DBG_ENTRY_LVL("TransportSendStrategy", "remove_sample", 6);
01321
01322 VDBG_LVL((LM_DEBUG, "(%P|%t) Removing sample: %@\n", sample->get_sample()), 5);
01323
01324
01325
01326
01327
01328
01329
01330
01331
01332
01333
01334 const char* const payload = sample->get_sample()->cont()->rd_ptr();
01335 RepoId pub_id = sample->get_pub_id();
01336 const TransportQueueElement::MatchOnDataPayload modp(payload);
01337 if (send_delayed_notifications(&modp)) {
01338 return REMOVE_RELEASED;
01339 }
01340
01341 GuardType guard(this->lock_);
01342 return do_remove_sample(pub_id, modp);
01343 }
01344
01345 RemoveResult
01346 TransportSendStrategy::do_remove_sample(const RepoId&,
01347 const TransportQueueElement::MatchCriteria& criteria)
01348 {
01349 DBG_ENTRY_LVL("TransportSendStrategy", "do_remove_sample", 6);
01350
01351
01352
01353
01354
01355
01356 if ((this->mode_ == MODE_DIRECT)
01357 || ((this->pkt_chain_ == 0) && (this->queue_->size() == 0))) {
01358
01359
01360
01361 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01362 "The mode is MODE_DIRECT, or the queue is empty and no "
01363 "transport packet is in progress.\n"));
01364
01365 QueueRemoveVisitor simple_rem_vis(criteria);
01366 this->elems_->accept_remove_visitor(simple_rem_vis);
01367
01368 const RemoveResult status = simple_rem_vis.status();
01369
01370 if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
01371 this->header_.length_ -= simple_rem_vis.removed_bytes();
01372
01373 } else if (status == REMOVE_NOT_FOUND) {
01374 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01375 "Failed to find the sample to remove.\n"));
01376 }
01377
01378 return status;
01379 }
01380
01381 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01382 "Visit the queue_ with the RemoveElementVisitor.\n"));
01383
01384 QueueRemoveVisitor simple_rem_vis(criteria);
01385 this->queue_->accept_remove_visitor(simple_rem_vis);
01386
01387 RemoveResult status = simple_rem_vis.status();
01388
01389 if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
01390 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01391 "The sample was removed from the queue_.\n"));
01392
01393
01394
01395 return status;
01396 }
01397
01398 if (status == REMOVE_ERROR) {
01399 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01400 "The RemoveElementVisitor encountered a fatal error in queue_.\n"));
01401
01402
01403
01404 return status;
01405 }
01406
01407 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01408 "The RemoveElementVisitor did not find the sample in queue_.\n"));
01409
01410
01411
01412
01413
01414
01415
01416
01417
01418 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01419 "Visit our elems_ with the PacketRemoveVisitor.\n"));
01420
01421 PacketRemoveVisitor pac_rem_vis(criteria,
01422 this->pkt_chain_,
01423 this->header_block_,
01424 this->replaced_element_allocator_,
01425 this->replaced_element_mb_allocator_,
01426 this->replaced_element_db_allocator_);
01427
01428 this->elems_->accept_replace_visitor(pac_rem_vis);
01429
01430 status = pac_rem_vis.status();
01431
01432 if (status == REMOVE_ERROR) {
01433 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01434 "The PacketRemoveVisitor encountered a fatal error.\n"));
01435
01436 } else if (status == REMOVE_NOT_FOUND) {
01437 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01438 "The PacketRemoveVisitor didn't find the sample.\n"));
01439
01440 } else {
01441 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01442 "The PacketRemoveVisitor found the sample and removed it.\n"));
01443 }
01444
01445 return status;
01446 }
01447
01448 void
01449 TransportSendStrategy::direct_send(bool relink)
01450 {
01451 DBG_ENTRY_LVL("TransportSendStrategy", "direct_send", 6);
01452
01453 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01454 "Prepare the current packet for a direct send attempt.\n"));
01455
01456
01457 this->prepare_packet();
01458
01459 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01460 "Now attempt to send the packet.\n"));
01461
01462
01463
01464 while (true) {
01465
01466 const SendPacketOutcome outcome = this->send_packet();
01467
01468 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01469 "The outcome of the send_packet() was %d.\n", outcome));
01470
01471 if ((outcome == OUTCOME_BACKPRESSURE) ||
01472 (outcome == OUTCOME_PARTIAL_SEND)) {
01473 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01474 "The outcome of the send_packet() was either "
01475 "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5);
01476
01477 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01478 "Flip into the MODE_QUEUE mode_.\n"), 5);
01479
01480
01481 this->mode_ = MODE_QUEUE;
01482
01483 } else if ((outcome == OUTCOME_PEER_LOST) ||
01484 (outcome == OUTCOME_SEND_ERROR)) {
01485 if (outcome == OUTCOME_SEND_ERROR) {
01486 ACE_ERROR((LM_WARNING,
01487 ACE_TEXT("(%P|%t) WARNING: Problem detected in ")
01488 ACE_TEXT("send buffer management: %p.\n"),
01489 ACE_TEXT("send_bytes")));
01490
01491 if (Transport_debug_level > 0) {
01492 this->transport_inst_->dump();
01493 }
01494 } else {
01495 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01496 "The outcome of the send_packet() was "
01497 "OUTCOME_PEER_LOST.\n"));
01498 }
01499
01500 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01501 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
01502
01503 if (this->mode_ != MODE_SUSPEND) {
01504 this->mode_before_suspend_ = this->mode_;
01505 this->mode_ = MODE_SUSPEND;
01506 }
01507
01508 if (relink) {
01509 bool do_suspend = false;
01510 this->relink(do_suspend);
01511
01512 if (this->mode_ == MODE_SUSPEND) {
01513 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01514 "The reconnect has not done yet and we are "
01515 "still in MODE_SUSPEND.\n"), 5);
01516
01517 } else if (this->mode_ == MODE_TERMINATED) {
01518 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01519 "Reconnect failed, we are in MODE_TERMINATED\n"), 5);
01520
01521 } else {
01522 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01523 "Try send the packet again since the connection "
01524 "is re-established.\n"), 5);
01525
01526
01527 continue;
01528 }
01529 }
01530
01531 } else {
01532 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01533 "The outcome of the send_packet() must have been "
01534 "OUTCOME_COMPLETE_SEND.\n"));
01535 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01536 "So, we will just stay in MODE_DIRECT.\n"));
01537 }
01538
01539 break;
01540 }
01541
01542
01543 }
01544
01545 void
01546 TransportSendStrategy::get_packet_elems_from_queue()
01547 {
01548 DBG_ENTRY_LVL("TransportSendStrategy", "get_packet_elems_from_queue", 6);
01549
01550 for (TransportQueueElement* element = this->queue_->peek(); element != 0;
01551 element = this->queue_->peek()) {
01552
01553
01554 size_t element_length = element->msg()->total_length();
01555
01556
01557 const bool exclusive_packet = element->requires_exclusive_packet();
01558
01559 const size_t avail = this->space_available();
01560
01561 bool frag = false;
01562 if (element_length > avail) {
01563
01564 if (this->max_message_size()) {
01565 this->header_.first_fragment_ = !element->is_fragment();
01566 VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting from queue\n"), 0);
01567 ElementPair ep = element->fragment(avail);
01568 element = ep.first;
01569 element_length = element->msg()->total_length();
01570 this->queue_->replace_head(ep.second);
01571 frag = true;
01572 } else {
01573 break;
01574 }
01575 }
01576
01577
01578
01579
01580 if ((exclusive_packet && this->elems_->size() == 0)
01581 || !exclusive_packet) {
01582
01583
01584
01585 this->elems_->put(frag ? element : this->queue_->get());
01586 if (this->header_.length_ == 0) {
01587 this->header_.last_fragment_ = !frag && element->is_fragment();
01588 }
01589 this->header_.length_ += static_cast<ACE_UINT32>(element_length);
01590 VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Packetizing from queue\n"), 0);
01591 }
01592
01593
01594
01595
01596
01597
01598
01599
01600 if (exclusive_packet || frag
01601
01602
01603 || this->elems_->size() == this->max_samples_
01604
01605
01606 || this->header_.length_ >= this->optimum_size_) {
01607 break;
01608 }
01609 }
01610 }
01611
01612 void
01613 TransportSendStrategy::prepare_header()
01614 {
01615 DBG_ENTRY_LVL("TransportSendStrategy", "prepare_header", 6);
01616
01617
01618 this->header_.sequence_ = ++this->header_sequence_;
01619
01620
01621
01622 this->prepare_header_i();
01623 }
01624
01625 void
01626 TransportSendStrategy::prepare_header_i()
01627 {
01628 DBG_ENTRY_LVL("TransportSendStrategy","prepare_header_i",6);
01629
01630
01631 }
01632
01633 void
01634 TransportSendStrategy::prepare_packet()
01635 {
01636 DBG_ENTRY_LVL("TransportSendStrategy", "prepare_packet", 6);
01637
01638
01639 this->prepare_header();
01640
01641 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01642 "Marshal the packet header.\n"));
01643
01644 if (this->header_block_ != 0) {
01645 this->header_block_->release();
01646 }
01647
01648 ACE_NEW_MALLOC(this->header_block_,
01649 static_cast<ACE_Message_Block*>(this->header_mb_allocator_->malloc()),
01650 ACE_Message_Block(this->max_header_size_,
01651 ACE_Message_Block::MB_DATA,
01652 0,
01653 0,
01654 0,
01655 0,
01656 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01657 ACE_Time_Value::zero,
01658 ACE_Time_Value::max_time,
01659 this->header_db_allocator_,
01660 this->header_mb_allocator_));
01661
01662 marshal_transport_header(this->header_block_);
01663
01664 this->pkt_chain_ = this->header_block_->duplicate();
01665
01666 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01667 "Use a BuildChainVisitor to visit the packet elems_.\n"));
01668
01669
01670
01671
01672 BuildChainVisitor visitor;
01673 this->elems_->accept_visitor(visitor);
01674
01675 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01676 "Attach the visitor's chain of blocks to the lone (packet "
01677 "header) block currently in the pkt_chain_.\n"));
01678
01679
01680 this->pkt_chain_->cont(visitor.chain());
01681
01682 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01683 "Increment header sequence for next packet.\n"));
01684
01685
01686
01687 this->prepare_packet_i();
01688
01689 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01690 "Set the header_complete_ flag to false.\n"));
01691
01692
01693
01694
01695 this->header_complete_ = false;
01696 }
01697
01698 void
01699 TransportSendStrategy::marshal_transport_header(ACE_Message_Block* mb)
01700 {
01701 *mb << this->header_;
01702 }
01703
01704 void
01705 TransportSendStrategy::prepare_packet_i()
01706 {
01707 DBG_ENTRY_LVL("TransportSendStrategy","prepare_packet_i",6);
01708
01709
01710 }
01711
01712 void
01713 TransportSendStrategy::set_graceful_disconnecting(bool flag)
01714 {
01715 this->graceful_disconnecting_ = flag;
01716 }
01717
01718 ssize_t
01719 TransportSendStrategy::do_send_packet(const ACE_Message_Block* packet, int& bp)
01720 {
01721 if (Transport_debug_level > 9) {
01722 ACE_DEBUG((LM_DEBUG,
01723 ACE_TEXT("(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ")
01724 ACE_TEXT("sending data at 0x%x.\n"),
01725 id(), packet));
01726 }
01727 DBG_ENTRY_LVL("TransportSendStrategy", "do_send_packet", 6);
01728
01729 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01730 "Populate the iovec array using the packet.\n"), 5);
01731
01732 iovec iov[MAX_SEND_BLOCKS];
01733
01734 int num_blocks = mb_to_iov(*packet, iov);
01735
01736 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01737 "There are [%d] number of entries in the iovec array.\n",
01738 num_blocks), 5);
01739
01740 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01741 "Attempt to send_bytes() now.\n"), 5);
01742
01743 const ssize_t num_bytes_sent = this->send_bytes(iov, num_blocks, bp);
01744
01745 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01746 "The send_bytes() said that num_bytes_sent == [%d].\n",
01747 num_bytes_sent), 5);
01748
01749 return num_bytes_sent;
01750 }
01751
01752 TransportSendStrategy::SendPacketOutcome
01753 TransportSendStrategy::send_packet()
01754 {
01755 DBG_ENTRY_LVL("TransportSendStrategy", "send_packet", 6);
01756
01757 int bp_flag = 0;
01758 const ssize_t num_bytes_sent =
01759 this->do_send_packet(this->pkt_chain_, bp_flag);
01760
01761 if (num_bytes_sent == 0) {
01762 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01763 "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5);
01764
01765 return OUTCOME_PEER_LOST;
01766 }
01767
01768 if (num_bytes_sent < 0) {
01769 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01770 "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5);
01771
01772
01773 if (bp_flag == 1) {
01774 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01775 "Since backpressure flag is true, return "
01776 "OUTCOME_BACKPRESSURE.\n"), 5);
01777
01778 return OUTCOME_BACKPRESSURE;
01779 }
01780
01781 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01782 "Since backpressure flag is false, return "
01783 "OUTCOME_SEND_ERROR.\n"), 5);
01784
01785
01786
01787
01788
01789
01790
01791 return OUTCOME_SEND_ERROR;
01792 }
01793
01794 if (this->send_buffer_ != 0) {
01795
01796
01797 this->send_buffer_->insert(this->header_.sequence_,
01798 this->elems_, this->pkt_chain_);
01799 }
01800
01801 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01802 "Since num_bytes_sent > 0, adjust the packet to account for "
01803 "the bytes that did get sent.\n"),5);
01804
01805
01806
01807 const int result =
01808 this->adjust_packet_after_send(num_bytes_sent);
01809
01810 if (result == 0) {
01811 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01812 "The adjustment logic says that the complete packet was "
01813 "sent. Return OUTCOME_COMPLETE_SEND.\n"));
01814 return OUTCOME_COMPLETE_SEND;
01815 }
01816
01817 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01818 "The adjustment logic says that only a part of the packet was "
01819 "sent. Return OUTCOME_PARTIAL_SEND.\n"));
01820
01821 return OUTCOME_PARTIAL_SEND;
01822 }
01823
01824 ssize_t
01825 TransportSendStrategy::non_blocking_send(const iovec iov[], int n, int& bp)
01826 {
01827 int val = 0;
01828 ACE_HANDLE handle = this->get_handle();
01829
01830 if (handle == ACE_INVALID_HANDLE)
01831 return -1;
01832
01833 ACE::record_and_set_non_blocking_mode(handle, val);
01834
01835
01836 bp = 0;
01837
01838
01839 errno = 0;
01840
01841 ssize_t result = this->send_bytes_i(iov, n);
01842
01843 if (result == -1) {
01844 if ((errno == EWOULDBLOCK) || (errno == ENOBUFS)) {
01845 VDBG((LM_DEBUG,"(%P|%t) DBG: "
01846 "Backpressure encountered.\n"));
01847
01848 bp = 1;
01849
01850 } else {
01851 VDBG_LVL((LM_ERROR, "(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n",
01852 ACE_TEXT("sendv"), n),1);
01853
01854
01855
01856 for (int ii = 0; ii < n; ii++) {
01857 ACE_DEBUG((LM_DEBUG, "(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n",
01858 ii, iov[ii].iov_len, iov[ii].iov_base));
01859 }
01860 }
01861 }
01862
01863 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
01864 "The sendv() returned [%d].\n", result), 5);
01865
01866 ACE::restore_non_blocking_mode(handle, val);
01867
01868 return result;
01869 }
01870
01871 void
01872 TransportSendStrategy::add_delayed_notification(TransportQueueElement* element)
01873 {
01874 if (Transport_debug_level) {
01875 size_t size = this->delayed_delivered_notification_queue_.size();
01876 if ((size > 0) && (size % this->max_samples_ == 0)) {
01877 ACE_DEBUG((LM_DEBUG,
01878 "(%P|%t) Transport send strategy notification queue threshold, size=%d\n",
01879 size));
01880 }
01881 }
01882
01883 this->delayed_delivered_notification_queue_.push_back(std::make_pair(element, this->mode_));
01884 }
01885
01886 size_t
01887 TransportSendStrategy::space_available() const
01888 {
01889 const size_t used = this->max_header_size_ + this->header_.length_,
01890 max_msg = this->max_message_size();
01891 if (max_msg) {
01892 return std::min(this->max_size_ - used, max_msg - used);
01893 }
01894 return this->max_size_ - used;
01895 }
01896
01897 int
01898 TransportSendStrategy::mb_to_iov(const ACE_Message_Block& msg, iovec* iov)
01899 {
01900 int num_blocks = 0;
01901 #ifdef _MSC_VER
01902 #pragma warning(push)
01903
01904
01905 #pragma warning(disable : 4267)
01906 #endif
01907 for (const ACE_Message_Block* block = &msg;
01908 block && num_blocks < MAX_SEND_BLOCKS;
01909 block = block->cont()) {
01910 iov[num_blocks].iov_len = block->length();
01911 iov[num_blocks++].iov_base = block->rd_ptr();
01912 }
01913 #ifdef _MSC_VER
01914 #pragma warning(pop)
01915 #endif
01916 return num_blocks;
01917 }
01918
01919
01920 }
01921 }