00001
00002
00003
00004
00005
00006
00007
00008 #include "DCPS/DdsDcps_pch.h"
00009 #include "TransportSendStrategy.h"
00010 #include "RemoveAllVisitor.h"
00011 #include "TransportInst.h"
00012 #include "ThreadSynchStrategy.h"
00013 #include "ThreadSynchResource.h"
00014 #include "TransportQueueElement.h"
00015 #include "TransportSendElement.h"
00016 #include "TransportSendBuffer.h"
00017 #include "BuildChainVisitor.h"
00018 #include "QueueRemoveVisitor.h"
00019 #include "PacketRemoveVisitor.h"
00020 #include "TransportDefs.h"
00021 #include "DirectPriorityMapper.h"
00022 #include "dds/DCPS/DataSampleHeader.h"
00023 #include "dds/DCPS/DataSampleElement.h"
00024 #include "dds/DCPS/Service_Participant.h"
00025 #include "EntryExit.h"
00026
00027 #include "ace/Reverse_Lock_T.h"
00028
00029 #if !defined (__ACE_INLINE__)
00030 #include "TransportSendStrategy.inl"
00031 #endif
00032
00033 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00034
00035 namespace OpenDDS {
00036 namespace DCPS {
00037
00038
00039
00040
00041
00042
00043 #define NUM_REPLACED_ELEMENT_CHUNKS 20
00044
00045 namespace {
00046
00047
00048
00049
00050 static const size_t MIN_FRAG = 68;
00051 }
00052
00053
00054
00055
00056
00057
00058 TransportSendStrategy::TransportSendStrategy(
00059 std::size_t id,
00060 TransportImpl& transport,
00061 ThreadSynchResource* synch_resource,
00062 Priority priority,
00063 const ThreadSynchStrategy_rch& thread_sync_strategy)
00064 : ThreadSynchWorker(id),
00065 max_samples_(transport.config().max_samples_per_packet_),
00066 optimum_size_(transport.config().optimum_packet_size_),
00067 max_size_(transport.config().max_packet_size_),
00068 max_header_size_(0),
00069 header_block_(0),
00070 pkt_chain_(0),
00071 header_complete_(false),
00072 start_counter_(0),
00073 mode_(MODE_DIRECT),
00074 mode_before_suspend_(MODE_NOT_SET),
00075 lock_(),
00076 replaced_element_mb_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
00077 replaced_element_db_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
00078 transport_(transport),
00079 graceful_disconnecting_(false),
00080 link_released_(true),
00081 send_buffer_(0)
00082 {
00083 DBG_ENTRY_LVL("TransportSendStrategy","TransportSendStrategy",6);
00084
00085
00086 DirectPriorityMapper mapper(priority);
00087 this->synch_.reset(thread_sync_strategy->create_synch_object(
00088 synch_resource,
00089 #ifdef ACE_WIN32
00090 ACE_DEFAULT_THREAD_PRIORITY,
00091 #else
00092 mapper.thread_priority(),
00093 #endif
00094 TheServiceParticipant->scheduler()));
00095
00096
00097
00098 this->max_header_size_ = TransportHeader::max_marshaled_size();
00099
00100 delayed_delivered_notification_queue_.reserve(this->max_samples_);
00101 }
00102
00103 TransportSendStrategy::~TransportSendStrategy()
00104 {
00105 DBG_ENTRY_LVL("TransportSendStrategy","~TransportSendStrategy",6);
00106
00107
00108 this->delayed_delivered_notification_queue_.clear();
00109 }
00110
00111 void
00112 TransportSendStrategy::send_buffer(TransportSendBuffer* send_buffer)
00113 {
00114 this->send_buffer_ = send_buffer;
00115
00116 if (this->send_buffer_ != 0) {
00117 this->send_buffer_->bind(this);
00118 }
00119 }
00120
00121 ThreadSynchWorker::WorkOutcome
00122 TransportSendStrategy::perform_work()
00123 {
00124 DBG_ENTRY_LVL("TransportSendStrategy","perform_work",6);
00125
00126 SendPacketOutcome outcome;
00127 bool no_more_work = false;
00128
00129 {
00130 GuardType guard(this->lock_);
00131
00132 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: perform_work mode: %C\n", mode_as_str(this->mode_)), 5);
00133
00134 if (this->mode_ == MODE_TERMINATED) {
00135 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00136 "Entered perform_work() and mode_ is MODE_TERMINATED - "
00137 "we lost connection and could not reconnect, just return "
00138 "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
00139 return WORK_OUTCOME_BROKEN_RESOURCE;
00140 }
00141
00142
00143
00144
00145
00146
00147
00148
00149
00150
00151
00152
00153
00154
00155
00156
00157
00158 if (this->mode_ != MODE_QUEUE && this->mode_ != MODE_SUSPEND) {
00159 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00160 "Entered perform_work() and mode_ is %C - just return "
00161 "WORK_OUTCOME_NO_MORE_TO_DO.\n", mode_as_str(this->mode_)), 5);
00162 return WORK_OUTCOME_NO_MORE_TO_DO;
00163 }
00164
00165
00166
00167
00168
00169
00170
00171
00172
00173 const size_t header_length = this->header_.length_;
00174
00175 if (header_length == 0) {
00176 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00177 "The current packet doesn't have any unsent bytes - we "
00178 "need to 'populate' the current packet with elems from "
00179 "the queue.\n"), 5);
00180
00181
00182
00183
00184
00185
00186 if (this->queue_.size() == 0) {
00187 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00188 "But the queue is empty. We have cleared the "
00189 "backpressure situation.\n"),5);
00190
00191
00192
00193
00194
00195 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00196 "Flip mode to MODE_DIRECT, and return "
00197 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
00198
00199
00200 this->mode_ = MODE_DIRECT;
00201
00202
00203
00204 return WORK_OUTCOME_NO_MORE_TO_DO;
00205 }
00206
00207 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00208 "There is at least one elem in the queue - get the packet "
00209 "elems from the queue.\n"), 5);
00210
00211
00212
00213 this->get_packet_elems_from_queue();
00214
00215 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00216 "Prepare the packet from the packet elems_.\n"), 5);
00217
00218
00219 this->prepare_packet();
00220
00221 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00222 "Packet has been prepared from packet elems_.\n"), 5);
00223
00224 } else {
00225 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00226 "We have a current packet that still has unsent bytes.\n"), 5);
00227 }
00228
00229 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00230 "Attempt to send the current packet.\n"), 5);
00231
00232
00233
00234
00235
00236
00237 outcome = this->send_packet();
00238
00239
00240
00241 if ((outcome == OUTCOME_COMPLETE_SEND) && (this->queue_.size() == 0)) {
00242 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00243 "Flip the mode to MODE_DIRECT, and then return "
00244 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
00245
00246
00247 this->mode_ = MODE_DIRECT;
00248 no_more_work = true;
00249 }
00250 }
00251
00252 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00253 "The outcome of the send_packet() was %d.\n", outcome), 5);
00254
00255 send_delayed_notifications();
00256
00257
00258
00259 if (no_more_work) {
00260 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00261 "We sent the whole packet, and there is nothing left on "
00262 "the queue now.\n"), 5);
00263
00264
00265
00266 return WORK_OUTCOME_NO_MORE_TO_DO;
00267 }
00268
00269 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00270 "We still have unsent bytes in the current packet AND/OR there "
00271 "are still elements in the queue.\n"), 5);
00272
00273 if ((outcome == OUTCOME_PEER_LOST) || (outcome == OUTCOME_SEND_ERROR)) {
00274 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00275 "We lost our connection, or had some fatal connection "
00276 "error. Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
00277
00278 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00279 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
00280
00281 bool do_suspend = true;
00282 this->relink(do_suspend);
00283
00284 if (this->mode_ == MODE_SUSPEND) {
00285 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00286 "The reconnect has not done yet and we are still in MODE_SUSPEND. "
00287 "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
00288
00289
00290 return WORK_OUTCOME_NO_MORE_TO_DO;
00291
00292 } else if (this->mode_ == MODE_TERMINATED) {
00293 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00294 "Reconnect failed, now we are in MODE_TERMINATED\n"), 5);
00295 return WORK_OUTCOME_BROKEN_RESOURCE;
00296
00297 } else {
00298 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00299 "Reconnect succeeded, Notify synch thread of work "
00300 "availability.\n"), 5);
00301
00302
00303
00304 this->synch_->work_available();
00305 }
00306 }
00307
00308 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00309 "We still have an 'unbroken' connection.\n"), 5);
00310
00311 if (outcome == OUTCOME_BACKPRESSURE) {
00312 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00313 "We experienced backpressure on our attempt to send the "
00314 "packet. Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
00315
00316 return WORK_OUTCOME_CLOGGED_RESOURCE;
00317 }
00318
00319 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00320 "We may have sent the whole current packet, but still have "
00321 "elements on the queue.\n"), 5);
00322 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00323 "Or, we may have only partially sent the current packet.\n"), 5);
00324
00325 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00326 "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5);
00327
00328
00329
00330
00331
00332
00333
00334
00335
00336 return WORK_OUTCOME_MORE_TO_DO;
00337 }
00338
00339
00340
00341
00342
00343
00344
00345
00346
00347
00348
00349
00350
00351
00352 int
00353 TransportSendStrategy::adjust_packet_after_send(ssize_t num_bytes_sent)
00354 {
00355 DBG_ENTRY_LVL("TransportSendStrategy", "adjust_packet_after_send", 6);
00356
00357 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00358 "Adjusting the current packet because %d bytes of the packet "
00359 "have been sent.\n", num_bytes_sent));
00360
00361 ssize_t num_bytes_left = num_bytes_sent;
00362 ssize_t num_non_header_bytes_sent = 0;
00363
00364 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00365 "Set num_bytes_left to %d.\n", num_bytes_left));
00366 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00367 "Set num_non_header_bytes_sent to %d.\n",
00368 num_non_header_bytes_sent));
00369
00370 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00371 "Peek at the element at the front of the packet elems_.\n"));
00372
00373
00374 TransportQueueElement* element = this->elems_.peek();
00375
00376 if(!element){
00377 ACE_DEBUG((LM_INFO, "(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n"));
00378 } else {
00379 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00380 "Use the element's msg() to find the last block in "
00381 "the msg() chain.\n"));
00382
00383
00384 const ACE_Message_Block* elem_tail_block = element->msg();
00385
00386 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00387 "Start with tail block == element->msg().\n"));
00388
00389 while (elem_tail_block->cont() != 0) {
00390 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00391 "Set tail block to its cont() block (next in chain).\n"));
00392 elem_tail_block = elem_tail_block->cont();
00393 }
00394
00395 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00396 "Tail block now set (because tail block's cont() is 0).\n"));
00397
00398 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00399 "Start the 'while (num_bytes_left > 0)' loop.\n"));
00400
00401 while (num_bytes_left > 0) {
00402 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00403 "At top of 'num bytes left' loop. num_bytes_left == [%d].\n",
00404 num_bytes_left));
00405
00406 const int block_length = static_cast<int>(this->pkt_chain_->length());
00407
00408 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00409 "Length of block at front of pkt_chain_ is [%d].\n",
00410 block_length));
00411
00412 if (block_length <= num_bytes_left) {
00413 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00414 "The whole block at the front of pkt_chain_ was sent.\n"));
00415
00416
00417
00418
00419
00420 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00421 "Extract the fully sent block from the pkt_chain_.\n"));
00422
00423 ACE_Message_Block* fully_sent_block = this->pkt_chain_;
00424
00425 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00426 "Set pkt_chain_ to pkt_chain_->cont().\n"));
00427
00428 this->pkt_chain_ = this->pkt_chain_->cont();
00429
00430 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00431 "Set the fully sent block's cont() to 0.\n"));
00432
00433 fully_sent_block->cont(0);
00434
00435
00436
00437 num_bytes_left -= block_length;
00438
00439 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00440 "Updated num_bytes_left to account for fully sent "
00441 "block (block_length == [%d]).\n", block_length));
00442 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00443 "Now, num_bytes_left == [%d].\n", num_bytes_left));
00444
00445 if (!this->header_complete_) {
00446 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00447 "Since the header_complete_ flag is false, it means "
00448 "that the packet header block was still in the "
00449 "pkt_chain_.\n"));
00450
00451 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00452 "Not anymore... Set the header_complete_ flag "
00453 "to true.\n"));
00454
00455
00456
00457 this->header_complete_ = true;
00458
00459 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00460 "Release the fully sent block.\n"));
00461
00462
00463 fully_sent_block->release();
00464
00465 } else {
00466 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00467 "Since the header_complete_ flag is true, it means "
00468 "that the packet header block was not in the "
00469 "pkt_chain_.\n"));
00470 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00471 "So, the fully sent block was part of an element.\n"));
00472
00473
00474
00475
00476
00477
00478 num_non_header_bytes_sent += block_length;
00479
00480 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00481 "Updated num_non_header_bytes_sent to account for "
00482 "fully sent block (block_length == [%d]).\n",
00483 block_length));
00484
00485 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00486 "Now, num_non_header_bytes_sent == [%d].\n",
00487 num_non_header_bytes_sent));
00488
00489 if (fully_sent_block->base() == elem_tail_block->base()) {
00490 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00491 "Ok. The fully sent block was a duplicate of "
00492 "the tail block of the element that is at the "
00493 "front of the packet elems_.\n"));
00494
00495 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00496 "This means that we have completely sent the "
00497 "element at the front of the packet elems_.\n"));
00498
00499
00500
00501
00502 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00503 "We can release the fully sent block now.\n"));
00504
00505
00506 fully_sent_block->release();
00507
00508 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00509 "We can extract the element from the front of "
00510 "the packet elems_ (we were just peeking).\n"));
00511
00512
00513 element = this->elems_.get();
00514
00515 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00516 "Tell the element that a decision has been made "
00517 "regarding its fate - data_delivered().\n"));
00518
00519
00520 this->add_delayed_notification(element);
00521
00522 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00523 "Peek at the next element in the packet "
00524 "elems_.\n"));
00525
00526
00527 element = this->elems_.peek();
00528
00529 if (element != 0) {
00530 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00531 "The is an element still in the packet "
00532 "elems_ (we are peeking at it now).\n"));
00533
00534 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00535 "We are going to find the tail block for the "
00536 "current element (we are peeking at).\n"));
00537
00538
00539
00540 elem_tail_block = element->msg();
00541
00542 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00543 "Start w/tail block == element->msg().\n"));
00544
00545 while (elem_tail_block->cont() != 0) {
00546 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00547 "Set tail block to next in chain.\n"));
00548 elem_tail_block = elem_tail_block->cont();
00549 }
00550
00551 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00552 "Done finding tail block.\n"));
00553 }
00554
00555 } else {
00556 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00557 "Ok. The fully sent block is *not* a "
00558 "duplicate of the tail block of the element "
00559 "at the front of the packet elems_.\n"));
00560
00561 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00562 "Thus, we have not completely sent the "
00563 "element yet.\n"));
00564
00565
00566
00567
00568 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00569 "We can release the fully_sent_block now.\n"));
00570
00571
00572 fully_sent_block->release();
00573 }
00574 }
00575
00576 } else {
00577 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00578 "Only part of the block at the front of pkt_chain_ "
00579 "was sent.\n"));
00580
00581 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00582 "Advance the rd_ptr() of the front block (of pkt_chain_) "
00583 "by the num_bytes_left (%d).\n", num_bytes_left));
00584
00585
00586 this->pkt_chain_->rd_ptr(num_bytes_left);
00587
00588 if (this->header_complete_) {
00589 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00590 "And since the packet header block has already been "
00591 "completely sent, add num_bytes_left to the "
00592 "num_non_header_bytes_sent.\n"));
00593
00594 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00595 "Before, num_non_header_bytes_sent == %d.\n",
00596 num_non_header_bytes_sent));
00597
00598
00599
00600
00601
00602 num_non_header_bytes_sent += num_bytes_left;
00603
00604 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00605 "After, num_non_header_bytes_sent == %d.\n",
00606 num_non_header_bytes_sent));
00607 }
00608
00609 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00610 "Set the num_bytes_left to 0 now.\n"));
00611
00612 num_bytes_left = 0;
00613 }
00614 }
00615 }
00616
00617 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00618 "The 'num_bytes_left' loop has completed.\n"));
00619
00620 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00621 "Adjust the header_.length_ to account for the "
00622 "num_non_header_bytes_sent.\n"));
00623 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00624 "Before, header_.length_ == %d.\n",
00625 this->header_.length_));
00626
00627
00628
00629 this->header_.length_ -= static_cast<ACE_UINT32>(num_non_header_bytes_sent);
00630
00631 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00632 "After, header_.length_ == %d.\n",
00633 this->header_.length_));
00634
00635
00636 int rc = (this->header_.length_ == 0) ? 0 : 1;
00637
00638 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00639 "Adjustments all done. Returning [%d]. 0 means entire packet "
00640 "has been sent. 1 means otherwise.\n",
00641 rc));
00642
00643 return rc;
00644 }
00645
00646 bool
00647 TransportSendStrategy::send_delayed_notifications(const TransportQueueElement::MatchCriteria* match)
00648 {
00649 DBG_ENTRY_LVL("TransportSendStrategy","send_delayed_notifications",6);
00650 TransportQueueElement* sample = 0;
00651 SendMode mode = MODE_NOT_SET;
00652
00653 OPENDDS_VECTOR(TQESendModePair) samples;
00654
00655 size_t num_delayed_notifications = 0;
00656 bool found_element = false;
00657
00658 {
00659 GuardType guard(lock_);
00660
00661 num_delayed_notifications = delayed_delivered_notification_queue_.size();
00662
00663 if (num_delayed_notifications == 0) {
00664 return false;
00665
00666 } else if (num_delayed_notifications == 1) {
00667
00668
00669 if (!match || match->matches(*delayed_delivered_notification_queue_[0].first)) {
00670 found_element = true;
00671 sample = delayed_delivered_notification_queue_[0].first;
00672 mode = delayed_delivered_notification_queue_[0].second;
00673
00674 delayed_delivered_notification_queue_.clear();
00675 }
00676
00677 } else {
00678 OPENDDS_VECTOR(TQESendModePair)::iterator iter;
00679 for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) {
00680 sample = iter->first;
00681 mode = iter->second;
00682 if (!match || match->matches(*sample)) {
00683 found_element = true;
00684 samples.push_back(*iter);
00685 iter = delayed_delivered_notification_queue_.erase(iter);
00686 } else {
00687 ++iter;
00688 }
00689 }
00690 }
00691 }
00692
00693 if (!found_element)
00694 return false;
00695
00696 bool transport_shutdown = this->transport_.is_shut_down();
00697
00698 if (num_delayed_notifications == 1) {
00699
00700 if (mode == MODE_TERMINATED) {
00701 if (!transport_shutdown || sample->owned_by_transport()) {
00702 sample->data_dropped(true);
00703 }
00704 } else {
00705 if (!transport_shutdown || sample->owned_by_transport()) {
00706 sample->data_delivered();
00707 }
00708 }
00709
00710 } else {
00711 for (size_t i = 0; i < samples.size(); ++i) {
00712 if (samples[i].second == MODE_TERMINATED) {
00713 if (!transport_shutdown || samples[i].first->owned_by_transport()) {
00714 samples[i].first->data_dropped(true);
00715 }
00716 } else {
00717 if (!transport_shutdown || samples[i].first->owned_by_transport()) {
00718 samples[i].first->data_delivered();
00719 }
00720 }
00721 }
00722 }
00723 return true;
00724 }
00725
00726
00727 void
00728 TransportSendStrategy::terminate_send(bool graceful_disconnecting)
00729 {
00730 DBG_ENTRY_LVL("TransportSendStrategy","terminate_send",6);
00731
00732 bool reset_flag = true;
00733
00734 {
00735 GuardType guard(this->lock_);
00736
00737
00738
00739
00740 if ((this->mode_ == MODE_TERMINATED || this->mode_ == MODE_SUSPEND)
00741 && !this->graceful_disconnecting_) {
00742 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00743 "It was already terminated non gracefully, will not set to graceful disconnecting \n"));
00744 reset_flag = false;
00745 }
00746 }
00747
00748 VDBG((LM_DEBUG, "(%P|%t) DBG: Now flip to MODE_TERMINATED \n"));
00749
00750 this->clear(MODE_TERMINATED);
00751
00752 if (reset_flag) {
00753 GuardType guard(this->lock_);
00754 this->graceful_disconnecting_ = graceful_disconnecting;
00755 }
00756 }
00757
00758 void
00759 TransportSendStrategy::clear(SendMode mode)
00760 {
00761 DBG_ENTRY_LVL("TransportSendStrategy","clear",6);
00762
00763 send_delayed_notifications();
00764 QueueType elems;
00765 QueueType queue;
00766 {
00767 GuardType guard(this->lock_);
00768
00769 if (this->header_.length_ > 0) {
00770
00771
00772 int num_bytes_left = static_cast<int>(this->pkt_chain_->total_length());
00773 int result = this->adjust_packet_after_send(num_bytes_left);
00774
00775 if (result == 0) {
00776 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00777 "The adjustment logic says that the packet is cleared.\n"));
00778
00779 } else {
00780 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00781 "The adjustment returned partial sent.\n"));
00782 }
00783 }
00784
00785 elems.swap(this->elems_);
00786 queue.swap(this->queue_);
00787
00788 this->header_.length_ = 0;
00789 this->pkt_chain_ = 0;
00790 this->header_complete_ = false;
00791 this->start_counter_ = 0;
00792 this->mode_ = mode;
00793 this->mode_before_suspend_ = MODE_NOT_SET;
00794 }
00795
00796
00797
00798
00799
00800
00801 RemoveAllVisitor remove_all_visitor;
00802
00803 elems.accept_remove_visitor(remove_all_visitor);
00804 queue.accept_remove_visitor(remove_all_visitor);
00805 }
00806
00807 int
00808 TransportSendStrategy::start()
00809 {
00810 DBG_ENTRY_LVL("TransportSendStrategy","start",6);
00811
00812 {
00813 GuardType guard(this->lock_);
00814
00815 if (!this->start_i()) {
00816 return -1;
00817 }
00818 }
00819
00820 size_t header_chunks(1);
00821
00822
00823
00824 if (this->send_buffer_ != 0) {
00825 header_chunks += this->send_buffer_->capacity();
00826
00827 } else {
00828 header_chunks += 1;
00829 }
00830
00831 this->header_db_allocator_.reset( new TransportDataBlockAllocator(header_chunks));
00832 this->header_mb_allocator_.reset( new TransportMessageBlockAllocator(header_chunks));
00833
00834
00835
00836
00837
00838
00839
00840 if (this->synch_->register_worker(*this) == -1) {
00841
00842 ACE_ERROR_RETURN((LM_ERROR,
00843 "(%P|%t) ERROR: TransportSendStrategy failed to register "
00844 "as a worker with the ThreadSynch object.\n"),
00845 -1);
00846 }
00847
00848 return 0;
00849 }
00850
00851 void
00852 TransportSendStrategy::stop()
00853 {
00854 DBG_ENTRY_LVL("TransportSendStrategy","stop",6);
00855
00856 if (this->header_block_ != 0) {
00857 this->header_block_->release ();
00858 this->header_block_ = 0;
00859 }
00860
00861 this->synch_->unregister_worker();
00862
00863 {
00864 GuardType guard(this->lock_);
00865
00866 if (this->pkt_chain_ != 0) {
00867 size_t size = this->pkt_chain_->total_length();
00868
00869 if (size > 0) {
00870 this->pkt_chain_->release();
00871 ACE_DEBUG((LM_WARNING,
00872 ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
00873 ACE_TEXT("terminating with %d unsent bytes.\n"),
00874 size));
00875 }
00876 }
00877 }
00878
00879 {
00880 GuardType guard(this->lock_);
00881
00882 this->stop_i();
00883 }
00884
00885
00886
00887 }
00888
00889 void
00890 TransportSendStrategy::send(TransportQueueElement* element, bool relink)
00891 {
00892 if (Transport_debug_level > 9) {
00893 ACE_DEBUG((LM_DEBUG,
00894 ACE_TEXT("(%P|%t) TransportSendStrategy::send() [%d] - ")
00895 ACE_TEXT("sending data at 0x%x.\n"),
00896 id(), element));
00897 }
00898
00899 DBG_ENTRY_LVL("TransportSendStrategy", "send", 6);
00900
00901 {
00902 GuardType guard(this->lock_);
00903
00904 if (this->link_released_) {
00905 this->add_delayed_notification(element);
00906
00907 } else {
00908 if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) {
00909 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00910 "TransportSendStrategy::send: mode is MODE_TERMINATED and not in "
00911 "graceful disconnecting, so discard message.\n"));
00912 element->data_dropped(true);
00913 return;
00914 }
00915
00916 size_t element_length = element->msg()->total_length();
00917
00918 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00919 "Send element msg() has total_length() == [%d].\n",
00920 element_length));
00921
00922 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00923 "this->max_header_size_ == [%d].\n",
00924 this->max_header_size_));
00925
00926 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00927 "this->max_size_ == [%d].\n",
00928 this->max_size_));
00929
00930 const size_t max_message_size = this->max_message_size();
00931
00932
00933
00934
00935
00936
00937 if (max_message_size == 0 &&
00938 this->max_header_size_ + element_length > this->max_size_) {
00939 ACE_ERROR((LM_ERROR,
00940 "(%P|%t) ERROR: Element too large (%Q) "
00941 "- won't fit into packet.\n", ACE_UINT64(element_length)));
00942 return;
00943 }
00944
00945
00946 if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) {
00947 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
00948 "this->mode_ == %C, so queue elem and leave.\n",
00949 mode_as_str(this->mode_)), 5);
00950
00951 this->queue_.put(element);
00952
00953 if (this->mode_ != MODE_SUSPEND) {
00954 this->synch_->work_available();
00955 }
00956
00957 return;
00958 }
00959
00960 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00961 "this->mode_ == MODE_DIRECT.\n"));
00962
00963
00964
00965
00966
00967
00968
00969
00970
00971
00972
00973
00974
00975
00976
00977
00978
00979
00980
00981
00982
00983
00984
00985 const bool exclusive = element->requires_exclusive_packet();
00986
00987 VDBG((LM_DEBUG, "(%P|%t) DBG: "
00988 "The element %C require an exclusive packet.\n",
00989 (exclusive ? "DOES" : "does NOT")
00990 ));
00991
00992 const size_t space_needed =
00993 (max_message_size > 0)
00994 ? DataSampleHeader::max_marshaled_size() + MIN_FRAG
00995 : element_length;
00996
00997 if ((exclusive && (this->elems_.size() != 0))
00998 || (this->space_available() < space_needed)) {
00999
01000 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01001 "Element won't fit in current packet or requires exclusive"
01002 " - send current packet (directly) now.\n"));
01003
01004 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01005 "max_header_size_: %d, header_.length_: %d, element_length: %d\n"
01006 , this->max_header_size_, this->header_.length_, element_length));
01007
01008 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01009 "Tot possible length: %d, max_len: %d\n"
01010 , this->max_header_size_ + this->header_.length_ + element_length
01011 , this->max_size_));
01012 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01013 "current elem size: %d\n"
01014 , this->elems_.size()));
01015
01016
01017
01018
01019
01020
01021 this->direct_send(relink);
01022
01023
01024
01025
01026
01027
01028
01029
01030
01031 if (this->mode_ == MODE_QUEUE) {
01032 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01033 "We experienced backpressure on that direct send, as "
01034 "the mode_ is now MODE_QUEUE or MODE_SUSPEND. "
01035 "Queue elem and leave.\n"), 5);
01036 this->queue_.put(element);
01037 this->synch_->work_available();
01038
01039 return;
01040 }
01041 }
01042
01043
01044 bool first_pkt = true;
01045 for (TransportQueueElement* next_fragment = 0;
01046 (first_pkt || next_fragment)
01047 && (this->mode_ == MODE_DIRECT || this->mode_ == MODE_TERMINATED);) {
01048
01049
01050 if (next_fragment) {
01051 element = next_fragment;
01052 element_length = next_fragment->msg()->total_length();
01053 this->header_.first_fragment_ = false;
01054 }
01055
01056 this->header_.last_fragment_ = false;
01057 if (max_message_size) {
01058 const size_t avail = this->space_available();
01059 if (element_length > avail) {
01060 VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting\n"), 0);
01061 ElementPair ep = element->fragment(avail);
01062 element = ep.first;
01063 element_length = element->msg()->total_length();
01064 next_fragment = ep.second;
01065 this->header_.first_fragment_ = first_pkt;
01066 } else if (next_fragment) {
01067
01068
01069 this->header_.last_fragment_ = true;
01070 next_fragment = 0;
01071 }
01072 }
01073 first_pkt = false;
01074
01075 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01076 "Start the 'append elem' to current packet logic.\n"));
01077
01078 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01079 "Put element into current packet elems_.\n"));
01080
01081
01082
01083
01084
01085
01086 this->elems_.put(element);
01087
01088 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01089 "Before, the header_.length_ == [%d].\n",
01090 this->header_.length_));
01091
01092
01093 this->header_.length_ += static_cast<ACE_UINT32>(element_length);
01094 const size_t message_length = this->header_.length_;
01095
01096 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01097 "After adding element's length, the header_.length_ == [%d].\n",
01098 message_length));
01099
01100
01101
01102
01103
01104
01105
01106
01107
01108
01109
01110
01111
01112
01113
01114 if (next_fragment || (this->elems_.size() >= this->max_samples_)
01115 || (this->max_header_size_ + message_length > this->optimum_size_)
01116 || exclusive) {
01117 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01118 "Now the current packet looks full - send it (directly).\n"));
01119
01120 this->direct_send(relink);
01121
01122 if (next_fragment && this->mode_ != MODE_DIRECT) {
01123 if (this->mode_ == MODE_QUEUE) {
01124 this->queue_.put(next_fragment);
01125 this->synch_->work_available();
01126
01127 } else {
01128 next_fragment->data_dropped(true );
01129 }
01130 } else if (mode_ == MODE_QUEUE) {
01131
01132 this->synch_->work_available();
01133 }
01134
01135 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01136 "Back from the direct_send() attempt.\n"));
01137
01138 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01139 "And we %C as a result of the direct_send() call.\n",
01140 ((this->mode_ == MODE_QUEUE) ? "flipped into MODE_QUEUE"
01141 : "stayed in MODE_DIRECT")));
01142
01143 } else {
01144 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01145 "Packet not sent. Send conditions weren't satisfied.\n"));
01146 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01147 "elems_.size(): %d, max_samples_: %d\n",
01148 int(this->elems_.size()), int(this->max_samples_)));
01149 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01150 "header_size_: %d, optimum_size_: %d\n",
01151 int(this->max_header_size_ + message_length),
01152 int(this->optimum_size_)));
01153 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01154 "element_requires_exclusive_packet: %d\n", int(exclusive)));
01155
01156 if (this->mode_ == MODE_QUEUE) {
01157 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01158 "We flipped into MODE_QUEUE.\n"));
01159
01160 } else {
01161 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01162 "We stayed in MODE_DIRECT.\n"));
01163 }
01164 }
01165 }
01166 }
01167 }
01168
01169 send_delayed_notifications();
01170 }
01171
01172 void
01173 TransportSendStrategy::send_stop(RepoId )
01174 {
01175 DBG_ENTRY_LVL("TransportSendStrategy","send_stop",6);
01176 {
01177 GuardType guard(this->lock_);
01178
01179 if (this->link_released_)
01180 return;
01181
01182 if (this->start_counter_ == 0) {
01183
01184 VDBG_LVL((LM_ERROR,
01185 "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5);
01186 return;
01187 }
01188
01189 --this->start_counter_;
01190
01191 if (this->start_counter_ != 0) {
01192
01193
01194
01195 return;
01196 }
01197
01198 if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) {
01199 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01200 "TransportSendStrategy::send_stop: dont try to send current packet "
01201 "since mode is MODE_TERMINATED and not in graceful disconnecting.\n"));
01202 return;
01203 }
01204
01205 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01206 "This is an 'important' send_stop() event since our "
01207 "start_counter_ is 0.\n"));
01208
01209
01210
01211
01212
01213
01214
01215
01216
01217
01218
01219
01220
01221 if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) {
01222 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01223 "But since we are in %C, we don't have to do "
01224 "anything more in this important send_stop().\n",
01225 mode_as_str(this->mode_)));
01226
01227 return;
01228 }
01229
01230 size_t header_length = this->header_.length_;
01231 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01232 "We are in MODE_DIRECT in an important send_stop() - "
01233 "header_.length_ == [%d].\n", header_length));
01234
01235
01236
01237 if ((header_length > 0) &&
01238
01239 (this->elems_.size() > 0)) {
01240 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01241 "There is something in the current packet - attempt to send "
01242 "it (directly) now.\n"));
01243
01244 this->direct_send(true);
01245 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01246 "Back from the attempt to send leftover packet directly.\n"));
01247
01248 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01249 "But we %C as a result.\n",
01250 ((this->mode_ == MODE_QUEUE)? "flipped into MODE_QUEUE":
01251 "stayed in MODE_DIRECT" )));
01252 if (this->mode_ == MODE_QUEUE && this->mode_ != MODE_SUSPEND) {
01253 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01254 "Notify Synch thread of work availability\n"));
01255 this->synch_->work_available();
01256 }
01257 }
01258 }
01259
01260 send_delayed_notifications();
01261 }
01262
01263 void
01264 TransportSendStrategy::remove_all_msgs(RepoId pub_id)
01265 {
01266 DBG_ENTRY_LVL("TransportSendStrategy","remove_all_msgs",6);
01267
01268 const TransportQueueElement::MatchOnPubId match(pub_id);
01269 send_delayed_notifications(&match);
01270
01271 GuardType guard(this->lock_);
01272
01273 if (this->send_buffer_ != 0) {
01274
01275
01276 this->send_buffer_->retain_all(pub_id);
01277 }
01278
01279 do_remove_sample(pub_id, match, 0);
01280 }
01281
01282 RemoveResult
01283 TransportSendStrategy::remove_sample(const DataSampleElement* sample, void* context)
01284 {
01285 DBG_ENTRY_LVL("TransportSendStrategy", "remove_sample", 6);
01286
01287 VDBG_LVL((LM_DEBUG, "(%P|%t) Removing sample: %@\n", sample->get_sample()), 5);
01288
01289
01290
01291
01292
01293
01294
01295
01296
01297
01298
01299 const char* const payload = sample->get_sample()->cont()->rd_ptr();
01300 RepoId pub_id = sample->get_pub_id();
01301 const TransportQueueElement::MatchOnDataPayload modp(payload);
01302 if (send_delayed_notifications(&modp)) {
01303 return REMOVE_RELEASED;
01304 }
01305
01306 GuardType guard(this->lock_);
01307 return do_remove_sample(pub_id, modp, context);
01308 }
01309
01310 RemoveResult
01311 TransportSendStrategy::do_remove_sample(const RepoId&,
01312 const TransportQueueElement::MatchCriteria& criteria,
01313 void*)
01314 {
01315 DBG_ENTRY_LVL("TransportSendStrategy", "do_remove_sample", 6);
01316
01317
01318
01319
01320
01321
01322 if ((this->mode_ == MODE_DIRECT)
01323 || ((this->pkt_chain_ == 0) && (this->queue_.size() == 0))) {
01324
01325
01326
01327 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01328 "The mode is MODE_DIRECT, or the queue is empty and no "
01329 "transport packet is in progress.\n"));
01330
01331 QueueRemoveVisitor simple_rem_vis(criteria);
01332 this->elems_.accept_remove_visitor(simple_rem_vis);
01333
01334 const RemoveResult status = simple_rem_vis.status();
01335
01336 if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
01337 this->header_.length_ -= simple_rem_vis.removed_bytes();
01338
01339 } else if (status == REMOVE_NOT_FOUND) {
01340 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01341 "Failed to find the sample to remove.\n"));
01342 }
01343
01344 return status;
01345 }
01346
01347 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01348 "Visit the queue_ with the RemoveElementVisitor.\n"));
01349
01350 QueueRemoveVisitor simple_rem_vis(criteria);
01351 this->queue_.accept_remove_visitor(simple_rem_vis);
01352
01353 RemoveResult status = simple_rem_vis.status();
01354
01355 if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
01356 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01357 "The sample was removed from the queue_.\n"));
01358
01359
01360
01361 return status;
01362 }
01363
01364 if (status == REMOVE_ERROR) {
01365 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01366 "The RemoveElementVisitor encountered a fatal error in queue_.\n"));
01367
01368
01369
01370 return status;
01371 }
01372
01373 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01374 "The RemoveElementVisitor did not find the sample in queue_.\n"));
01375
01376
01377
01378
01379
01380
01381
01382
01383
01384 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01385 "Visit our elems_ with the PacketRemoveVisitor.\n"));
01386
01387 PacketRemoveVisitor pac_rem_vis(criteria,
01388 this->pkt_chain_,
01389 this->header_block_,
01390 this->replaced_element_mb_allocator_,
01391 this->replaced_element_db_allocator_);
01392
01393 this->elems_.accept_replace_visitor(pac_rem_vis);
01394
01395 status = pac_rem_vis.status();
01396
01397 if (status == REMOVE_ERROR) {
01398 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01399 "The PacketRemoveVisitor encountered a fatal error.\n"));
01400
01401 } else if (status == REMOVE_NOT_FOUND) {
01402 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01403 "The PacketRemoveVisitor didn't find the sample.\n"));
01404
01405 } else {
01406 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01407 "The PacketRemoveVisitor found the sample and removed it.\n"));
01408 }
01409
01410 return status;
01411 }
01412
01413 void
01414 TransportSendStrategy::direct_send(bool relink)
01415 {
01416 DBG_ENTRY_LVL("TransportSendStrategy", "direct_send", 6);
01417
01418 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01419 "Prepare the current packet for a direct send attempt.\n"));
01420
01421
01422 this->prepare_packet();
01423
01424 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01425 "Now attempt to send the packet.\n"));
01426
01427
01428
01429 while (true) {
01430
01431 const SendPacketOutcome outcome = this->send_packet();
01432
01433 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01434 "The outcome of the send_packet() was %d.\n", outcome));
01435
01436 if ((outcome == OUTCOME_BACKPRESSURE) ||
01437 (outcome == OUTCOME_PARTIAL_SEND)) {
01438 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01439 "The outcome of the send_packet() was either "
01440 "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5);
01441
01442 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01443 "Flip into the MODE_QUEUE mode_.\n"), 5);
01444
01445
01446 this->mode_ = MODE_QUEUE;
01447
01448 } else if ((outcome == OUTCOME_PEER_LOST) ||
01449 (outcome == OUTCOME_SEND_ERROR)) {
01450 if (outcome == OUTCOME_SEND_ERROR) {
01451 ACE_ERROR((LM_WARNING,
01452 ACE_TEXT("(%P|%t) WARNING: Problem detected in ")
01453 ACE_TEXT("send buffer management: %p.\n"),
01454 ACE_TEXT("send_bytes")));
01455
01456 if (Transport_debug_level > 0) {
01457 this->transport_.config().dump();
01458 }
01459 } else {
01460 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01461 "The outcome of the send_packet() was "
01462 "OUTCOME_PEER_LOST.\n"));
01463 }
01464
01465 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01466 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
01467
01468 if (this->mode_ != MODE_SUSPEND) {
01469 this->mode_before_suspend_ = this->mode_;
01470 this->mode_ = MODE_SUSPEND;
01471 }
01472
01473 if (relink) {
01474 bool do_suspend = false;
01475 this->relink(do_suspend);
01476
01477 if (this->mode_ == MODE_SUSPEND) {
01478 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01479 "The reconnect has not done yet and we are "
01480 "still in MODE_SUSPEND.\n"), 5);
01481
01482 } else if (this->mode_ == MODE_TERMINATED) {
01483 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01484 "Reconnect failed, we are in MODE_TERMINATED\n"), 5);
01485
01486 } else {
01487 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01488 "Try send the packet again since the connection "
01489 "is re-established.\n"), 5);
01490
01491
01492 continue;
01493 }
01494 }
01495
01496 } else {
01497 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01498 "The outcome of the send_packet() must have been "
01499 "OUTCOME_COMPLETE_SEND.\n"));
01500 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01501 "So, we will just stay in MODE_DIRECT.\n"));
01502 }
01503
01504 break;
01505 }
01506
01507
01508 }
01509
01510 void
01511 TransportSendStrategy::get_packet_elems_from_queue()
01512 {
01513 DBG_ENTRY_LVL("TransportSendStrategy", "get_packet_elems_from_queue", 6);
01514
01515 for (TransportQueueElement* element = this->queue_.peek(); element != 0;
01516 element = this->queue_.peek()) {
01517
01518
01519 size_t element_length = element->msg()->total_length();
01520
01521
01522 const bool exclusive_packet = element->requires_exclusive_packet();
01523
01524 const size_t avail = this->space_available();
01525
01526 bool frag = false;
01527 if (element_length > avail) {
01528
01529 if (this->max_message_size()) {
01530 this->header_.first_fragment_ = !element->is_fragment();
01531 VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting from queue\n"), 0);
01532 ElementPair ep = element->fragment(avail);
01533 element = ep.first;
01534 element_length = element->msg()->total_length();
01535 this->queue_.replace_head(ep.second);
01536 frag = true;
01537 } else {
01538 break;
01539 }
01540 }
01541
01542
01543
01544
01545 if ((exclusive_packet && this->elems_.size() == 0)
01546 || !exclusive_packet) {
01547
01548
01549
01550 this->elems_.put(frag ? element : this->queue_.get());
01551 if (this->header_.length_ == 0) {
01552 this->header_.last_fragment_ = !frag && element->is_fragment();
01553 }
01554 this->header_.length_ += static_cast<ACE_UINT32>(element_length);
01555 VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Packetizing from queue\n"), 0);
01556 }
01557
01558
01559
01560
01561
01562
01563
01564
01565 if (exclusive_packet || frag
01566
01567
01568 || this->elems_.size() == this->max_samples_
01569
01570
01571 || this->header_.length_ >= this->optimum_size_) {
01572 break;
01573 }
01574 }
01575 }
01576
01577 void
01578 TransportSendStrategy::prepare_header()
01579 {
01580 DBG_ENTRY_LVL("TransportSendStrategy", "prepare_header", 6);
01581
01582
01583 this->header_.sequence_ = ++this->header_sequence_;
01584
01585
01586
01587 this->prepare_header_i();
01588 }
01589
01590 void
01591 TransportSendStrategy::prepare_header_i()
01592 {
01593 DBG_ENTRY_LVL("TransportSendStrategy","prepare_header_i",6);
01594
01595
01596 }
01597
01598 void
01599 TransportSendStrategy::prepare_packet()
01600 {
01601 DBG_ENTRY_LVL("TransportSendStrategy", "prepare_packet", 6);
01602
01603
01604 this->prepare_header();
01605
01606 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01607 "Marshal the packet header.\n"));
01608
01609 if (this->header_block_ != 0) {
01610 this->header_block_->release();
01611 }
01612
01613 ACE_NEW_MALLOC(this->header_block_,
01614 static_cast<ACE_Message_Block*>(this->header_mb_allocator_->malloc()),
01615 ACE_Message_Block(this->max_header_size_,
01616 ACE_Message_Block::MB_DATA,
01617 0,
01618 0,
01619 0,
01620 0,
01621 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01622 ACE_Time_Value::zero,
01623 ACE_Time_Value::max_time,
01624 this->header_db_allocator_.get(),
01625 this->header_mb_allocator_.get()));
01626
01627 marshal_transport_header(this->header_block_);
01628
01629 this->pkt_chain_ = this->header_block_->duplicate();
01630
01631 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01632 "Use a BuildChainVisitor to visit the packet elems_.\n"));
01633
01634
01635
01636
01637 BuildChainVisitor visitor;
01638 this->elems_.accept_visitor(visitor);
01639
01640 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01641 "Attach the visitor's chain of blocks to the lone (packet "
01642 "header) block currently in the pkt_chain_.\n"));
01643
01644
01645 this->pkt_chain_->cont(visitor.chain());
01646
01647 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01648 "Increment header sequence for next packet.\n"));
01649
01650
01651
01652 this->prepare_packet_i();
01653
01654 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01655 "Set the header_complete_ flag to false.\n"));
01656
01657
01658
01659
01660 this->header_complete_ = false;
01661 }
01662
01663 bool
01664 TransportSendStrategy::marshal_transport_header(ACE_Message_Block* mb)
01665 {
01666 return *mb << this->header_;
01667 }
01668
01669 void
01670 TransportSendStrategy::prepare_packet_i()
01671 {
01672 DBG_ENTRY_LVL("TransportSendStrategy","prepare_packet_i",6);
01673
01674
01675 }
01676
01677 void
01678 TransportSendStrategy::set_graceful_disconnecting(bool flag)
01679 {
01680 this->graceful_disconnecting_ = flag;
01681 }
01682
01683 ssize_t
01684 TransportSendStrategy::do_send_packet(const ACE_Message_Block* packet, int& bp)
01685 {
01686 if (Transport_debug_level > 9) {
01687 ACE_DEBUG((LM_DEBUG,
01688 ACE_TEXT("(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ")
01689 ACE_TEXT("sending data at 0x%x.\n"),
01690 id(), packet));
01691 }
01692 DBG_ENTRY_LVL("TransportSendStrategy", "do_send_packet", 6);
01693
01694 #if defined(OPENDDS_SECURITY)
01695
01696
01697 Message_Block_Ptr substitute(pre_send_packet(packet));
01698 #endif
01699
01700 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01701 "Populate the iovec array using the packet.\n"), 5);
01702
01703 iovec iov[MAX_SEND_BLOCKS];
01704
01705 #if defined(OPENDDS_SECURITY)
01706 const int num_blocks = mb_to_iov(substitute ? *substitute : *packet, iov);
01707 #else
01708 const int num_blocks = mb_to_iov(*packet, iov);
01709 #endif
01710
01711 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01712 "There are [%d] number of entries in the iovec array.\n",
01713 num_blocks), 5);
01714
01715 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01716 "Attempt to send_bytes() now.\n"), 5);
01717
01718 const ssize_t num_bytes_sent = this->send_bytes(iov, num_blocks, bp);
01719
01720 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01721 "The send_bytes() said that num_bytes_sent == [%d].\n",
01722 num_bytes_sent), 5);
01723
01724 #if defined(OPENDDS_SECURITY)
01725 if (substitute && num_bytes_sent > 0) {
01726
01727
01728
01729
01730 return packet->total_length();
01731 }
01732 #endif
01733
01734 return num_bytes_sent;
01735 }
01736
01737 TransportSendStrategy::SendPacketOutcome
01738 TransportSendStrategy::send_packet()
01739 {
01740 DBG_ENTRY_LVL("TransportSendStrategy", "send_packet", 6);
01741
01742 int bp_flag = 0;
01743 const ssize_t num_bytes_sent =
01744 this->do_send_packet(this->pkt_chain_, bp_flag);
01745
01746 if (num_bytes_sent == 0) {
01747 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01748 "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5);
01749
01750 return OUTCOME_PEER_LOST;
01751 }
01752
01753 if (num_bytes_sent < 0) {
01754 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01755 "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5);
01756
01757
01758 if (bp_flag == 1) {
01759 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01760 "Since backpressure flag is true, return "
01761 "OUTCOME_BACKPRESSURE.\n"), 5);
01762
01763 return OUTCOME_BACKPRESSURE;
01764 }
01765
01766 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01767 "Since backpressure flag is false, return "
01768 "OUTCOME_SEND_ERROR.\n"), 5);
01769
01770
01771
01772
01773
01774
01775
01776 return OUTCOME_SEND_ERROR;
01777 }
01778
01779 if (this->send_buffer_ != 0) {
01780
01781
01782 this->send_buffer_->insert(this->header_.sequence_,
01783 &this->elems_, this->pkt_chain_);
01784 }
01785
01786 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
01787 "Since num_bytes_sent > 0, adjust the packet to account for "
01788 "the bytes that did get sent.\n"),5);
01789
01790
01791
01792 const int result =
01793 this->adjust_packet_after_send(num_bytes_sent);
01794
01795 if (result == 0) {
01796 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01797 "The adjustment logic says that the complete packet was "
01798 "sent. Return OUTCOME_COMPLETE_SEND.\n"));
01799 return OUTCOME_COMPLETE_SEND;
01800 }
01801
01802 VDBG((LM_DEBUG, "(%P|%t) DBG: "
01803 "The adjustment logic says that only a part of the packet was "
01804 "sent. Return OUTCOME_PARTIAL_SEND.\n"));
01805
01806 return OUTCOME_PARTIAL_SEND;
01807 }
01808
01809 ssize_t
01810 TransportSendStrategy::non_blocking_send(const iovec iov[], int n, int& bp)
01811 {
01812 int val = 0;
01813 ACE_HANDLE handle = this->get_handle();
01814
01815 if (handle == ACE_INVALID_HANDLE)
01816 return -1;
01817
01818 ACE::record_and_set_non_blocking_mode(handle, val);
01819
01820
01821 bp = 0;
01822
01823
01824 errno = 0;
01825
01826 ssize_t result = this->send_bytes_i(iov, n);
01827
01828 if (result == -1) {
01829 if ((errno == EWOULDBLOCK) || (errno == ENOBUFS)) {
01830 VDBG((LM_DEBUG,"(%P|%t) DBG: "
01831 "Backpressure encountered.\n"));
01832
01833 bp = 1;
01834
01835 } else {
01836 VDBG_LVL((LM_ERROR, "(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n",
01837 ACE_TEXT("sendv"), n),1);
01838
01839
01840
01841 for (int ii = 0; ii < n; ii++) {
01842 ACE_DEBUG((LM_DEBUG, "(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n",
01843 ii, iov[ii].iov_len, iov[ii].iov_base));
01844 }
01845 }
01846 }
01847
01848 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
01849 "The sendv() returned [%d].\n", result), 5);
01850
01851 ACE::restore_non_blocking_mode(handle, val);
01852
01853 return result;
01854 }
01855
01856 void
01857 TransportSendStrategy::add_delayed_notification(TransportQueueElement* element)
01858 {
01859 if (Transport_debug_level) {
01860 size_t size = this->delayed_delivered_notification_queue_.size();
01861 if ((size > 0) && (size % this->max_samples_ == 0)) {
01862 ACE_DEBUG((LM_DEBUG,
01863 "(%P|%t) Transport send strategy notification queue threshold, size=%d\n",
01864 size));
01865 }
01866 }
01867
01868 this->delayed_delivered_notification_queue_.push_back(std::make_pair(element, this->mode_));
01869 }
01870
01871 void
01872 OpenDDS::DCPS::TransportSendStrategy::deliver_ack_request(TransportQueueElement* element)
01873 {
01874 GuardType guard(this->lock_);
01875 element->data_delivered();
01876 }
01877
01878 size_t
01879 TransportSendStrategy::space_available() const
01880 {
01881 const size_t used = this->max_header_size_ + this->header_.length_,
01882 max_msg = this->max_message_size();
01883 if (max_msg) {
01884 return std::min(this->max_size_ - used, max_msg - used);
01885 }
01886 return this->max_size_ - used;
01887 }
01888
01889 int
01890 TransportSendStrategy::mb_to_iov(const ACE_Message_Block& msg, iovec* iov)
01891 {
01892 int num_blocks = 0;
01893 #ifdef _MSC_VER
01894 #pragma warning(push)
01895
01896
01897 #pragma warning(disable : 4267)
01898 #endif
01899 for (const ACE_Message_Block* block = &msg;
01900 block && num_blocks < MAX_SEND_BLOCKS;
01901 block = block->cont()) {
01902 iov[num_blocks].iov_len = block->length();
01903 iov[num_blocks++].iov_base = block->rd_ptr();
01904 }
01905 #ifdef _MSC_VER
01906 #pragma warning(pop)
01907 #endif
01908 return num_blocks;
01909 }
01910
01911
01912 }
01913 }
01914
01915 OPENDDS_END_VERSIONED_NAMESPACE_DECL