TransportSendStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "TransportSendStrategy.h"
00010 #include "RemoveAllVisitor.h"
00011 #include "TransportInst.h"
00012 #include "ThreadSynchStrategy.h"
00013 #include "ThreadSynchResource.h"
00014 #include "TransportQueueElement.h"
00015 #include "TransportSendElement.h"
00016 #include "TransportSendBuffer.h"
00017 #include "BuildChainVisitor.h"
00018 #include "QueueRemoveVisitor.h"
00019 #include "PacketRemoveVisitor.h"
00020 #include "TransportDefs.h"
00021 #include "DirectPriorityMapper.h"
00022 #include "dds/DCPS/DataSampleHeader.h"
00023 #include "dds/DCPS/DataSampleElement.h"
00024 #include "dds/DCPS/Service_Participant.h"
00025 #include "EntryExit.h"
00026 
00027 #include "ace/Reverse_Lock_T.h"
00028 
00029 #if !defined (__ACE_INLINE__)
00030 #include "TransportSendStrategy.inl"
00031 #endif /* __ACE_INLINE__ */
00032 
00033 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00034 
00035 namespace OpenDDS {
00036 namespace DCPS {
00037 
00038 //TBD: The number of chunks of the replace element allocator
00039 //     is hard coded for now. This will be configurable when
00040 //     we implement the dds configurations. This value should
00041 //     be the number of marshalled DataSampleHeader that a
00042 //     packet could contain.
00043 #define NUM_REPLACED_ELEMENT_CHUNKS 20
00044 
00045 namespace {
00046   /// Arbitrary small constant that represents the minimum
00047   /// amount of payload data we'll have in one fragment.
00048   /// In this case "payload data" includes the content-filtering
00049   /// GUID sequence, so this is chosen to be 4 + (16 * N).
00050   static const size_t MIN_FRAG = 68;
00051 }
00052 
00053 // I think 2 chunks for the header message block is enough
00054 // - one for the original copy and one for duplicate which
00055 // occurs every packet and is released after packet is sent.
00056 // The data block only needs 1 chunk since the duplicate()
00057 // just increases the ref count.
00058 TransportSendStrategy::TransportSendStrategy(
00059   std::size_t id,
00060   TransportImpl& transport,
00061   ThreadSynchResource* synch_resource,
00062   Priority priority,
00063   const ThreadSynchStrategy_rch& thread_sync_strategy)
00064   : ThreadSynchWorker(id),
00065     max_samples_(transport.config().max_samples_per_packet_),
00066     optimum_size_(transport.config().optimum_packet_size_),
00067     max_size_(transport.config().max_packet_size_),
00068     max_header_size_(0),
00069     header_block_(0),
00070     pkt_chain_(0),
00071     header_complete_(false),
00072     start_counter_(0),
00073     mode_(MODE_DIRECT),
00074     mode_before_suspend_(MODE_NOT_SET),
00075     lock_(),
00076     replaced_element_mb_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
00077     replaced_element_db_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
00078     transport_(transport),
00079     graceful_disconnecting_(false),
00080     link_released_(true),
00081     send_buffer_(0)
00082 {
00083   DBG_ENTRY_LVL("TransportSendStrategy","TransportSendStrategy",6);
00084 
00085   // Create a ThreadSynch object just for us.
00086   DirectPriorityMapper mapper(priority);
00087   this->synch_.reset(thread_sync_strategy->create_synch_object(
00088                    synch_resource,
00089 #ifdef ACE_WIN32
00090                    ACE_DEFAULT_THREAD_PRIORITY,
00091 #else
00092                    mapper.thread_priority(),
00093 #endif
00094                    TheServiceParticipant->scheduler()));
00095 
00096   // We cache this value in data member since it doesn't change, and we
00097   // don't want to keep asking for it over and over.
00098   this->max_header_size_ = TransportHeader::max_marshaled_size();
00099 
00100   delayed_delivered_notification_queue_.reserve(this->max_samples_);
00101 }
00102 
00103 TransportSendStrategy::~TransportSendStrategy()
00104 {
00105   DBG_ENTRY_LVL("TransportSendStrategy","~TransportSendStrategy",6);
00106 
00107 
00108   this->delayed_delivered_notification_queue_.clear();
00109 }
00110 
00111 void
00112 TransportSendStrategy::send_buffer(TransportSendBuffer* send_buffer)
00113 {
00114   this->send_buffer_ = send_buffer;
00115 
00116   if (this->send_buffer_ != 0) {
00117     this->send_buffer_->bind(this);
00118   }
00119 }
00120 
00121 ThreadSynchWorker::WorkOutcome
00122 TransportSendStrategy::perform_work()
00123 {
00124   DBG_ENTRY_LVL("TransportSendStrategy","perform_work",6);
00125 
00126   SendPacketOutcome outcome;
00127   bool no_more_work = false;
00128 
00129   { // scope for the guard(this->lock_);
00130     GuardType guard(this->lock_);
00131 
00132     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: perform_work mode: %C\n", mode_as_str(this->mode_)), 5);
00133 
00134     if (this->mode_ == MODE_TERMINATED) {
00135       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00136                 "Entered perform_work() and mode_ is MODE_TERMINATED - "
00137                 "we lost connection and could not reconnect, just return "
00138                 "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
00139       return WORK_OUTCOME_BROKEN_RESOURCE;
00140     }
00141 
00142     // The perform_work() is called by our synch_ object using
00143     // a thread designated to call this method when it thinks
00144     // we need to be called in order to "service" the queue_ and/or
00145     // deal with a partially sent current packet.
00146     //
00147     // We will return a 0 if we don't see a need to have our perform_work()
00148     // called again, and we will return a 1 if we do see the need to have our
00149     // perform_work() method called again.
00150 
00151     // First, make sure that the mode_ indicates that we are, indeed, in
00152     // the MODE_QUEUE mode.  If we are not in MODE_QUEUE mode (meaning we are
00153     // in MODE_DIRECT), then it means we didn't need to have this perform_work()
00154     // method called - in this case, do nothing other than return
00155     // WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we really don't
00156     // see a need for it to call our perform_work() again (at least not
00157     // right now).
00158     if (this->mode_ != MODE_QUEUE && this->mode_ != MODE_SUSPEND) {
00159       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00160                 "Entered perform_work() and mode_ is %C - just return "
00161                 "WORK_OUTCOME_NO_MORE_TO_DO.\n", mode_as_str(this->mode_)), 5);
00162       return WORK_OUTCOME_NO_MORE_TO_DO;
00163     }
00164 
00165     // Check the "state" of the current packet.  We will either find that the
00166     // current packet is in a state of being "partially sent", or we will find
00167     // it in a state of being "empty".  When the current packet is "empty", it
00168     // means that it is time to build up the current packet using elements
00169     // extracted from the queue_, and then we will attempt to send the
00170     // packet.  When we find the current packet in the "partially sent" state,
00171     // we will not touch the queue_ - we will just try to send the unsent
00172     // bytes in the current (partially sent) packet.
00173     const size_t header_length = this->header_.length_;
00174 
00175     if (header_length == 0) {
00176       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00177                 "The current packet doesn't have any unsent bytes - we "
00178                 "need to 'populate' the current packet with elems from "
00179                 "the queue.\n"), 5);
00180 
00181       // The current packet is "empty".  Build up the current packet using
00182       // elements from the queue_, and prepare the current packet to be sent.
00183 
00184       // Before we build the packet from the queue_, let's make sure that
00185       // there is actually something on the queue_ to build from.
00186       if (this->queue_.size() == 0) {
00187         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00188                   "But the queue is empty.  We have cleared the "
00189                   "backpressure situation.\n"),5);
00190         // We are here because the queue_ is empty, and there isn't
00191         // any "partial packet" bytes left to send.  We have overcome
00192         // the backpressure situation and don't have anything to do
00193         // right now.
00194 
00195         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00196                   "Flip mode to MODE_DIRECT, and return "
00197                   "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
00198 
00199         // Flip the mode back to MODE_DIRECT.
00200         this->mode_ = MODE_DIRECT;
00201 
00202         // And return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that
00203         // perform_work() doesn't need to be called again (at this time).
00204         return WORK_OUTCOME_NO_MORE_TO_DO;
00205       }
00206 
00207       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00208                 "There is at least one elem in the queue - get the packet "
00209                 "elems from the queue.\n"), 5);
00210 
00211       // There is stuff in the queue_ if we get to this point in the logic.
00212       // Build-up the current packet using element(s) from the queue_.
00213       this->get_packet_elems_from_queue();
00214 
00215       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00216                 "Prepare the packet from the packet elems_.\n"), 5);
00217 
00218       // Now we can prepare the new packet to be sent.
00219       this->prepare_packet();
00220 
00221       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00222                 "Packet has been prepared from packet elems_.\n"), 5);
00223 
00224     } else {
00225       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00226                 "We have a current packet that still has unsent bytes.\n"), 5);
00227     }
00228 
00229     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00230               "Attempt to send the current packet.\n"), 5);
00231 
00232     // Now we can attempt to send the current packet - whether it is
00233     // a "partially sent" packet or one that we just built-up using elements
00234     // from the queue_ (and subsequently prepared for sending) - it doesn't
00235     // matter.  Just attempt to send as many of the "unsent" bytes in the
00236     // packet as possible.
00237     outcome = this->send_packet();
00238 
00239     // If we sent the whole packet (eg, partial_send is false), and the queue_
00240     // is now empty, then we've cleared the backpressure situation.
00241     if ((outcome == OUTCOME_COMPLETE_SEND) && (this->queue_.size() == 0)) {
00242       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00243                 "Flip the mode to MODE_DIRECT, and then return "
00244                 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
00245 
00246       // Revert back to MODE_DIRECT mode.
00247       this->mode_ = MODE_DIRECT;
00248       no_more_work = true;
00249     }
00250   } // End of scope for guard(this->lock_);
00251 
00252   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00253             "The outcome of the send_packet() was %d.\n", outcome), 5);
00254 
00255   send_delayed_notifications();
00256 
00257   // If we sent the whole packet (eg, partial_send is false), and the queue_
00258   // is now empty, then we've cleared the backpressure situation.
00259   if (no_more_work) {
00260     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00261               "We sent the whole packet, and there is nothing left on "
00262               "the queue now.\n"), 5);
00263 
00264     // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
00265     // don't desire another call to this perform_work() method.
00266     return WORK_OUTCOME_NO_MORE_TO_DO;
00267   }
00268 
00269   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00270             "We still have unsent bytes in the current packet AND/OR there "
00271             "are still elements in the queue.\n"), 5);
00272 
00273   if ((outcome == OUTCOME_PEER_LOST) || (outcome == OUTCOME_SEND_ERROR)) {
00274     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00275               "We lost our connection, or had some fatal connection "
00276               "error.  Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
00277 
00278     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00279               "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
00280 
00281     bool do_suspend = true;
00282     this->relink(do_suspend);
00283 
00284     if (this->mode_ == MODE_SUSPEND) {
00285       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00286                 "The reconnect has not done yet and we are still in MODE_SUSPEND. "
00287                 "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
00288       // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
00289       // don't desire another call to this perform_work() method.
00290       return WORK_OUTCOME_NO_MORE_TO_DO;
00291 
00292     } else if (this->mode_ == MODE_TERMINATED) {
00293       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00294                 "Reconnect failed, now we are in MODE_TERMINATED\n"), 5);
00295       return WORK_OUTCOME_BROKEN_RESOURCE;
00296 
00297     } else {
00298       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00299                 "Reconnect succeeded, Notify synch thread of work "
00300                 "availability.\n"), 5);
00301       // If the datalink is re-established then notify the synch
00302       // thread to perform work.  We do not hold the object lock at
00303       // this point.
00304       this->synch_->work_available();
00305     }
00306   }
00307 
00308   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00309             "We still have an 'unbroken' connection.\n"), 5);
00310 
00311   if (outcome == OUTCOME_BACKPRESSURE) {
00312     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00313               "We experienced backpressure on our attempt to send the "
00314               "packet.  Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
00315     // We have a "clogged resource".
00316     return WORK_OUTCOME_CLOGGED_RESOURCE;
00317   }
00318 
00319   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00320             "We may have sent the whole current packet, but still have "
00321             "elements on the queue.\n"), 5);
00322   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00323             "Or, we may have only partially sent the current packet.\n"), 5);
00324 
00325   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00326             "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5);
00327 
00328   // We may have had an OUTCOME_COMPLETE_SEND, but there is still stuff
00329   // in the queue_ to be sent.  *OR* we have have had an OUTCOME_PARTIAL_SEND,
00330   // which equates to the same thing - we still have work to do.
00331 
00332   // We are still in MODE_QUEUE mode, thus there is still work to be
00333   // done to service the queue_ and/or a partially sent current packet.
00334   // Return WORK_OUTCOME_MORE_TO_DO so that our caller knows that we still
00335   // want it to call this perform_work() method.
00336   return WORK_OUTCOME_MORE_TO_DO;
00337 }
00338 
00339 // Now we need to "peel off" those message blocks that were fully
00340 // sent, adjust the first message block with an unsent byte to have
00341 // its rd_ptr() pointing to that first unsent byte, and set the
00342 // pkt_chain_ to that first message block with an unsent byte.
00343 // As we "peel off" fully sent message blocks, we need to also deal with
00344 // fully sent elements by removing them from the elems_ and
00345 // calling their data_delivered() method.  In addition, as we peel off
00346 // the message blocks that are fully sent, we need to untie them from
00347 // the chain and release them.
00348 // And finally, don't forget to adjust the header_.length_ to
00349 // account for the num_bytes_sent (beware that some of the num_bytes_sent
00350 // may be packet header bytes and shouldn't affect the header_.length_
00351 // which doesn't include the packet header bytes.
00352 int
00353 TransportSendStrategy::adjust_packet_after_send(ssize_t num_bytes_sent)
00354 {
00355   DBG_ENTRY_LVL("TransportSendStrategy", "adjust_packet_after_send", 6);
00356 
00357   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00358         "Adjusting the current packet because %d bytes of the packet "
00359         "have been sent.\n", num_bytes_sent));
00360 
00361   ssize_t num_bytes_left = num_bytes_sent;
00362   ssize_t num_non_header_bytes_sent = 0;
00363 
00364   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00365         "Set num_bytes_left to %d.\n", num_bytes_left));
00366   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00367         "Set num_non_header_bytes_sent to %d.\n",
00368         num_non_header_bytes_sent));
00369 
00370   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00371         "Peek at the element at the front of the packet elems_.\n"));
00372 
00373   // This is the element currently at the front of elems_.
00374   TransportQueueElement* element = this->elems_.peek();
00375 
00376   if(!element){
00377     ACE_DEBUG((LM_INFO, "(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n"));
00378   } else {
00379     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00380           "Use the element's msg() to find the last block in "
00381           "the msg() chain.\n"));
00382 
00383     // Get a pointer to the last message block in the element.
00384     const ACE_Message_Block* elem_tail_block = element->msg();
00385 
00386     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00387           "Start with tail block == element->msg().\n"));
00388 
00389     while (elem_tail_block->cont() != 0) {
00390       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00391             "Set tail block to its cont() block (next in chain).\n"));
00392       elem_tail_block = elem_tail_block->cont();
00393     }
00394 
00395     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00396           "Tail block now set (because tail block's cont() is 0).\n"));
00397 
00398     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00399           "Start the 'while (num_bytes_left > 0)' loop.\n"));
00400 
00401     while (num_bytes_left > 0) {
00402       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00403             "At top of 'num bytes left' loop.  num_bytes_left == [%d].\n",
00404             num_bytes_left));
00405 
00406       const int block_length = static_cast<int>(this->pkt_chain_->length());
00407 
00408       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00409             "Length of block at front of pkt_chain_ is [%d].\n",
00410             block_length));
00411 
00412       if (block_length <= num_bytes_left) {
00413         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00414               "The whole block at the front of pkt_chain_ was sent.\n"));
00415 
00416         // The entire message block at the front of the chain has been sent.
00417         // Detach the head message block from the chain and adjust
00418         // the pkt_chain_ to point to the next block (if any) in
00419         // the chain.
00420         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00421               "Extract the fully sent block from the pkt_chain_.\n"));
00422 
00423         ACE_Message_Block* fully_sent_block = this->pkt_chain_;
00424 
00425         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00426               "Set pkt_chain_ to pkt_chain_->cont().\n"));
00427 
00428         this->pkt_chain_ = this->pkt_chain_->cont();
00429 
00430         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00431               "Set the fully sent block's cont() to 0.\n"));
00432 
00433         fully_sent_block->cont(0);
00434 
00435         // Update the num_bytes_left to indicate that we have
00436         // processed the entire length of the block.
00437         num_bytes_left -= block_length;
00438 
00439         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00440               "Updated num_bytes_left to account for fully sent "
00441               "block (block_length == [%d]).\n", block_length));
00442         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00443               "Now, num_bytes_left == [%d].\n", num_bytes_left));
00444 
00445         if (!this->header_complete_) {
00446           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00447                 "Since the header_complete_ flag is false, it means "
00448                 "that the packet header block was still in the "
00449                 "pkt_chain_.\n"));
00450 
00451           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00452                 "Not anymore...  Set the header_complete_ flag "
00453                 "to true.\n"));
00454 
00455           // That was the packet header block.  And now we know that it
00456           // has been completely sent.
00457           this->header_complete_ = true;
00458 
00459           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00460                 "Release the fully sent block.\n"));
00461 
00462           // Release the fully_sent_block
00463           fully_sent_block->release();
00464 
00465         } else {
00466           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00467                 "Since the header_complete_ flag is true, it means "
00468                 "that the packet header block was not in the "
00469                 "pkt_chain_.\n"));
00470           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00471                 "So, the fully sent block was part of an element.\n"));
00472 
00473           // That wasn't the packet header block.  It was from the
00474           // element currently at the front of the elems_
00475           // collection.  If it was the last block from the
00476           // element, then we need to extract the element from the
00477           // elems_ collection and invoke data_delivered() on it.
00478           num_non_header_bytes_sent += block_length;
00479 
00480           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00481                 "Updated num_non_header_bytes_sent to account for "
00482                 "fully sent block (block_length == [%d]).\n",
00483                 block_length));
00484 
00485           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00486                 "Now, num_non_header_bytes_sent == [%d].\n",
00487                 num_non_header_bytes_sent));
00488 
00489           if (fully_sent_block->base() == elem_tail_block->base()) {
00490             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00491                   "Ok.  The fully sent block was a duplicate of "
00492                   "the tail block of the element that is at the "
00493                   "front of the packet elems_.\n"));
00494 
00495             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00496                   "This means that we have completely sent the "
00497                   "element at the front of the packet elems_.\n"));
00498 
00499             // This means that we have completely sent the element
00500             // that is currently at the front of the elems_ collection.
00501 
00502             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00503                   "We can release the fully sent block now.\n"));
00504 
00505             // Release the fully_sent_block
00506             fully_sent_block->release();
00507 
00508             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00509                   "We can extract the element from the front of "
00510                   "the packet elems_ (we were just peeking).\n"));
00511 
00512             // Extract the element from the elems_ collection
00513             element = this->elems_.get();
00514 
00515             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00516                   "Tell the element that a decision has been made "
00517                   "regarding its fate - data_delivered().\n"));
00518 
00519             // Inform the element that the data has been delivered.
00520             this->add_delayed_notification(element);
00521 
00522             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00523                   "Peek at the next element in the packet "
00524                   "elems_.\n"));
00525 
00526             // Set up for the next element in elems_ by peek()'ing.
00527             element = this->elems_.peek();
00528 
00529             if (element != 0) {
00530               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00531                     "The is an element still in the packet "
00532                     "elems_ (we are peeking at it now).\n"));
00533 
00534               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00535                     "We are going to find the tail block for the "
00536                     "current element (we are peeking at).\n"));
00537 
00538               // There was a "next element".  Determine the
00539               // elem_tail_block for it.
00540               elem_tail_block = element->msg();
00541 
00542               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00543                     "Start w/tail block == element->msg().\n"));
00544 
00545               while (elem_tail_block->cont() != 0) {
00546                 VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00547                       "Set tail block to next in chain.\n"));
00548                 elem_tail_block = elem_tail_block->cont();
00549               }
00550 
00551               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00552                     "Done finding tail block.\n"));
00553             }
00554 
00555           } else {
00556             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00557                   "Ok.  The fully sent block is *not* a "
00558                   "duplicate of the tail block of the element "
00559                   "at the front of the packet elems_.\n"));
00560 
00561             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00562                   "Thus, we have not completely sent the "
00563                   "element yet.\n"));
00564 
00565             // We didn't completely send the element - it has more
00566             // message blocks that haven't been sent (that we know of).
00567 
00568             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00569                   "We can release the fully_sent_block now.\n"));
00570 
00571             // Release the fully_sent_block
00572             fully_sent_block->release();
00573           }
00574         }
00575 
00576       } else {
00577         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00578               "Only part of the block at the front of pkt_chain_ "
00579               "was sent.\n"));
00580 
00581         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00582               "Advance the rd_ptr() of the front block (of pkt_chain_) "
00583               "by the num_bytes_left (%d).\n", num_bytes_left));
00584 
00585         // Only part of the current block was sent.
00586         this->pkt_chain_->rd_ptr(num_bytes_left);
00587 
00588         if (this->header_complete_) {
00589           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00590                 "And since the packet header block has already been "
00591                 "completely sent, add num_bytes_left to the "
00592                 "num_non_header_bytes_sent.\n"));
00593 
00594           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00595                 "Before, num_non_header_bytes_sent == %d.\n",
00596                 num_non_header_bytes_sent));
00597 
00598           // We know that the current block isn't the packet header
00599           // block because the packet header block has already been
00600           // completely sent.  We need to count these bytes in the
00601           // num_non_header_bytes_sent.
00602           num_non_header_bytes_sent += num_bytes_left;
00603 
00604           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00605                 "After, num_non_header_bytes_sent == %d.\n",
00606                 num_non_header_bytes_sent));
00607         }
00608 
00609         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00610               "Set the num_bytes_left to 0 now.\n"));
00611 
00612         num_bytes_left = 0;
00613       }
00614     }
00615   }
00616 
00617   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00618         "The 'num_bytes_left' loop has completed.\n"));
00619 
00620   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00621         "Adjust the header_.length_ to account for the "
00622         "num_non_header_bytes_sent.\n"));
00623   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00624         "Before, header_.length_ == %d.\n",
00625         this->header_.length_));
00626 
00627   // Adjust the packet header_.length_ to indicate how many non header
00628   // bytes are left to send.
00629   this->header_.length_ -= static_cast<ACE_UINT32>(num_non_header_bytes_sent);
00630 
00631   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00632         "After, header_.length_ == %d.\n",
00633         this->header_.length_));
00634 
00635   // Returns 0 if the entire packet was sent, and returns 1 otherwise.
00636   int rc = (this->header_.length_ == 0) ? 0 : 1;
00637 
00638   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00639         "Adjustments all done.  Returning [%d].  0 means entire packet "
00640         "has been sent.  1 means otherwise.\n",
00641         rc));
00642 
00643   return rc;
00644 }
00645 
00646 bool
00647 TransportSendStrategy::send_delayed_notifications(const TransportQueueElement::MatchCriteria* match)
00648 {
00649   DBG_ENTRY_LVL("TransportSendStrategy","send_delayed_notifications",6);
00650   TransportQueueElement* sample = 0;
00651   SendMode mode = MODE_NOT_SET;
00652 
00653   OPENDDS_VECTOR(TQESendModePair) samples;
00654 
00655   size_t num_delayed_notifications = 0;
00656   bool found_element = false;
00657 
00658   {
00659     GuardType guard(lock_);
00660 
00661     num_delayed_notifications = delayed_delivered_notification_queue_.size();
00662 
00663     if (num_delayed_notifications == 0) {
00664       return false;
00665 
00666     } else if (num_delayed_notifications == 1) {
00667       // Optimization for the most common case (doesn't need vectors)
00668 
00669       if (!match || match->matches(*delayed_delivered_notification_queue_[0].first)) {
00670         found_element = true;
00671         sample = delayed_delivered_notification_queue_[0].first;
00672         mode = delayed_delivered_notification_queue_[0].second;
00673 
00674         delayed_delivered_notification_queue_.clear();
00675       }
00676 
00677     } else {
00678       OPENDDS_VECTOR(TQESendModePair)::iterator iter;
00679       for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) {
00680         sample = iter->first;
00681         mode = iter->second;
00682         if (!match || match->matches(*sample)) {
00683           found_element = true;
00684           samples.push_back(*iter);
00685           iter = delayed_delivered_notification_queue_.erase(iter);
00686         } else {
00687           ++iter;
00688         }
00689       }
00690     }
00691   }
00692 
00693   if (!found_element)
00694     return false;
00695 
00696   bool transport_shutdown = this->transport_.is_shut_down();
00697 
00698   if (num_delayed_notifications == 1) {
00699     // optimization for the common case
00700     if (mode == MODE_TERMINATED) {
00701       if (!transport_shutdown || sample->owned_by_transport()) {
00702         sample->data_dropped(true);
00703       }
00704     } else {
00705       if (!transport_shutdown || sample->owned_by_transport()) {
00706         sample->data_delivered();
00707       }
00708     }
00709 
00710   } else {
00711     for (size_t i = 0; i < samples.size(); ++i) {
00712       if (samples[i].second == MODE_TERMINATED) {
00713         if (!transport_shutdown || samples[i].first->owned_by_transport()) {
00714           samples[i].first->data_dropped(true);
00715         }
00716       } else {
00717         if (!transport_shutdown || samples[i].first->owned_by_transport()) {
00718           samples[i].first->data_delivered();
00719         }
00720       }
00721     }
00722   }
00723   return true;
00724 }
00725 
00726 /// Remove all samples in the backpressure queue and packet queue.
00727 void
00728 TransportSendStrategy::terminate_send(bool graceful_disconnecting)
00729 {
00730   DBG_ENTRY_LVL("TransportSendStrategy","terminate_send",6);
00731 
00732   bool reset_flag = true;
00733 
00734   {
00735     GuardType guard(this->lock_);
00736 
00737     // If the terminate_send call due to a non-graceful disconnection before
00738     // a datalink shutdown then we will not try to send the graceful disconnect
00739     // message.
00740     if ((this->mode_ == MODE_TERMINATED || this->mode_ == MODE_SUSPEND)
00741         && !this->graceful_disconnecting_) {
00742       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00743             "It was already terminated non gracefully, will not set to graceful disconnecting \n"));
00744       reset_flag = false;
00745     }
00746   }
00747 
00748   VDBG((LM_DEBUG, "(%P|%t) DBG:  Now flip to MODE_TERMINATED \n"));
00749 
00750   this->clear(MODE_TERMINATED);
00751 
00752   if (reset_flag) {
00753     GuardType guard(this->lock_);
00754     this->graceful_disconnecting_ = graceful_disconnecting;
00755   }
00756 }
00757 
00758 void
00759 TransportSendStrategy::clear(SendMode mode)
00760 {
00761   DBG_ENTRY_LVL("TransportSendStrategy","clear",6);
00762 
00763   send_delayed_notifications();
00764   QueueType elems;
00765   QueueType queue;
00766   {
00767     GuardType guard(this->lock_);
00768 
00769     if (this->header_.length_ > 0) {
00770       // Clear the messages in the pkt_chain_ that is partially sent.
00771       // We just reuse these functions for normal partial send except actual sending.
00772       int num_bytes_left = static_cast<int>(this->pkt_chain_->total_length());
00773       int result = this->adjust_packet_after_send(num_bytes_left);
00774 
00775       if (result == 0) {
00776         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00777               "The adjustment logic says that the packet is cleared.\n"));
00778 
00779       } else {
00780         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00781               "The adjustment returned partial sent.\n"));
00782       }
00783     }
00784 
00785     elems.swap(this->elems_);
00786     queue.swap(this->queue_);
00787 
00788     this->header_.length_ = 0;
00789     this->pkt_chain_ = 0;
00790     this->header_complete_ = false;
00791     this->start_counter_ = 0;
00792     this->mode_ = mode;
00793     this->mode_before_suspend_ = MODE_NOT_SET;
00794   }
00795 
00796   // We need remove the queued elements outside the lock,
00797   // otherwise we have a deadlock situation when remove vistor
00798   // calls the data_droped on each dropped elements.
00799 
00800   // Clear all samples in queue.
00801   RemoveAllVisitor remove_all_visitor;
00802 
00803   elems.accept_remove_visitor(remove_all_visitor);
00804   queue.accept_remove_visitor(remove_all_visitor);
00805 }
00806 
00807 int
00808 TransportSendStrategy::start()
00809 {
00810   DBG_ENTRY_LVL("TransportSendStrategy","start",6);
00811 
00812   {
00813     GuardType guard(this->lock_);
00814 
00815     if (!this->start_i()) {
00816       return -1;
00817     }
00818   }
00819 
00820   size_t header_chunks(1);
00821 
00822   // If a secondary send buffer is bound, sent headers should
00823   // be cached to properly maintain the buffer:
00824   if (this->send_buffer_ != 0) {
00825     header_chunks += this->send_buffer_->capacity();
00826 
00827   } else {
00828     header_chunks += 1;
00829   }
00830 
00831   this->header_db_allocator_.reset( new TransportDataBlockAllocator(header_chunks));
00832   this->header_mb_allocator_.reset( new TransportMessageBlockAllocator(header_chunks));
00833 
00834   // Since we (the TransportSendStrategy object) are a reference-counted
00835   // object, but the synch_ object doesn't necessarily know this, we need
00836   // to give a "copy" of a reference to ourselves to the synch_ object here.
00837   // We will do the reverse when we unregister ourselves (as a worker) from
00838   // the synch_ object.
00839 
00840   if (this->synch_->register_worker(*this) == -1) {
00841 
00842     ACE_ERROR_RETURN((LM_ERROR,
00843                       "(%P|%t) ERROR: TransportSendStrategy failed to register "
00844                       "as a worker with the ThreadSynch object.\n"),
00845                      -1);
00846   }
00847 
00848   return 0;
00849 }
00850 
00851 void
00852 TransportSendStrategy::stop()
00853 {
00854   DBG_ENTRY_LVL("TransportSendStrategy","stop",6);
00855 
00856   if (this->header_block_ != 0) {
00857     this->header_block_->release ();
00858     this->header_block_ = 0;
00859   }
00860 
00861   this->synch_->unregister_worker();
00862 
00863   {
00864     GuardType guard(this->lock_);
00865 
00866     if (this->pkt_chain_ != 0) {
00867       size_t size = this->pkt_chain_->total_length();
00868 
00869       if (size > 0) {
00870         this->pkt_chain_->release();
00871         ACE_DEBUG((LM_WARNING,
00872                    ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
00873                    ACE_TEXT("terminating with %d unsent bytes.\n"),
00874                    size));
00875       }
00876     }
00877   }
00878 
00879   {
00880     GuardType guard(this->lock_);
00881 
00882     this->stop_i();
00883   }
00884 
00885   // TBD SOON - What about all of the samples that may still be stuck in
00886   //            our queue_ and/or elems_?
00887 }
00888 
00889 void
00890 TransportSendStrategy::send(TransportQueueElement* element, bool relink)
00891 {
00892   if (Transport_debug_level > 9) {
00893     ACE_DEBUG((LM_DEBUG,
00894                ACE_TEXT("(%P|%t) TransportSendStrategy::send() [%d] - ")
00895                ACE_TEXT("sending data at 0x%x.\n"),
00896                id(), element));
00897   }
00898 
00899   DBG_ENTRY_LVL("TransportSendStrategy", "send", 6);
00900 
00901   {
00902     GuardType guard(this->lock_);
00903 
00904     if (this->link_released_) {
00905       this->add_delayed_notification(element);
00906 
00907     } else {
00908       if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) {
00909         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00910               "TransportSendStrategy::send: mode is MODE_TERMINATED and not in "
00911               "graceful disconnecting, so discard message.\n"));
00912         element->data_dropped(true);
00913         return;
00914       }
00915 
00916       size_t element_length = element->msg()->total_length();
00917 
00918       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00919             "Send element msg() has total_length() == [%d].\n",
00920             element_length));
00921 
00922       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00923             "this->max_header_size_ == [%d].\n",
00924             this->max_header_size_));
00925 
00926       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00927             "this->max_size_ == [%d].\n",
00928             this->max_size_));
00929 
00930       const size_t max_message_size = this->max_message_size();
00931 
00932       // Really an assert.  We can't accept any element that wouldn't fit into
00933       // a transport packet by itself (ie, it would be the only element in the
00934       // packet).  This max_size_ is the user-configurable maximum, not based
00935       // on the transport's inherent maximum message size.  If max_message_size
00936       // is non-zero, we will fragment so max_size_ doesn't apply per-element.
00937       if (max_message_size == 0 &&
00938           this->max_header_size_ + element_length > this->max_size_) {
00939         ACE_ERROR((LM_ERROR,
00940                    "(%P|%t) ERROR: Element too large (%Q) "
00941                    "- won't fit into packet.\n", ACE_UINT64(element_length)));
00942         return;
00943       }
00944 
00945       // Check the mode_ to see if we simply put the element on the queue.
00946       if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) {
00947         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00948                   "this->mode_ == %C, so queue elem and leave.\n",
00949                   mode_as_str(this->mode_)), 5);
00950 
00951         this->queue_.put(element);
00952 
00953         if (this->mode_ != MODE_SUSPEND) {
00954           this->synch_->work_available();
00955         }
00956 
00957         return;
00958       }
00959 
00960       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00961             "this->mode_ == MODE_DIRECT.\n"));
00962 
00963       // We are in the MODE_DIRECT send mode.  When in this mode, the send()
00964       // calls will "build up" the transport packet to be sent directly when it
00965       // reaches the optimal size, contains the maximum number of samples, etc.
00966 
00967       // We need to check if the current element (the arg passed-in to this
00968       // send() method) should be appended to the transport packet, or if the
00969       // transport packet should be sent (directly) first, dealing with the
00970       // current element afterwards.
00971 
00972       // We will decide to send the packet as it is now, under two circumstances:
00973       //
00974       //    Either:
00975       //
00976       //    (1) The current element won't fit into the current packet since it
00977       //        would violate the max_packet_size_.
00978       //
00979       //    -OR-
00980       //
00981       //    (2) There is at least one element already in the current packet,
00982       //        and the current element says that it must be sent in an
00983       //        exclusive packet (ie, in a packet all by itself).
00984       //
00985       const bool exclusive = element->requires_exclusive_packet();
00986 
00987       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00988             "The element %C require an exclusive packet.\n",
00989             (exclusive ? "DOES" : "does NOT")
00990           ));
00991 
00992       const size_t space_needed =
00993         (max_message_size > 0)
00994         ? /* fragmenting */ DataSampleHeader::max_marshaled_size() + MIN_FRAG
00995         : /* not fragmenting */ element_length;
00996 
00997       if ((exclusive && (this->elems_.size() != 0))
00998           || (this->space_available() < space_needed)) {
00999 
01000         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01001               "Element won't fit in current packet or requires exclusive"
01002               " - send current packet (directly) now.\n"));
01003 
01004         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01005               "max_header_size_: %d, header_.length_: %d, element_length: %d\n"
01006               , this->max_header_size_, this->header_.length_, element_length));
01007 
01008         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01009               "Tot possible length: %d, max_len: %d\n"
01010               , this->max_header_size_ + this->header_.length_ + element_length
01011               , this->max_size_));
01012         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01013               "current elem size: %d\n"
01014               , this->elems_.size()));
01015 
01016         // Send the current packet, and deal with the current element
01017         // afterwards.
01018         // The invocation's relink status should dictate the direct_send's
01019         // relink. We don't want a (relink == false) invocation to end up
01020         // doing a relink. Think of (relink == false) as a non-blocking call.
01021         this->direct_send(relink);
01022 
01023         // Now check to see if we flipped into MODE_QUEUE, which would mean
01024         // that the direct_send() experienced backpressure, and the
01025         // packet was only partially sent.  If this has happened, we deal with
01026         // the current element by placing it on the queue (and then we are done).
01027         //
01028         // Otherwise, if the mode_ is still MODE_DIRECT, we can just
01029         // "drop" through to the next step in the logic where we append the
01030         // current element to the current packet.
01031         if (this->mode_ == MODE_QUEUE) {
01032           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01033                     "We experienced backpressure on that direct send, as "
01034                     "the mode_ is now MODE_QUEUE or MODE_SUSPEND.  "
01035                     "Queue elem and leave.\n"), 5);
01036           this->queue_.put(element);
01037           this->synch_->work_available();
01038 
01039           return;
01040         }
01041       }
01042 
01043       // Loop for sending 'element', in fragments if needed
01044       bool first_pkt = true; // enter the loop 1st time through unconditionally
01045       for (TransportQueueElement* next_fragment = 0;
01046            (first_pkt || next_fragment)
01047            && (this->mode_ == MODE_DIRECT || this->mode_ == MODE_TERMINATED);) {
01048            // We do need to send in MODE_TERMINATED (GRACEFUL_DISCONNECT msg)
01049 
01050         if (next_fragment) {
01051           element = next_fragment;
01052           element_length = next_fragment->msg()->total_length();
01053           this->header_.first_fragment_ = false;
01054         }
01055 
01056         this->header_.last_fragment_ = false;
01057         if (max_message_size) { // fragmentation enabled
01058           const size_t avail = this->space_available();
01059           if (element_length > avail) {
01060             VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Fragmenting\n"), 0);
01061             ElementPair ep = element->fragment(avail);
01062             element = ep.first;
01063             element_length = element->msg()->total_length();
01064             next_fragment = ep.second;
01065             this->header_.first_fragment_ = first_pkt;
01066           } else if (next_fragment) {
01067             // We are sending the "tail" element of a previous fragment()
01068             // operation, and this element didn't itself require fragmentation
01069             this->header_.last_fragment_ = true;
01070             next_fragment = 0;
01071           }
01072         }
01073         first_pkt = false;
01074 
01075         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01076               "Start the 'append elem' to current packet logic.\n"));
01077 
01078         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01079               "Put element into current packet elems_.\n"));
01080 
01081         // Now that we know the current element should go into the current
01082         // packet, we can just go ahead and "append" the current element to
01083         // the current packet.
01084 
01085         // Add the current element to the collection of packet elements.
01086         this->elems_.put(element);
01087 
01088         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01089               "Before, the header_.length_ == [%d].\n",
01090               this->header_.length_));
01091 
01092         // Adjust the header_.length_ to account for the length of the element.
01093         this->header_.length_ += static_cast<ACE_UINT32>(element_length);
01094         const size_t message_length = this->header_.length_;
01095 
01096         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01097               "After adding element's length, the header_.length_ == [%d].\n",
01098               message_length));
01099 
01100         // The current packet now contains the current element.  We need to
01101         // check to see if the conditions are such that we should go ahead and
01102         // attempt to send the packet "directly" now, or if we can just leave
01103         // and send the current packet later (in another send() call or in a
01104         // send_stop() call).
01105 
01106         // There a few conditions that will cause us to attempt to send the
01107         // packet (directly) right now:
01108         // - Fragmentation was needed
01109         // - The current packet has the maximum number of samples per packet.
01110         // - The current packet's total length exceeds the optimum packet size.
01111         // - The current element (currently part of the packet elems_)
01112         //   requires an exclusive packet.
01113         //
01114         if (next_fragment || (this->elems_.size() >= this->max_samples_)
01115             || (this->max_header_size_ + message_length > this->optimum_size_)
01116             || exclusive) {
01117           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01118                 "Now the current packet looks full - send it (directly).\n"));
01119 
01120           this->direct_send(relink);
01121 
01122           if (next_fragment && this->mode_ != MODE_DIRECT) {
01123             if (this->mode_ == MODE_QUEUE) {
01124               this->queue_.put(next_fragment);
01125               this->synch_->work_available();
01126 
01127             } else {
01128               next_fragment->data_dropped(true /* dropped by transport */);
01129             }
01130           } else if (mode_ == MODE_QUEUE) {
01131             // Background thread handles packets in progress
01132             this->synch_->work_available();
01133           }
01134 
01135           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01136                 "Back from the direct_send() attempt.\n"));
01137 
01138           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01139                 "And we %C as a result of the direct_send() call.\n",
01140                 ((this->mode_ == MODE_QUEUE) ? "flipped into MODE_QUEUE"
01141                                              : "stayed in MODE_DIRECT")));
01142 
01143         } else {
01144           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01145                 "Packet not sent. Send conditions weren't satisfied.\n"));
01146           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01147                 "elems_.size(): %d, max_samples_: %d\n",
01148                 int(this->elems_.size()), int(this->max_samples_)));
01149           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01150                 "header_size_: %d, optimum_size_: %d\n",
01151                 int(this->max_header_size_ + message_length),
01152                 int(this->optimum_size_)));
01153           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01154                 "element_requires_exclusive_packet: %d\n", int(exclusive)));
01155 
01156           if (this->mode_ == MODE_QUEUE) {
01157             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01158                   "We flipped into MODE_QUEUE.\n"));
01159 
01160           } else {
01161             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01162                   "We stayed in MODE_DIRECT.\n"));
01163           }
01164         }
01165       }
01166     }
01167   }
01168 
01169   send_delayed_notifications();
01170 }
01171 
01172 void
01173 TransportSendStrategy::send_stop(RepoId /*repoId*/)
01174 {
01175   DBG_ENTRY_LVL("TransportSendStrategy","send_stop",6);
01176   {
01177     GuardType guard(this->lock_);
01178 
01179     if (this->link_released_)
01180       return;
01181 
01182     if (this->start_counter_ == 0) {
01183       // This is an indication of a logic error.  This is more of an assert.
01184       VDBG_LVL((LM_ERROR,
01185                 "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5);
01186       return;
01187     }
01188 
01189     --this->start_counter_;
01190 
01191     if (this->start_counter_ != 0) {
01192       // This wasn't the last send_stop() that we are expecting.  We only
01193       // really honor the first send_start() and the last send_stop().
01194       // We can return without doing anything else in this case.
01195       return;
01196     }
01197 
01198     if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) {
01199       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01200             "TransportSendStrategy::send_stop: dont try to send current packet "
01201             "since mode is MODE_TERMINATED and not in graceful disconnecting.\n"));
01202       return;
01203     }
01204 
01205     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01206           "This is an 'important' send_stop() event since our "
01207           "start_counter_ is 0.\n"));
01208 
01209     // We just caused the start_counter_ to become zero.  This
01210     // means that we aren't expecting another send() or send_stop() at any
01211     // time in the near future (ie, it isn't imminent).
01212 
01213     // If our mode_ is currently MODE_QUEUE or MODE_SUSPEND, then we don't have
01214     // anything to do here because samples have already been going to the
01215     // queue.
01216 
01217     // We only need to do something if the mode_ is
01218     // MODE_DIRECT.  It means that we may have some sample(s) in the
01219     // current packet that have never been sent.  This is our
01220     // opportunity to send the current packet directly if this is the case.
01221     if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) {
01222       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01223             "But since we are in %C, we don't have to do "
01224             "anything more in this important send_stop().\n",
01225             mode_as_str(this->mode_)));
01226       // We don't do anything if we are in MODE_QUEUE.  Just leave.
01227       return;
01228     }
01229 
01230     size_t header_length = this->header_.length_;
01231     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01232           "We are in MODE_DIRECT in an important send_stop() - "
01233           "header_.length_ == [%d].\n", header_length));
01234 
01235     // Only attempt to send the current packet (directly) if the current
01236     // packet actually contains something (it could be empty).
01237     if ((header_length > 0) &&
01238         //(this->elems_.size ()+this->not_yet_pac_q_->size() > 0))
01239         (this->elems_.size() > 0)) {
01240       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01241             "There is something in the current packet - attempt to send "
01242             "it (directly) now.\n"));
01243       // If a relink needs to be done for this packet to be sent, do it.
01244       this->direct_send(true);
01245       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01246             "Back from the attempt to send leftover packet directly.\n"));
01247 
01248       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01249             "But we %C as a result.\n",
01250             ((this->mode_ == MODE_QUEUE)? "flipped into MODE_QUEUE":
01251                                           "stayed in MODE_DIRECT" )));
01252       if (this->mode_ == MODE_QUEUE  && this->mode_ != MODE_SUSPEND) {
01253         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01254               "Notify Synch thread of work availability\n"));
01255         this->synch_->work_available();
01256       }
01257     }
01258   }
01259 
01260   send_delayed_notifications();
01261 }
01262 
01263 void
01264 TransportSendStrategy::remove_all_msgs(RepoId pub_id)
01265 {
01266   DBG_ENTRY_LVL("TransportSendStrategy","remove_all_msgs",6);
01267 
01268   const TransportQueueElement::MatchOnPubId match(pub_id);
01269   send_delayed_notifications(&match);
01270 
01271   GuardType guard(this->lock_);
01272 
01273   if (this->send_buffer_ != 0) {
01274     // If a secondary send buffer is bound, removed samples must
01275     // be retained in order to properly maintain the buffer:
01276     this->send_buffer_->retain_all(pub_id);
01277   }
01278 
01279   do_remove_sample(pub_id, match, 0);
01280 }
01281 
01282 RemoveResult
01283 TransportSendStrategy::remove_sample(const DataSampleElement* sample, void* context)
01284 {
01285   DBG_ENTRY_LVL("TransportSendStrategy", "remove_sample", 6);
01286 
01287   VDBG_LVL((LM_DEBUG, "(%P|%t)  Removing sample: %@\n", sample->get_sample()), 5);
01288 
01289   // The sample to remove is either in temporary delayed notification list or
01290   // internal list (elems_ or queue_). If it's going to be removed from temporary delayed
01291   // notification list by transport thread, it needs acquire WriterDataContainer lock for
01292   // data_dropped/data_delivered callback, then it needs wait for this remove_sample() call
01293   // complete as this call already hold the WriterContainer's lock. So this call is safe to
01294   // access the sample to remove. If it's going to be removed by this remove_sample() calling
01295   // thread, it will be removed either from delayed notification list or from internal list
01296   // in which case the element carry the info if the sample is released so the datalinkset
01297   // can stop calling rest datalinks to remove this sample if it's already released..
01298 
01299   const char* const payload = sample->get_sample()->cont()->rd_ptr();
01300   RepoId pub_id = sample->get_pub_id();
01301   const TransportQueueElement::MatchOnDataPayload modp(payload);
01302   if (send_delayed_notifications(&modp)) {
01303     return REMOVE_RELEASED;
01304   }
01305 
01306   GuardType guard(this->lock_);
01307   return do_remove_sample(pub_id, modp, context);
01308 }
01309 
01310 RemoveResult
01311 TransportSendStrategy::do_remove_sample(const RepoId&,
01312   const TransportQueueElement::MatchCriteria& criteria,
01313   void*)
01314 {
01315   DBG_ENTRY_LVL("TransportSendStrategy", "do_remove_sample", 6);
01316 
01317   //ciju: Tim had the idea that we could do the following check
01318   // if ((this->mode_ == MODE_DIRECT) ||
01319   //     ((this->pkt_chain_ == 0) && (queue_ == empty)))
01320   // then we can assume that the sample can be safely removed (no need for
01321   // replacement) from the elems_ queue.
01322   if ((this->mode_ == MODE_DIRECT)
01323       || ((this->pkt_chain_ == 0) && (this->queue_.size() == 0))) {
01324     //ciju: I believe this is the only mode where a safe
01325     // assumption can be made that the samples
01326     // in the elems_ queue aren't part of a packet.
01327     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01328           "The mode is MODE_DIRECT, or the queue is empty and no "
01329           "transport packet is in progress.\n"));
01330 
01331     QueueRemoveVisitor simple_rem_vis(criteria);
01332     this->elems_.accept_remove_visitor(simple_rem_vis);
01333 
01334     const RemoveResult status = simple_rem_vis.status();
01335 
01336     if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
01337       this->header_.length_ -= simple_rem_vis.removed_bytes();
01338 
01339     } else if (status == REMOVE_NOT_FOUND) {
01340       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01341             "Failed to find the sample to remove.\n"));
01342     }
01343 
01344     return status;
01345   }
01346 
01347   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01348         "Visit the queue_ with the RemoveElementVisitor.\n"));
01349 
01350   QueueRemoveVisitor simple_rem_vis(criteria);
01351   this->queue_.accept_remove_visitor(simple_rem_vis);
01352 
01353   RemoveResult status = simple_rem_vis.status();
01354 
01355   if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
01356     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01357           "The sample was removed from the queue_.\n"));
01358     // This means that the visitor did not encounter any fatal error
01359     // along the way, *AND* the sample was found in the queue_,
01360     // and has now been removed.  We are done.
01361     return status;
01362   }
01363 
01364   if (status == REMOVE_ERROR) {
01365     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01366           "The RemoveElementVisitor encountered a fatal error in queue_.\n"));
01367     // This means that the visitor encountered some fatal error along
01368     // the way (and it already reported something to the log).
01369     // Return our failure code.
01370     return status;
01371   }
01372 
01373   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01374         "The RemoveElementVisitor did not find the sample in queue_.\n"));
01375 
01376   // We get here if the visitor did not encounter any fatal error, but it
01377   // also didn't find the sample - and hence it didn't perform any
01378   // "remove sample" logic.
01379 
01380   // Now we need to turn our attention to the current transport packet,
01381   // since the packet is likely in a "partially sent" state, and the
01382   // sample may still be contributing unsent bytes in the pkt_chain_.
01383 
01384   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01385         "Visit our elems_ with the PacketRemoveVisitor.\n"));
01386 
01387   PacketRemoveVisitor pac_rem_vis(criteria,
01388                                   this->pkt_chain_,
01389                                   this->header_block_,
01390                                   this->replaced_element_mb_allocator_,
01391                                   this->replaced_element_db_allocator_);
01392 
01393   this->elems_.accept_replace_visitor(pac_rem_vis);
01394 
01395   status = pac_rem_vis.status();
01396 
01397   if (status == REMOVE_ERROR) {
01398     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01399           "The PacketRemoveVisitor encountered a fatal error.\n"));
01400 
01401   } else if (status == REMOVE_NOT_FOUND) {
01402     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01403           "The PacketRemoveVisitor didn't find the sample.\n"));
01404 
01405   } else {
01406     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01407           "The PacketRemoveVisitor found the sample and removed it.\n"));
01408   }
01409 
01410   return status;
01411 }
01412 
01413 void
01414 TransportSendStrategy::direct_send(bool relink)
01415 {
01416   DBG_ENTRY_LVL("TransportSendStrategy", "direct_send", 6);
01417 
01418   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01419         "Prepare the current packet for a direct send attempt.\n"));
01420 
01421   // Prepare the packet for sending.
01422   this->prepare_packet();
01423 
01424   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01425         "Now attempt to send the packet.\n"));
01426 
01427   // We will try resend the packet if the send() fails and then connection
01428   // is re-established.  Only loops if the "continue" line is hit.
01429   while (true) {
01430     // Attempt to send the packet
01431     const SendPacketOutcome outcome = this->send_packet();
01432 
01433     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01434           "The outcome of the send_packet() was %d.\n", outcome));
01435 
01436     if ((outcome == OUTCOME_BACKPRESSURE) ||
01437         (outcome == OUTCOME_PARTIAL_SEND)) {
01438       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01439                 "The outcome of the send_packet() was either "
01440                 "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5);
01441 
01442       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01443                 "Flip into the MODE_QUEUE mode_.\n"), 5);
01444 
01445       // We encountered backpressure, or only sent part of the packet.
01446       this->mode_ = MODE_QUEUE;
01447 
01448     } else if ((outcome == OUTCOME_PEER_LOST) ||
01449                (outcome == OUTCOME_SEND_ERROR)) {
01450       if (outcome == OUTCOME_SEND_ERROR) {
01451         ACE_ERROR((LM_WARNING,
01452                    ACE_TEXT("(%P|%t) WARNING: Problem detected in ")
01453                    ACE_TEXT("send buffer management: %p.\n"),
01454                    ACE_TEXT("send_bytes")));
01455 
01456         if (Transport_debug_level > 0) {
01457           this->transport_.config().dump();
01458         }
01459       } else {
01460         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01461               "The outcome of the send_packet() was "
01462               "OUTCOME_PEER_LOST.\n"));
01463       }
01464 
01465       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01466                 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
01467 
01468       if (this->mode_ != MODE_SUSPEND) {
01469         this->mode_before_suspend_ = this->mode_;
01470         this->mode_ = MODE_SUSPEND;
01471       }
01472 
01473       if (relink) {
01474         bool do_suspend = false;
01475         this->relink(do_suspend);
01476 
01477         if (this->mode_ == MODE_SUSPEND) {
01478           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01479                     "The reconnect has not done yet and we are "
01480                     "still in MODE_SUSPEND.\n"), 5);
01481 
01482         } else if (this->mode_ == MODE_TERMINATED) {
01483           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01484                     "Reconnect failed, we are in MODE_TERMINATED\n"), 5);
01485 
01486         } else {
01487           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01488                     "Try send the packet again since the connection "
01489                     "is re-established.\n"), 5);
01490 
01491           // Try send the packet again since the connection is re-established.
01492           continue;
01493         }
01494       }
01495 
01496     } else {
01497       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01498             "The outcome of the send_packet() must have been "
01499             "OUTCOME_COMPLETE_SEND.\n"));
01500       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01501             "So, we will just stay in MODE_DIRECT.\n"));
01502     }
01503 
01504     break;
01505   }
01506 
01507   // We stay in MODE_DIRECT mode if we didn't encounter any backpressure.
01508 }
01509 
01510 void
01511 TransportSendStrategy::get_packet_elems_from_queue()
01512 {
01513   DBG_ENTRY_LVL("TransportSendStrategy", "get_packet_elems_from_queue", 6);
01514 
01515   for (TransportQueueElement* element = this->queue_.peek(); element != 0;
01516        element = this->queue_.peek()) {
01517 
01518     // Total number of bytes in the current element's message block chain.
01519     size_t element_length = element->msg()->total_length();
01520 
01521     // Flag used to determine if the element requires a packet all to itself.
01522     const bool exclusive_packet = element->requires_exclusive_packet();
01523 
01524     const size_t avail = this->space_available();
01525 
01526     bool frag = false;
01527     if (element_length > avail) {
01528       // The current element won't fit into the current packet
01529       if (this->max_message_size()) { // fragmentation enabled
01530         this->header_.first_fragment_ = !element->is_fragment();
01531         VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Fragmenting from queue\n"), 0);
01532         ElementPair ep = element->fragment(avail);
01533         element = ep.first;
01534         element_length = element->msg()->total_length();
01535         this->queue_.replace_head(ep.second);
01536         frag = true; // queue_ is already taken care of, don't get() later
01537       } else {
01538         break;
01539       }
01540     }
01541 
01542     // If exclusive and the current packet is empty, we won't violate the
01543     // exclusive_packet requirement by put()'ing the element
01544     // into the elems_ collection.
01545     if ((exclusive_packet && this->elems_.size() == 0)
01546         || !exclusive_packet) {
01547       // At this point, we have passed all of the pre-conditions and we can
01548       // now extract the current element from the queue_, put it into the
01549       // packet elems_, and adjust the packet header_.length_.
01550       this->elems_.put(frag ? element : this->queue_.get());
01551       if (this->header_.length_ == 0) {
01552         this->header_.last_fragment_ = !frag && element->is_fragment();
01553       }
01554       this->header_.length_ += static_cast<ACE_UINT32>(element_length);
01555       VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Packetizing from queue\n"), 0);
01556     }
01557 
01558     // With exclusive and (elems_.size() != 0), we don't use the current
01559     // element as part of the packet.  We know that there is already
01560     // at least one element in the packet, and the current element
01561     // is going to need its own (exclusive) packet.  We will just
01562     // use the packet elems_ as it is now.  Always break once
01563     // we've encountered and dealt with the exclusive_packet case.
01564     // Also break if fragmentation was required.
01565     if (exclusive_packet || frag
01566         // If the current number of packet elems_ has reached the maximum
01567         // number of samples per packet, then we are done.
01568         || this->elems_.size() == this->max_samples_
01569         // If the current value of the header_.length_ exceeds (or equals)
01570         // the optimum_size_ for a packet, then we are done.
01571         || this->header_.length_ >= this->optimum_size_) {
01572       break;
01573     }
01574   }
01575 }
01576 
01577 void
01578 TransportSendStrategy::prepare_header()
01579 {
01580   DBG_ENTRY_LVL("TransportSendStrategy", "prepare_header", 6);
01581 
01582   // Increment header sequence for packet:
01583   this->header_.sequence_ = ++this->header_sequence_;
01584 
01585   // Allow the specific implementation the opportunity to set
01586   // values in the packet header.
01587   this->prepare_header_i();
01588 }
01589 
01590 void
01591 TransportSendStrategy::prepare_header_i()
01592 {
01593   DBG_ENTRY_LVL("TransportSendStrategy","prepare_header_i",6);
01594 
01595   // Default implementation does nothing.
01596 }
01597 
01598 void
01599 TransportSendStrategy::prepare_packet()
01600 {
01601   DBG_ENTRY_LVL("TransportSendStrategy", "prepare_packet", 6);
01602 
01603   // Prepare the header for sending.
01604   this->prepare_header();
01605 
01606   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01607         "Marshal the packet header.\n"));
01608 
01609   if (this->header_block_ != 0) {
01610     this->header_block_->release();
01611   }
01612 
01613   ACE_NEW_MALLOC(this->header_block_,
01614     static_cast<ACE_Message_Block*>(this->header_mb_allocator_->malloc()),
01615     ACE_Message_Block(this->max_header_size_,
01616                       ACE_Message_Block::MB_DATA,
01617                       0,
01618                       0,
01619                       0,
01620                       0,
01621                       ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01622                       ACE_Time_Value::zero,
01623                       ACE_Time_Value::max_time,
01624                       this->header_db_allocator_.get(),
01625                       this->header_mb_allocator_.get()));
01626 
01627   marshal_transport_header(this->header_block_);
01628 
01629   this->pkt_chain_ = this->header_block_->duplicate();
01630 
01631   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01632         "Use a BuildChainVisitor to visit the packet elems_.\n"));
01633 
01634   // Build up a chain of blocks by duplicating the message block chain
01635   // held by each element (in elems_), and then chaining the new duplicate
01636   // blocks together to form one long chain.
01637   BuildChainVisitor visitor;
01638   this->elems_.accept_visitor(visitor);
01639 
01640   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01641         "Attach the visitor's chain of blocks to the lone (packet "
01642         "header) block currently in the pkt_chain_.\n"));
01643 
01644   // Attach the visitor's chain of blocks to the packet header block.
01645   this->pkt_chain_->cont(visitor.chain());
01646 
01647   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01648         "Increment header sequence for next packet.\n"));
01649 
01650   // Allow the specific implementation the opportunity to process the
01651   // newly prepared packet.
01652   this->prepare_packet_i();
01653 
01654   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01655         "Set the header_complete_ flag to false.\n"));
01656 
01657   // Set the header_complete_ to false to indicate
01658   // that the first block in the pkt_chain_ is the packet header block
01659   // (actually a duplicate() of the packet header_block_).
01660   this->header_complete_ = false;
01661 }
01662 
01663 bool
01664 TransportSendStrategy::marshal_transport_header(ACE_Message_Block* mb)
01665 {
01666   return *mb << this->header_;
01667 }
01668 
01669 void
01670 TransportSendStrategy::prepare_packet_i()
01671 {
01672   DBG_ENTRY_LVL("TransportSendStrategy","prepare_packet_i",6);
01673 
01674   // Default implementation does nothing.
01675 }
01676 
01677 void
01678 TransportSendStrategy::set_graceful_disconnecting(bool flag)
01679 {
01680   this->graceful_disconnecting_ = flag;
01681 }
01682 
01683 ssize_t
01684 TransportSendStrategy::do_send_packet(const ACE_Message_Block* packet, int& bp)
01685 {
01686   if (Transport_debug_level > 9) {
01687     ACE_DEBUG((LM_DEBUG,
01688                ACE_TEXT("(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ")
01689                ACE_TEXT("sending data at 0x%x.\n"),
01690                id(), packet));
01691   }
01692   DBG_ENTRY_LVL("TransportSendStrategy", "do_send_packet", 6);
01693 
01694 #if defined(OPENDDS_SECURITY)
01695   // pre_send_packet may provide different data that takes the place of the
01696   // original "packet" (used for security encryption/authentication)
01697   Message_Block_Ptr substitute(pre_send_packet(packet));
01698 #endif
01699 
01700   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01701             "Populate the iovec array using the packet.\n"), 5);
01702 
01703   iovec iov[MAX_SEND_BLOCKS];
01704 
01705 #if defined(OPENDDS_SECURITY)
01706   const int num_blocks = mb_to_iov(substitute ? *substitute : *packet, iov);
01707 #else
01708   const int num_blocks = mb_to_iov(*packet, iov);
01709 #endif
01710 
01711   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01712             "There are [%d] number of entries in the iovec array.\n",
01713             num_blocks), 5);
01714 
01715   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01716             "Attempt to send_bytes() now.\n"), 5);
01717 
01718   const ssize_t num_bytes_sent = this->send_bytes(iov, num_blocks, bp);
01719 
01720   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01721             "The send_bytes() said that num_bytes_sent == [%d].\n",
01722             num_bytes_sent), 5);
01723 
01724 #if defined(OPENDDS_SECURITY)
01725   if (substitute && num_bytes_sent > 0) {
01726     // Although the "substitute" data took the place of "packet", the rest
01727     // of the framework needs to account for the bytes in "packet" being taken
01728     // care of, as if they were actually sent.
01729     // Since this is done with datagram sockets, partial sends aren't possible.
01730     return packet->total_length();
01731   }
01732 #endif
01733 
01734   return num_bytes_sent;
01735 }
01736 
01737 TransportSendStrategy::SendPacketOutcome
01738 TransportSendStrategy::send_packet()
01739 {
01740   DBG_ENTRY_LVL("TransportSendStrategy", "send_packet", 6);
01741 
01742   int bp_flag = 0;
01743   const ssize_t num_bytes_sent =
01744     this->do_send_packet(this->pkt_chain_, bp_flag);
01745 
01746   if (num_bytes_sent == 0) {
01747     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01748               "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5);
01749     // This means that the peer has disconnected.
01750     return OUTCOME_PEER_LOST;
01751   }
01752 
01753   if (num_bytes_sent < 0) {
01754     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01755               "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5);
01756 
01757     // Check for backpressure...
01758     if (bp_flag == 1) {
01759       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01760                 "Since backpressure flag is true, return "
01761                 "OUTCOME_BACKPRESSURE.\n"), 5);
01762       // Ok.  Not really an error - just backpressure.
01763       return OUTCOME_BACKPRESSURE;
01764     }
01765 
01766     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01767               "Since backpressure flag is false, return "
01768               "OUTCOME_SEND_ERROR.\n"), 5);
01769 
01770     // Not backpressure - it's a real error.
01771     // Note: moved this to send_bytes so the errno msg could be written.
01772     //ACE_ERROR((LM_ERROR,
01773     //           "(%P|%t) ERROR: Call to peer().send() failed with negative "
01774     //           "return code.\n"));
01775 
01776     return OUTCOME_SEND_ERROR;
01777   }
01778 
01779   if (this->send_buffer_ != 0) {
01780     // If a secondary send buffer is bound, sent samples must
01781     // be inserted in order to properly maintain the buffer:
01782     this->send_buffer_->insert(this->header_.sequence_,
01783       &this->elems_, this->pkt_chain_);
01784   }
01785 
01786   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01787             "Since num_bytes_sent > 0, adjust the packet to account for "
01788             "the bytes that did get sent.\n"),5);
01789 
01790   // We sent some bytes - adjust the current packet (elems_ and pkt_chain_)
01791   // to account for the bytes that have been sent.
01792   const int result =
01793     this->adjust_packet_after_send(num_bytes_sent);
01794 
01795   if (result == 0) {
01796     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01797           "The adjustment logic says that the complete packet was "
01798           "sent.  Return OUTCOME_COMPLETE_SEND.\n"));
01799     return OUTCOME_COMPLETE_SEND;
01800   }
01801 
01802   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01803         "The adjustment logic says that only a part of the packet was "
01804         "sent. Return OUTCOME_PARTIAL_SEND.\n"));
01805 
01806   return OUTCOME_PARTIAL_SEND;
01807 }
01808 
01809 ssize_t
01810 TransportSendStrategy::non_blocking_send(const iovec iov[], int n, int& bp)
01811 {
01812   int val = 0;
01813   ACE_HANDLE handle = this->get_handle();
01814 
01815   if (handle == ACE_INVALID_HANDLE)
01816     return -1;
01817 
01818   ACE::record_and_set_non_blocking_mode(handle, val);
01819 
01820   // Set the back-pressure flag to false.
01821   bp = 0;
01822 
01823   // Clear errno
01824   errno = 0;
01825 
01826   ssize_t result = this->send_bytes_i(iov, n);
01827 
01828   if (result == -1) {
01829     if ((errno == EWOULDBLOCK) || (errno == ENOBUFS)) {
01830       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
01831             "Backpressure encountered.\n"));
01832       // Set the back-pressure flag to true
01833       bp = 1;
01834 
01835     } else {
01836       VDBG_LVL((LM_ERROR, "(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n",
01837                 ACE_TEXT("sendv"), n),1);
01838 
01839       // try to get the application to core when "Bad Address" is returned
01840       // by looking at the iovec
01841       for (int ii = 0; ii < n; ii++) {
01842         ACE_DEBUG((LM_DEBUG, "(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n",
01843                    ii, iov[ii].iov_len, iov[ii].iov_base));
01844       }
01845     }
01846   }
01847 
01848   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
01849             "The sendv() returned [%d].\n", result), 5);
01850 
01851   ACE::restore_non_blocking_mode(handle, val);
01852 
01853   return result;
01854 }
01855 
01856 void
01857 TransportSendStrategy::add_delayed_notification(TransportQueueElement* element)
01858 {
01859   if (Transport_debug_level) {
01860     size_t size = this->delayed_delivered_notification_queue_.size();
01861     if ((size > 0) && (size % this->max_samples_ == 0)) {
01862       ACE_DEBUG((LM_DEBUG,
01863                  "(%P|%t) Transport send strategy notification queue threshold, size=%d\n",
01864                  size));
01865     }
01866   }
01867 
01868   this->delayed_delivered_notification_queue_.push_back(std::make_pair(element, this->mode_));
01869 }
01870 
01871 void
01872 OpenDDS::DCPS::TransportSendStrategy::deliver_ack_request(TransportQueueElement* element)
01873 {
01874   GuardType guard(this->lock_);
01875   element->data_delivered();
01876 }
01877 
01878 size_t
01879 TransportSendStrategy::space_available() const
01880 {
01881   const size_t used = this->max_header_size_ + this->header_.length_,
01882     max_msg = this->max_message_size();
01883   if (max_msg) {
01884     return std::min(this->max_size_ - used, max_msg - used);
01885   }
01886   return this->max_size_ - used;
01887 }
01888 
01889 int
01890 TransportSendStrategy::mb_to_iov(const ACE_Message_Block& msg, iovec* iov)
01891 {
01892   int num_blocks = 0;
01893 #ifdef _MSC_VER
01894 #pragma warning(push)
01895 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
01896 // since on other platforms iov_len is 64-bit
01897 #pragma warning(disable : 4267)
01898 #endif
01899   for (const ACE_Message_Block* block = &msg;
01900        block && num_blocks < MAX_SEND_BLOCKS;
01901        block = block->cont()) {
01902     iov[num_blocks].iov_len = block->length();
01903     iov[num_blocks++].iov_base = block->rd_ptr();
01904   }
01905 #ifdef _MSC_VER
01906 #pragma warning(pop)
01907 #endif
01908   return num_blocks;
01909 }
01910 
01911 // close namespaces
01912 }
01913 }
01914 
01915 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1