LCOV - code coverage report
Current view: top level - DCPS/transport/framework - TransportSendStrategy.cpp (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 711 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 36 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
       9             : 
      10             : #include "TransportSendStrategy.h"
      11             : 
      12             : #include "RemoveAllVisitor.h"
      13             : #include "TransportInst.h"
      14             : #include "ThreadSynchStrategy.h"
      15             : #include "ThreadSynchResource.h"
      16             : #include "TransportQueueElement.h"
      17             : #include "TransportSendElement.h"
      18             : #include "TransportSendBuffer.h"
      19             : #include "BuildChainVisitor.h"
      20             : #include "QueueRemoveVisitor.h"
      21             : #include "PacketRemoveVisitor.h"
      22             : #include "TransportDefs.h"
      23             : #include "DirectPriorityMapper.h"
      24             : #include "EntryExit.h"
      25             : 
      26             : #include <dds/DCPS/DataSampleHeader.h>
      27             : #include <dds/DCPS/DataSampleElement.h>
      28             : #include <dds/DCPS/Service_Participant.h>
      29             : 
      30             : #include <ace/Reverse_Lock_T.h>
      31             : 
      32             : #if !defined (__ACE_INLINE__)
      33             : #include "TransportSendStrategy.inl"
      34             : #endif /* __ACE_INLINE__ */
      35             : 
      36             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      37             : 
      38             : namespace OpenDDS {
      39             : namespace DCPS {
      40             : 
      41             : //TBD: The number of chunks of the replaced element allocators
      42             : //     is hard coded for now. This will be configurable when
      43             : //     we implement the DDS configurations. This value should
      44             : //     be the number of marshalled DataSampleHeaders that a
      45             : //     packet could contain.
      46             : #define NUM_REPLACED_ELEMENT_CHUNKS 20
      47             : 
      48             : namespace {
      49             :   /// Arbitrary small constant that represents the minimum
      50             :   /// amount of payload data we'll have in one fragment.
      51             :   /// In this case "payload data" includes the content-filtering
      52             :   /// GUID sequence, so this is chosen to be 4 + (16 * N).
      53             :   static const size_t MIN_FRAG = 68;
      54             : }
      55             : 
      56             : // I think 2 chunks for the header message block is enough
      57             : // - one for the original copy and one for duplicate which
      58             : // occurs every packet and is released after packet is sent.
      59             : // The data block only needs 1 chunk since the duplicate()
      60             : // just increases the ref count.
      61           0 : TransportSendStrategy::TransportSendStrategy(
      62             :   std::size_t id,
      63             :   const TransportImpl_rch& transport,
      64             :   ThreadSynchResource* synch_resource,
      65             :   Priority priority,
      66           0 :   const ThreadSynchStrategy_rch& thread_sync_strategy)
      67             :   : ThreadSynchWorker(id),
      68           0 :     max_samples_(DEFAULT_CONFIG_MAX_SAMPLES_PER_PACKET),
      69           0 :     optimum_size_(DEFAULT_CONFIG_OPTIMUM_PACKET_SIZE),
      70           0 :     max_size_(DEFAULT_CONFIG_MAX_PACKET_SIZE),
      71           0 :     max_header_size_(0),
      72           0 :     header_block_(0),
      73           0 :     pkt_chain_(0),
      74           0 :     header_complete_(false),
      75           0 :     start_counter_(0),
      76           0 :     mode_(MODE_DIRECT),
      77           0 :     mode_before_suspend_(MODE_NOT_SET),
      78           0 :     lock_(),
      79           0 :     replaced_element_mb_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
      80           0 :     replaced_element_db_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
      81           0 :     transport_(transport),
      82           0 :     graceful_disconnecting_(false),
      83           0 :     link_released_(true),
      84           0 :     send_buffer_(0)
      85             : {
      86             :   DBG_ENTRY_LVL("TransportSendStrategy","TransportSendStrategy",6);
      87             : 
      88           0 :   TransportInst_rch cfg = transport->config();
      89           0 :   if (cfg) {
      90           0 :     max_samples_ = cfg->max_samples_per_packet_;
      91           0 :     optimum_size_ = cfg->optimum_packet_size_;
      92           0 :     max_size_ = cfg->max_packet_size_;
      93             :   }
      94             : 
      95             :   // Create a ThreadSynch object just for us.
      96           0 :   DirectPriorityMapper mapper(priority);
      97           0 :   synch_.reset(thread_sync_strategy->create_synch_object(
      98             :                    synch_resource,
      99             : #ifdef ACE_WIN32
     100             :                    ACE_DEFAULT_THREAD_PRIORITY,
     101             : #else
     102           0 :                    mapper.thread_priority(),
     103             : #endif
     104           0 :                    TheServiceParticipant->scheduler()));
     105             : 
     106             :   // We cache this value in data member since it doesn't change, and we
     107             :   // don't want to keep asking for it over and over.
     108           0 :   max_header_size_ = TransportHeader::get_max_serialized_size();
     109             : 
     110           0 :   delayed_delivered_notification_queue_.reserve(max_samples_);
     111           0 : }
     112             : 
     113           0 : TransportSendStrategy::~TransportSendStrategy()
     114             : {
     115             :   DBG_ENTRY_LVL("TransportSendStrategy","~TransportSendStrategy",6);
     116             : 
     117             : 
     118           0 :   delayed_delivered_notification_queue_.clear();
     119           0 : }
     120             : 
     121             : void
     122           0 : TransportSendStrategy::send_buffer(TransportSendBuffer* send_buffer)
     123             : {
     124           0 :   send_buffer_ = send_buffer;
     125             : 
     126           0 :   if (send_buffer_ != 0) {
     127           0 :     send_buffer_->bind(this);
     128             :   }
     129           0 : }
     130             : 
     131             : ThreadSynchWorker::WorkOutcome
     132           0 : TransportSendStrategy::perform_work()
     133             : {
     134             :   DBG_ENTRY_LVL("TransportSendStrategy","perform_work",6);
     135             : 
     136             :   SendPacketOutcome outcome;
     137           0 :   bool no_more_work = false;
     138             : 
     139             :   { // scope for the guard(lock_);
     140           0 :     GuardType guard(lock_);
     141             : 
     142           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: perform_work mode: %C\n", mode_as_str(mode_)), 5);
     143             : 
     144           0 :     if (mode_ == MODE_TERMINATED) {
     145           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     146             :                 "Entered perform_work() and mode_ is MODE_TERMINATED - "
     147             :                 "we lost connection and could not reconnect, just return "
     148             :                 "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
     149           0 :       return WORK_OUTCOME_BROKEN_RESOURCE;
     150             :     }
     151             : 
     152             :     // The perform_work() is called by our synch_ object using
     153             :     // a thread designated to call this method when it thinks
     154             :     // we need to be called in order to "service" the queue_ and/or
     155             :     // deal with a partially sent current packet.
     156             :     //
     157             :     // We return WORK_OUTCOME_NO_MORE_TO_DO if we don't see a need to have
     158             :     // our perform_work() called again, and WORK_OUTCOME_MORE_TO_DO if we
     159             :     // do see the need to have our perform_work() method called again.
     160             : 
     161             :     // First, make sure that the mode_ indicates that we are, indeed, in
     162             :     // MODE_QUEUE or MODE_SUSPEND.  If we are in any other mode (for example
     163             :     // MODE_DIRECT), then it means we didn't need to have this perform_work()
     164             :     // method called - in this case, do nothing other than return
     165             :     // WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we really don't
     166             :     // see a need for it to call our perform_work() again (at least not
     167             :     // right now).
     168           0 :     if (mode_ != MODE_QUEUE && mode_ != MODE_SUSPEND) {
     169           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     170             :                 "Entered perform_work() and mode_ is %C - just return "
     171             :                 "WORK_OUTCOME_NO_MORE_TO_DO.\n", mode_as_str(mode_)), 5);
     172           0 :       return WORK_OUTCOME_NO_MORE_TO_DO;
     173             :     }
     174             : 
     175             :     // Check the "state" of the current packet.  We will either find that the
     176             :     // current packet is in a state of being "partially sent", or we will find
     177             :     // it in a state of being "empty".  When the current packet is "empty", it
     178             :     // means that it is time to build up the current packet using elements
     179             :     // extracted from the queue_, and then we will attempt to send the
     180             :     // packet.  When we find the current packet in the "partially sent" state,
     181             :     // we will not touch the queue_ - we will just try to send the unsent
     182             :     // bytes in the current (partially sent) packet.
     183           0 :     const size_t header_length = header_.length_;
     184             : 
     185           0 :     if (header_length == 0) {
     186           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     187             :                 "The current packet doesn't have any unsent bytes - we "
     188             :                 "need to 'populate' the current packet with elems from "
     189             :                 "the queue.\n"), 5);
     190             : 
     191             :       // The current packet is "empty".  Build up the current packet using
     192             :       // elements from the queue_, and prepare the current packet to be sent.
     193             : 
     194             :       // Before we build the packet from the queue_, let's make sure that
     195             :       // there is actually something on the queue_ to build from.
     196           0 :       if (queue_.size() == 0) {
     197           0 :         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     198             :                   "But the queue is empty.  We have cleared the "
     199             :                   "backpressure situation.\n"),5);
     200             :         // We are here because the queue_ is empty, and there isn't
     201             :         // any "partial packet" bytes left to send.  We have overcome
     202             :         // the backpressure situation and don't have anything to do
     203             :         // right now.
     204             : 
     205           0 :         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     206             :                   "Flip mode to MODE_DIRECT, and return "
     207             :                   "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
     208             : 
     209             :         // Flip the mode back to MODE_DIRECT.
     210           0 :         mode_ = MODE_DIRECT;
     211             : 
     212             :         // And return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that
     213             :         // perform_work() doesn't need to be called again (at this time).
     214           0 :         return WORK_OUTCOME_NO_MORE_TO_DO;
     215             :       }
     216             : 
     217           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     218             :                 "There is at least one elem in the queue - get the packet "
     219             :                 "elems from the queue.\n"), 5);
     220             : 
     221             :       // There is stuff in the queue_ if we get to this point in the logic.
     222             :       // Build-up the current packet using element(s) from the queue_.
     223           0 :       get_packet_elems_from_queue();
     224             : 
     225           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     226             :                 "Prepare the packet from the packet elems_.\n"), 5);
     227             : 
     228             :       // Now we can prepare the new packet to be sent.
     229           0 :       prepare_packet();
     230             : 
     231           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     232             :                 "Packet has been prepared from packet elems_.\n"), 5);
     233             : 
     234             :     } else {
     235           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     236             :                 "We have a current packet that still has unsent bytes.\n"), 5);
     237             :     }
     238             : 
     239           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     240             :               "Attempt to send the current packet.\n"), 5);
     241             : 
     242             :     // Now we can attempt to send the current packet - whether it is
     243             :     // a "partially sent" packet or one that we just built-up using elements
     244             :     // from the queue_ (and subsequently prepared for sending) - it doesn't
     245             :     // matter.  Just attempt to send as many of the "unsent" bytes in the
     246             :     // packet as possible.
     247           0 :     outcome = send_packet();
     248             : 
     249             :     // If we sent the whole packet (i.e., OUTCOME_COMPLETE_SEND), and the queue_
     250             :     // is now empty, then we've cleared the backpressure situation.
     251           0 :     if ((outcome == OUTCOME_COMPLETE_SEND) && (queue_.size() == 0)) {
     252           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     253             :                 "Flip the mode to MODE_DIRECT, and then return "
     254             :                 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
     255             : 
     256             :       // Revert back to MODE_DIRECT mode.
     257           0 :       mode_ = MODE_DIRECT;
     258           0 :       no_more_work = true;
     259             :     }
     260           0 :   } // End of scope for guard(lock_);
     261             : 
     262           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     263             :             "The outcome of the send_packet() was %d.\n", outcome), 5);
     264             : 
     265           0 :   send_delayed_notifications();
     266             : 
     267             :   // If we sent the whole packet (i.e., OUTCOME_COMPLETE_SEND), and the queue_
     268             :   // was then empty (no_more_work), we've cleared the backpressure situation.
     269           0 :   if (no_more_work) {
     270           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     271             :               "We sent the whole packet, and there is nothing left on "
     272             :               "the queue now.\n"), 5);
     273             : 
     274             :     // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
     275             :     // don't desire another call to this perform_work() method.
     276           0 :     return WORK_OUTCOME_NO_MORE_TO_DO;
     277             :   }
     278             : 
     279           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     280             :             "We still have unsent bytes in the current packet AND/OR there "
     281             :             "are still elements in the queue.\n"), 5);
     282             : 
     283           0 :   if ((outcome == OUTCOME_PEER_LOST) || (outcome == OUTCOME_SEND_ERROR)) {
     284           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     285             :               "We lost our connection, or had some fatal connection "
     286             :               "error.  Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
     287             : 
     288           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     289             :               "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
     290             : 
     291           0 :     bool do_suspend = true;
     292           0 :     relink(do_suspend);
     293             : 
     294           0 :     if (mode_ == MODE_SUSPEND) {
     295           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     296             :                 "The reconnect has not done yet and we are still in MODE_SUSPEND. "
     297             :                 "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
     298             :       // Return WORK_OUTCOME_NO_MORE_TO_DO (not CLOGGED_RESOURCE, as the debug
     299             :       // message above says): we don't desire another perform_work() call.
     300           0 :       return WORK_OUTCOME_NO_MORE_TO_DO;
     301             : 
     302           0 :     } else if (mode_ == MODE_TERMINATED) {
     303           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     304             :                 "Reconnect failed, now we are in MODE_TERMINATED\n"), 5);
     305           0 :       return WORK_OUTCOME_BROKEN_RESOURCE;
     306             : 
     307             :     } else {
     308           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     309             :                 "Reconnect succeeded, Notify synch thread of work "
     310             :                 "availability.\n"), 5);
     311             :       // If the datalink is re-established then notify the synch
     312             :       // thread to perform work.  We do not hold the object lock at
     313             :       // this point.
     314           0 :       synch_->work_available();
     315             :     }
     316             :   }
     317             : 
     318           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     319             :             "We still have an 'unbroken' connection.\n"), 5);
     320             : 
     321           0 :   if (outcome == OUTCOME_BACKPRESSURE) {
     322           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     323             :               "We experienced backpressure on our attempt to send the "
     324             :               "packet.  Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
     325             :     // We have a "clogged resource".
     326           0 :     return WORK_OUTCOME_CLOGGED_RESOURCE;
     327             :   }
     328             : 
     329           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     330             :             "We may have sent the whole current packet, but still have "
     331             :             "elements on the queue.\n"), 5);
     332           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     333             :             "Or, we may have only partially sent the current packet.\n"), 5);
     334             : 
     335           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     336             :             "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5);
     337             : 
     338             :   // We may have had an OUTCOME_COMPLETE_SEND, but there is still stuff
     339             :   // in the queue_ to be sent.  *OR* we may have had an OUTCOME_PARTIAL_SEND,
     340             :   // which equates to the same thing - we still have work to do.
     341             : 
     342             :   // We are still in MODE_QUEUE mode, thus there is still work to be
     343             :   // done to service the queue_ and/or a partially sent current packet.
     344             :   // Return WORK_OUTCOME_MORE_TO_DO so that our caller knows that we still
     345             :   // want it to call this perform_work() method.
     346           0 :   return WORK_OUTCOME_MORE_TO_DO;
     347             : }
     348             : 
     349             : // Now we need to "peel off" those message blocks that were fully
     350             : // sent, adjust the first message block with an unsent byte to have
     351             : // its rd_ptr() pointing to that first unsent byte, and set the
     352             : // pkt_chain_ to that first message block with an unsent byte.
     353             : // As we "peel off" fully sent message blocks, we need to also deal with
     354             : // fully sent elements by removing them from the elems_ and
     355             : // calling their data_delivered() method.  In addition, as we peel off
     356             : // the message blocks that are fully sent, we need to untie them from
     357             : // the chain and release them.
     358             : // And finally, don't forget to adjust the header_.length_ to
     359             : // account for the num_bytes_sent (beware that some of the num_bytes_sent
     360             : // may be packet header bytes and shouldn't affect the header_.length_
     361             : // which doesn't include the packet header bytes).
     362             : int
     363           0 : TransportSendStrategy::adjust_packet_after_send(ssize_t num_bytes_sent)
     364             : {
     365             :   DBG_ENTRY_LVL("TransportSendStrategy", "adjust_packet_after_send", 6);
     366             : 
     367           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     368             :         "Adjusting the current packet because %d bytes of the packet "
     369             :         "have been sent.\n", num_bytes_sent));
     370             : 
     371           0 :   ssize_t num_bytes_left = num_bytes_sent;
     372           0 :   ssize_t num_non_header_bytes_sent = 0;
     373             : 
     374           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     375             :         "Set num_bytes_left to %d.\n", num_bytes_left));
     376           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     377             :         "Set num_non_header_bytes_sent to %d.\n",
     378             :         num_non_header_bytes_sent));
     379             : 
     380           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     381             :         "Peek at the element at the front of the packet elems_.\n"));
     382             : 
     383             :   // This is the element currently at the front of elems_.
     384           0 :   TransportQueueElement* element = elems_.peek();
     385             : 
     386           0 :   if(!element){
     387           0 :     ACE_DEBUG((LM_INFO, "(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n"));
     388             :   } else {
     389           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     390             :           "Use the element's msg() to find the last block in "
     391             :           "the msg() chain.\n"));
     392             : 
     393             :     // Get a pointer to the last message block in the element.
     394           0 :     const ACE_Message_Block* elem_tail_block = element->msg();
     395             : 
     396           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     397             :           "Start with tail block == element->msg().\n"));
     398             : 
     399           0 :     while (elem_tail_block->cont() != 0) {
     400           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     401             :             "Set tail block to its cont() block (next in chain).\n"));
     402           0 :       elem_tail_block = elem_tail_block->cont();
     403             :     }
     404             : 
     405           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     406             :           "Tail block now set (because tail block's cont() is 0).\n"));
     407             : 
     408           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     409             :           "Start the 'while (num_bytes_left > 0)' loop.\n"));
     410             : 
     411           0 :     while (num_bytes_left > 0) {
     412           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     413             :             "At top of 'num bytes left' loop.  num_bytes_left == [%d].\n",
     414             :             num_bytes_left));
     415             : 
     416           0 :       const int block_length = static_cast<int>(pkt_chain_->length());
     417             : 
     418           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     419             :             "Length of block at front of pkt_chain_ is [%d].\n",
     420             :             block_length));
     421             : 
     422           0 :       if (block_length <= num_bytes_left) {
     423           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     424             :               "The whole block at the front of pkt_chain_ was sent.\n"));
     425             : 
     426             :         // The entire message block at the front of the chain has been sent.
     427             :         // Detach the head message block from the chain and adjust
     428             :         // the pkt_chain_ to point to the next block (if any) in
     429             :         // the chain.
     430           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     431             :               "Extract the fully sent block from the pkt_chain_.\n"));
     432             : 
     433           0 :         ACE_Message_Block* fully_sent_block = pkt_chain_;
     434             : 
     435           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     436             :               "Set pkt_chain_ to pkt_chain_->cont().\n"));
     437             : 
     438           0 :         pkt_chain_ = pkt_chain_->cont();
     439             : 
     440           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     441             :               "Set the fully sent block's cont() to 0.\n"));
     442             : 
     443           0 :         fully_sent_block->cont(0);
     444             : 
     445             :         // Update the num_bytes_left to indicate that we have
     446             :         // processed the entire length of the block.
     447           0 :         num_bytes_left -= block_length;
     448             : 
     449           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     450             :               "Updated num_bytes_left to account for fully sent "
     451             :               "block (block_length == [%d]).\n", block_length));
     452           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     453             :               "Now, num_bytes_left == [%d].\n", num_bytes_left));
     454             : 
     455           0 :         if (!header_complete_) {
     456           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     457             :                 "Since the header_complete_ flag is false, it means "
     458             :                 "that the packet header block was still in the "
     459             :                 "pkt_chain_.\n"));
     460             : 
     461           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     462             :                 "Not anymore...  Set the header_complete_ flag "
     463             :                 "to true.\n"));
     464             : 
     465             :           // That was the packet header block.  And now we know that it
     466             :           // has been completely sent.
     467           0 :           header_complete_ = true;
     468             : 
     469           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     470             :                 "Release the fully sent block.\n"));
     471             : 
     472             :           // Release the fully_sent_block
     473           0 :           fully_sent_block->release();
     474             : 
     475             :         } else {
     476           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     477             :                 "Since the header_complete_ flag is true, it means "
     478             :                 "that the packet header block was not in the "
     479             :                 "pkt_chain_.\n"));
     480           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     481             :                 "So, the fully sent block was part of an element.\n"));
     482             : 
     483             :           // That wasn't the packet header block.  It was from the
     484             :           // element currently at the front of the elems_
     485             :           // collection.  If it was the last block from the
     486             :           // element, then we need to extract the element from the
     487             :           // elems_ collection and invoke data_delivered() on it.
     488           0 :           num_non_header_bytes_sent += block_length;
     489             : 
     490           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     491             :                 "Updated num_non_header_bytes_sent to account for "
     492             :                 "fully sent block (block_length == [%d]).\n",
     493             :                 block_length));
     494             : 
     495           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     496             :                 "Now, num_non_header_bytes_sent == [%d].\n",
     497             :                 num_non_header_bytes_sent));
     498             : 
     499           0 :           if (fully_sent_block->base() == elem_tail_block->base()) {
     500           0 :             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     501             :                   "Ok.  The fully sent block was a duplicate of "
     502             :                   "the tail block of the element that is at the "
     503             :                   "front of the packet elems_.\n"));
     504             : 
     505           0 :             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     506             :                   "This means that we have completely sent the "
     507             :                   "element at the front of the packet elems_.\n"));
     508             : 
     509             :             // This means that we have completely sent the element
     510             :             // that is currently at the front of the elems_ collection.
     511             : 
     512           0 :             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     513             :                   "We can release the fully sent block now.\n"));
     514             : 
     515             :             // Release the fully_sent_block
     516           0 :             fully_sent_block->release();
     517             : 
     518           0 :             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     519             :                   "We can extract the element from the front of "
     520             :                   "the packet elems_ (we were just peeking).\n"));
     521             : 
     522             :             // Extract the element from the elems_ collection
     523           0 :             element = elems_.get();
     524             : 
     525           0 :             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     526             :                   "Tell the element that a decision has been made "
     527             :                   "regarding its fate - data_delivered().\n"));
     528             : 
     529             :             // Inform the element that the data has been delivered.
     530           0 :             add_delayed_notification(element);
     531             : 
     532           0 :             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     533             :                   "Peek at the next element in the packet "
     534             :                   "elems_.\n"));
     535             : 
     536             :             // Set up for the next element in elems_ by peek()'ing.
     537           0 :             element = elems_.peek();
     538             : 
     539           0 :             if (element != 0) {
     540           0 :               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     541             :                     "The is an element still in the packet "
     542             :                     "elems_ (we are peeking at it now).\n"));
     543             : 
     544           0 :               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     545             :                     "We are going to find the tail block for the "
     546             :                     "current element (we are peeking at).\n"));
     547             : 
     548             :               // There was a "next element".  Determine the
     549             :               // elem_tail_block for it.
     550           0 :               elem_tail_block = element->msg();
     551             : 
     552           0 :               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     553             :                     "Start w/tail block == element->msg().\n"));
     554             : 
     555           0 :               while (elem_tail_block->cont() != 0) {
     556           0 :                 VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     557             :                       "Set tail block to next in chain.\n"));
     558           0 :                 elem_tail_block = elem_tail_block->cont();
     559             :               }
     560             : 
     561           0 :               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     562             :                     "Done finding tail block.\n"));
     563             :             }
     564             : 
     565             :           } else {
     566           0 :             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     567             :                   "Ok.  The fully sent block is *not* a "
     568             :                   "duplicate of the tail block of the element "
     569             :                   "at the front of the packet elems_.\n"));
     570             : 
     571           0 :             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     572             :                   "Thus, we have not completely sent the "
     573             :                   "element yet.\n"));
     574             : 
     575             :             // We didn't completely send the element - it has more
     576             :             // message blocks that haven't been sent (that we know of).
     577             : 
     578           0 :             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     579             :                   "We can release the fully_sent_block now.\n"));
     580             : 
     581             :             // Release the fully_sent_block
     582           0 :             fully_sent_block->release();
     583             :           }
     584             :         }
     585             : 
     586             :       } else {
     587           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     588             :               "Only part of the block at the front of pkt_chain_ "
     589             :               "was sent.\n"));
     590             : 
     591           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     592             :               "Advance the rd_ptr() of the front block (of pkt_chain_) "
     593             :               "by the num_bytes_left (%d).\n", num_bytes_left));
     594             : 
     595             :         // Only part of the current block was sent.
     596           0 :         pkt_chain_->rd_ptr(num_bytes_left);
     597             : 
     598           0 :         if (header_complete_) {
     599           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     600             :                 "And since the packet header block has already been "
     601             :                 "completely sent, add num_bytes_left to the "
     602             :                 "num_non_header_bytes_sent.\n"));
     603             : 
     604           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     605             :                 "Before, num_non_header_bytes_sent == %d.\n",
     606             :                 num_non_header_bytes_sent));
     607             : 
     608             :           // We know that the current block isn't the packet header
     609             :           // block because the packet header block has already been
     610             :           // completely sent.  We need to count these bytes in the
     611             :           // num_non_header_bytes_sent.
     612           0 :           num_non_header_bytes_sent += num_bytes_left;
     613             : 
     614           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     615             :                 "After, num_non_header_bytes_sent == %d.\n",
     616             :                 num_non_header_bytes_sent));
     617             :         }
     618             : 
     619           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     620             :               "Set the num_bytes_left to 0 now.\n"));
     621             : 
     622           0 :         num_bytes_left = 0;
     623             :       }
     624             :     }
     625             :   }
     626             : 
     627           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     628             :         "The 'num_bytes_left' loop has completed.\n"));
     629             : 
     630           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     631             :         "Adjust the header_.length_ to account for the "
     632             :         "num_non_header_bytes_sent.\n"));
     633           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     634             :         "Before, header_.length_ == %d.\n",
     635             :         header_.length_));
     636             : 
     637             :   // Adjust the packet header_.length_ to indicate how many non header
     638             :   // bytes are left to send.
     639           0 :   header_.length_ -= static_cast<ACE_UINT32>(num_non_header_bytes_sent);
     640             : 
     641           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     642             :         "After, header_.length_ == %d.\n",
     643             :         header_.length_));
     644             : 
     645             :   // Returns 0 if the entire packet was sent, and returns 1 otherwise.
     646           0 :   int rc = (header_.length_ == 0) ? 0 : 1;
     647             : 
     648           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     649             :         "Adjustments all done.  Returning [%d].  0 means entire packet "
     650             :         "has been sent.  1 means otherwise.\n",
     651             :         rc));
     652             : 
     653           0 :   return rc;
     654             : }
     655             : 
     656             : bool
     657           0 : TransportSendStrategy::send_delayed_notifications(const TransportQueueElement::MatchCriteria* match)
     658             : {
     659             :   DBG_ENTRY_LVL("TransportSendStrategy","send_delayed_notifications",6);
     660           0 :   TransportQueueElement* sample = 0;
     661           0 :   SendMode mode = MODE_NOT_SET;
     662             : 
     663           0 :   OPENDDS_VECTOR(TQESendModePair) samples;
     664             : 
     665           0 :   size_t num_delayed_notifications = 0;
     666           0 :   bool found_element = false;
     667             : 
     668             :   {
     669           0 :     GuardType guard(lock_);
     670             : 
     671           0 :     num_delayed_notifications = delayed_delivered_notification_queue_.size();
     672             : 
     673           0 :     if (num_delayed_notifications == 0) {
     674           0 :       return false;
     675             : 
     676           0 :     } else if (num_delayed_notifications == 1) {
     677             :       // Optimization for the most common case (doesn't need vectors)
     678             : 
     679           0 :       if (!match || match->matches(*delayed_delivered_notification_queue_[0].first)) {
     680           0 :         found_element = true;
     681           0 :         sample = delayed_delivered_notification_queue_[0].first;
     682           0 :         mode = delayed_delivered_notification_queue_[0].second;
     683             : 
     684           0 :         delayed_delivered_notification_queue_.clear();
     685             :       }
     686             : 
     687             :     } else {
     688           0 :       OPENDDS_VECTOR(TQESendModePair)::iterator iter;
     689           0 :       for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) {
     690           0 :         sample = iter->first;
     691           0 :         mode = iter->second;
     692           0 :         if (!match || match->matches(*sample)) {
     693           0 :           found_element = true;
     694           0 :           samples.push_back(*iter);
     695           0 :           iter = delayed_delivered_notification_queue_.erase(iter);
     696             :         } else {
     697           0 :           ++iter;
     698             :         }
     699             :       }
     700             :     }
     701           0 :   }
     702             : 
     703           0 :   if (!found_element) {
     704           0 :     return false;
     705             :   }
     706             : 
     707           0 :   bool transport_shutdown = true;
     708           0 :   TransportImpl_rch transport = transport_.lock();
     709           0 :   if (transport) {
     710           0 :     transport_shutdown = transport->is_shut_down();
     711             :   }
     712             : 
     713           0 :   if (num_delayed_notifications == 1) {
     714             :     // optimization for the common case
     715           0 :     if (mode == MODE_TERMINATED) {
     716           0 :       if (!transport_shutdown || sample->owned_by_transport()) {
     717           0 :         sample->data_dropped(true);
     718             :       }
     719             :     } else {
     720           0 :       if (!transport_shutdown || sample->owned_by_transport()) {
     721           0 :         sample->data_delivered();
     722             :       }
     723             :     }
     724             : 
     725             :   } else {
     726           0 :     for (size_t i = 0; i < samples.size(); ++i) {
     727           0 :       if (samples[i].second == MODE_TERMINATED) {
     728           0 :         if (!transport_shutdown || samples[i].first->owned_by_transport()) {
     729           0 :           samples[i].first->data_dropped(true);
     730             :         }
     731             :       } else {
     732           0 :         if (!transport_shutdown || samples[i].first->owned_by_transport()) {
     733           0 :           samples[i].first->data_delivered();
     734             :         }
     735             :       }
     736             :     }
     737             :   }
     738           0 :   return true;
     739           0 : }
     740             : 
     741             : /// Remove all samples in the backpressure queue and packet queue.
     742             : void
     743           0 : TransportSendStrategy::terminate_send(bool graceful_disconnecting)
     744             : {
     745             :   DBG_ENTRY_LVL("TransportSendStrategy","terminate_send",6);
     746             : 
     747           0 :   bool reset_flag = true;
     748             : 
     749             :   {
     750           0 :     GuardType guard(lock_);
     751             : 
     752             :     // If the terminate_send call due to a non-graceful disconnection before
     753             :     // a datalink shutdown then we will not try to send the graceful disconnect
     754             :     // message.
     755           0 :     if ((mode_ == MODE_TERMINATED || mode_ == MODE_SUSPEND)
     756           0 :         && !graceful_disconnecting_) {
     757           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     758             :             "It was already terminated non gracefully, will not set to graceful disconnecting\n"));
     759           0 :       reset_flag = false;
     760             :     }
     761           0 :   }
     762             : 
     763           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:  Now flip to MODE_TERMINATED\n"));
     764             : 
     765           0 :   clear(MODE_TERMINATED);
     766             : 
     767           0 :   if (reset_flag) {
     768           0 :     GuardType guard(lock_);
     769           0 :     graceful_disconnecting_ = graceful_disconnecting;
     770           0 :   }
     771           0 : }
     772             : 
     773             : void
     774           0 : TransportSendStrategy::terminate_send_if_suspended()
     775             : {
                         // Empty body: this implementation performs no action.
                         // NOTE(review): presumably kept as a hook or legacy entry
                         // point -- confirm against the class declaration and callers.
     776           0 : }
     777             : 
     778             : void
     779           0 : TransportSendStrategy::clear(SendMode new_mode, SendMode old_mode)
     780             : {
     781             :   DBG_ENTRY_LVL("TransportSendStrategy","clear",6);
     782             : 
     783           0 :   send_delayed_notifications();
     784           0 :   QueueType elems;
     785           0 :   QueueType queue;
     786             :   {
     787           0 :     GuardType guard(lock_);
     788             : 
     789           0 :     if (old_mode != MODE_NOT_SET && mode_ != old_mode)
     790           0 :       return;
     791             : 
     792           0 :     if (header_.length_ > 0 && pkt_chain_) {
     793             :       // Clear the messages in the pkt_chain_ that is partially sent.
     794             :       // We just reuse these functions for normal partial send except actual sending.
     795           0 :       int num_bytes_left = static_cast<int>(pkt_chain_->total_length());
     796           0 :       int result = adjust_packet_after_send(num_bytes_left);
     797             : 
     798           0 :       if (result == 0) {
     799           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     800             :               "The adjustment logic says that the packet is cleared.\n"));
     801             : 
     802             :       } else {
     803           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     804             :               "The adjustment returned partial sent.\n"));
     805             :       }
     806             :     }
     807             : 
     808           0 :     elems.swap(elems_);
     809           0 :     queue.swap(queue_);
     810             : 
     811           0 :     header_.length_ = 0;
     812           0 :     pkt_chain_ = 0;
     813           0 :     header_complete_ = false;
     814           0 :     start_counter_ = 0;
     815           0 :     mode_ = new_mode;
     816           0 :     mode_before_suspend_ = MODE_NOT_SET;
     817           0 :   }
     818             : 
     819             :   // We need remove the queued elements outside the lock,
     820             :   // otherwise we have a deadlock situation when remove visitor
     821             :   // calls the data_dropped on each dropped elements.
     822             : 
     823             :   // Clear all samples in queue.
     824           0 :   RemoveAllVisitor remove_all_visitor;
     825             : 
     826           0 :   elems.accept_remove_visitor(remove_all_visitor);
     827           0 :   queue.accept_remove_visitor(remove_all_visitor);
     828           0 : }
     829             : 
     830             : int
     831           0 : TransportSendStrategy::start()
     832             : {
     833             :   DBG_ENTRY_LVL("TransportSendStrategy","start",6);
     834             : 
     835             :   {
     836           0 :     GuardType guard(lock_);
     837             : 
     838           0 :     if (!start_i()) {
     839           0 :       return -1;
     840             :     }
     841           0 :   }
     842             : 
     843           0 :   size_t header_chunks(1);
     844             : 
     845             :   // If a secondary send buffer is bound, sent headers should
     846             :   // be cached to properly maintain the buffer:
     847           0 :   if (send_buffer_ != 0) {
     848           0 :     header_chunks += send_buffer_->capacity();
     849             : 
     850             :   } else {
     851           0 :     header_chunks += 1;
     852             :   }
     853             : 
     854           0 :   header_db_allocator_.reset( new TransportDataBlockAllocator(header_chunks));
     855           0 :   header_mb_allocator_.reset( new TransportMessageBlockAllocator(header_chunks));
     856           0 :   header_db_lock_pool_.reset(new DataBlockLockPool(static_cast<unsigned long>(TheServiceParticipant->n_chunks())));
     857           0 :   header_data_allocator_.reset(new DataAllocator(TheServiceParticipant->association_chunk_multiplier(), max_header_size_));
     858             : 
     859             :   // Since we (the TransportSendStrategy object) are a reference-counted
     860             :   // object, but the synch_ object doesn't necessarily know this, we need
     861             :   // to give a "copy" of a reference to ourselves to the synch_ object here.
     862             :   // We will do the reverse when we unregister ourselves (as a worker) from
     863             :   // the synch_ object.
     864             : 
     865           0 :   if (synch_->register_worker(*this) == -1) {
     866             : 
     867           0 :     ACE_ERROR_RETURN((LM_ERROR,
     868             :                       "(%P|%t) ERROR: TransportSendStrategy failed to register "
     869             :                       "as a worker with the ThreadSynch object.\n"),
     870             :                      -1);
     871             :   }
     872             : 
     873           0 :   return 0;
     874             : }
     875             : 
     876             : void
     877           0 : TransportSendStrategy::stop()
     878             : {
     879             :   DBG_ENTRY_LVL("TransportSendStrategy","stop",6);
     880             : 
     881           0 :   if (header_block_ != 0) {
     882           0 :     header_block_->release ();
     883           0 :     header_block_ = 0;
     884             :   }
     885             : 
     886           0 :   synch_->unregister_worker();
     887             : 
     888           0 :   QueueType elems;
     889           0 :   QueueType queue;
     890             :   {
     891           0 :     GuardType guard(lock_);
     892             : 
     893           0 :     if (pkt_chain_ != 0) {
     894           0 :       size_t size = pkt_chain_->total_length();
     895           0 :       if (size > 0) {
     896           0 :         pkt_chain_->release();
     897           0 :         ACE_DEBUG((LM_WARNING,
     898             :                    ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
     899             :                    ACE_TEXT("terminating with %d unsent bytes.\n"),
     900             :                    size));
     901             :       }
     902           0 :       pkt_chain_ = 0;
     903             :     }
     904             : 
     905           0 :     if (elems_.size()) {
     906           0 :       elems_.swap(elems);
     907           0 :       ACE_DEBUG((LM_WARNING,
     908             :                  ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
     909             :                  ACE_TEXT("terminating with %d unsent elements.\n"),
     910             :                  elems_.size()));
     911             :     }
     912             : 
     913           0 :     if (queue_.size()) {
     914           0 :       queue_.swap(queue);
     915           0 :       ACE_DEBUG((LM_WARNING,
     916             :                  ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
     917             :                  ACE_TEXT("terminating with %d queued elements.\n"),
     918             :                  queue_.size()));
     919             :     }
     920           0 :   }
     921             : 
     922           0 :   RemoveAllVisitor remove_all_visitor;
     923             : 
     924           0 :   elems.accept_remove_visitor(remove_all_visitor);
     925           0 :   queue.accept_remove_visitor(remove_all_visitor);
     926             : 
     927             :   {
     928           0 :     GuardType guard(lock_);
     929             : 
     930           0 :     stop_i();
     931           0 :   }
     932           0 : }
     933             : 
     934             : void
     935           0 : TransportSendStrategy::send(TransportQueueElement* element, bool relink)
     936             : {
     937           0 :   if (Transport_debug_level > 9) {
     938           0 :     ACE_DEBUG((LM_DEBUG,
     939             :                ACE_TEXT("(%P|%t) TransportSendStrategy::send() [%d] - ")
     940             :                ACE_TEXT("sending data at 0x%x.\n"),
     941             :                id(), element));
     942             :   }
     943             : 
     944             :   DBG_ENTRY_LVL("TransportSendStrategy", "send", 6);
     945             : 
     946             :   {
     947           0 :     GuardType guard(lock_);
     948             : 
     949           0 :     if (link_released_) {
     950           0 :       add_delayed_notification(element);
     951             : 
     952             :     } else {
     953           0 :       if (mode_ == MODE_TERMINATED && !graceful_disconnecting_) {
     954           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     955             :               "TransportSendStrategy::send: mode is MODE_TERMINATED and not in "
     956             :               "graceful disconnecting, so discard message.\n"));
     957           0 :         guard.release();
     958           0 :         element->data_dropped(true);
     959           0 :         return;
     960             :       }
     961             : 
     962           0 :       size_t element_length = element->msg()->total_length();
     963             : 
     964           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     965             :             "Send element msg() has total_length() == [%d].\n",
     966             :             element_length));
     967             : 
     968           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     969             :             "max_header_size_ == [%d].\n",
     970             :             max_header_size_));
     971             : 
     972           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
     973             :             "max_size_ == [%d].\n",
     974             :             max_size_));
     975             : 
     976           0 :       const size_t max_msg_size = max_message_size();
     977             : 
     978             :       // Really an assert.  We can't accept any element that wouldn't fit into
     979             :       // a transport packet by itself (ie, it would be the only element in the
     980             :       // packet).  This max_size_ is the user-configurable maximum, not based
     981             :       // on the transport's inherent maximum message size.  If max_msg_size
     982             :       // is non-zero, we will fragment so max_size_ doesn't apply per-element.
     983           0 :       if (max_msg_size == 0 &&
     984           0 :           max_header_size_ + element_length > max_size_) {
     985           0 :         ACE_ERROR((LM_ERROR,
     986             :                    "(%P|%t) ERROR: Element too large (%Q) "
     987             :                    "- won't fit into packet.\n", ACE_UINT64(element_length)));
     988           0 :         return;
     989             :       }
     990             : 
     991             :       // Check the mode_ to see if we simply put the element on the queue.
     992           0 :       if (mode_ == MODE_QUEUE || mode_ == MODE_SUSPEND) {
     993           0 :         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
     994             :                   "mode_ == %C, so queue elem and leave.\n",
     995             :                   mode_as_str(mode_)), 5);
     996             : 
     997           0 :         queue_.put(element);
     998             : 
     999           0 :         if (mode_ != MODE_SUSPEND) {
    1000           0 :           synch_->work_available();
    1001             :         }
    1002             : 
    1003           0 :         return;
    1004             :       }
    1005             : 
    1006           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1007             :             "mode_ == MODE_DIRECT.\n"));
    1008             : 
    1009             :       // We are in the MODE_DIRECT send mode.  When in this mode, the send()
    1010             :       // calls will "build up" the transport packet to be sent directly when it
    1011             :       // reaches the optimal size, contains the maximum number of samples, etc.
    1012             : 
    1013             :       // We need to check if the current element (the arg passed-in to this
    1014             :       // send() method) should be appended to the transport packet, or if the
    1015             :       // transport packet should be sent (directly) first, dealing with the
    1016             :       // current element afterwards.
    1017             : 
    1018             :       // We will decide to send the packet as it is now, under two circumstances:
    1019             :       //
    1020             :       //    Either:
    1021             :       //
    1022             :       //    (1) The current element won't fit into the current packet since it
    1023             :       //        would violate the max_packet_size_.
    1024             :       //
    1025             :       //    -OR-
    1026             :       //
    1027             :       //    (2) There is at least one element already in the current packet,
    1028             :       //        and the current element says that it must be sent in an
    1029             :       //        exclusive packet (ie, in a packet all by itself).
    1030             :       //
    1031           0 :       const bool exclusive = element->requires_exclusive_packet();
    1032             : 
    1033           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1034             :             "The element %C require an exclusive packet.\n",
    1035             :             (exclusive ? "DOES" : "does NOT")
    1036             :           ));
    1037             : 
    1038             :       const size_t space_needed =
    1039             :         (max_msg_size > 0)
    1040           0 :         ? /* fragmenting */ DataSampleHeader::get_max_serialized_size() + MIN_FRAG
    1041           0 :         : /* not fragmenting */ element_length;
    1042             : 
    1043           0 :       if ((exclusive && (elems_.size() != 0))
    1044           0 :           || (current_space_available() < space_needed)) {
    1045             : 
    1046           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1047             :               "Element won't fit in current packet or requires exclusive"
    1048             :               " - send current packet (directly) now.\n"));
    1049             : 
    1050           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1051             :               "max_header_size_: %d, header_.length_: %d, element_length: %d\n"
    1052             :               , max_header_size_, header_.length_, element_length));
    1053             : 
    1054           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1055             :               "Tot possible length: %d, max_len: %d\n"
    1056             :               , max_header_size_ + header_.length_ + element_length
    1057             :               , max_size_));
    1058           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1059             :               "current elem size: %d\n"
    1060             :               , elems_.size()));
    1061             : 
    1062             :         // Send the current packet, and deal with the current element
    1063             :         // afterwards.
    1064             :         // The invocation's relink status should dictate the direct_send's
    1065             :         // do_relink. We don't want a (relink == false) invocation to end up
    1066             :         // doing a relink. Think of (relink == false) as a non-blocking call.
    1067           0 :         direct_send(relink);
    1068             : 
    1069             :         // Now check to see if we flipped into MODE_QUEUE, which would mean
    1070             :         // that the direct_send() experienced backpressure, and the
    1071             :         // packet was only partially sent.  If this has happened, we deal with
    1072             :         // the current element by placing it on the queue (and then we are done).
    1073             :         //
    1074             :         // Otherwise, if the mode_ is still MODE_DIRECT, we can just
    1075             :         // "drop" through to the next step in the logic where we append the
    1076             :         // current element to the current packet.
    1077           0 :         if (mode_ == MODE_QUEUE) {
    1078           0 :           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1079             :                     "We experienced backpressure on that direct send, as "
    1080             :                     "the mode_ is now MODE_QUEUE or MODE_SUSPEND.  "
    1081             :                     "Queue elem and leave.\n"), 5);
    1082           0 :           queue_.put(element);
    1083           0 :           synch_->work_available();
    1084             : 
    1085           0 :           return;
    1086             :         }
    1087             :       }
    1088             : 
    1089             :       // Loop for sending 'element', in fragments if needed
    1090           0 :       bool first_pkt = true; // enter the loop 1st time through unconditionally
    1091           0 :       for (TransportQueueElement* next_fragment = 0;
    1092           0 :            (first_pkt || next_fragment)
    1093           0 :            && (mode_ == MODE_DIRECT || mode_ == MODE_TERMINATED);) {
    1094             :            // We do need to send in MODE_TERMINATED (GRACEFUL_DISCONNECT msg)
    1095             : 
    1096           0 :         if (next_fragment) {
    1097           0 :           element = next_fragment;
    1098           0 :           element_length = next_fragment->msg()->total_length();
    1099           0 :           header_.first_fragment_ = false;
    1100             :         }
    1101             : 
    1102           0 :         header_.last_fragment_ = false;
    1103           0 :         if (max_msg_size) { // fragmentation enabled
    1104           0 :           const size_t avail = current_space_available();
    1105           0 :           if (element_length > avail) {
    1106           0 :             VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Fragmenting %B > %B\n", element_length, avail), 0);
    1107           0 :             const TqePair ep = element->fragment(avail);
    1108           0 :             if (ep == null_tqe_pair) {
    1109           0 :               ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::send: "
    1110             :                 "Element Fragmentation Failed\n"));
    1111           0 :               return;
    1112             :             }
    1113           0 :             element = ep.first;
    1114           0 :             element_length = element->msg()->total_length();
    1115           0 :             next_fragment = ep.second;
    1116           0 :             header_.first_fragment_ = first_pkt;
    1117           0 :           } else if (next_fragment) {
    1118             :             // We are sending the "tail" element of a previous fragment()
    1119             :             // operation, and this element didn't itself require fragmentation
    1120           0 :             header_.last_fragment_ = true;
    1121           0 :             next_fragment = 0;
    1122             :           }
    1123             :         }
    1124           0 :         first_pkt = false;
    1125             : 
    1126           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1127             :               "Start the 'append elem' to current packet logic.\n"));
    1128             : 
    1129           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1130             :               "Put element into current packet elems_.\n"));
    1131             : 
    1132             :         // Now that we know the current element should go into the current
    1133             :         // packet, we can just go ahead and "append" the current element to
    1134             :         // the current packet.
    1135             : 
    1136             :         // Add the current element to the collection of packet elements.
    1137           0 :         elems_.put(element);
    1138             : 
    1139           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1140             :               "Before, the header_.length_ == [%d].\n",
    1141             :               header_.length_));
    1142             : 
    1143             :         // Adjust the header_.length_ to account for the length of the element.
    1144           0 :         header_.length_ += static_cast<ACE_UINT32>(element_length);
    1145           0 :         const size_t message_length = header_.length_;
    1146             : 
    1147           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1148             :               "After adding element's length, the header_.length_ == [%d].\n",
    1149             :               message_length));
    1150             : 
    1151             :         // The current packet now contains the current element.  We need to
    1152             :         // check to see if the conditions are such that we should go ahead and
    1153             :         // attempt to send the packet "directly" now, or if we can just leave
    1154             :         // and send the current packet later (in another send() call or in a
    1155             :         // send_stop() call).
    1156             : 
    1157             :         // There a few conditions that will cause us to attempt to send the
    1158             :         // packet (directly) right now:
    1159             :         // - Fragmentation was needed
    1160             :         // - The current packet has the maximum number of samples per packet.
    1161             :         // - The current packet's total length exceeds the optimum packet size.
    1162             :         // - The current element (currently part of the packet elems_)
    1163             :         //   requires an exclusive packet.
    1164             :         //
    1165           0 :         if (next_fragment || (elems_.size() >= max_samples_)
    1166           0 :             || (max_header_size_ + message_length > optimum_size_)
    1167           0 :             || exclusive) {
    1168           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1169             :                 "Now the current packet looks full - send it (directly).\n"));
    1170             : 
    1171           0 :           direct_send(relink);
    1172             : 
    1173           0 :           if (next_fragment && mode_ != MODE_DIRECT) {
    1174           0 :             if (mode_ == MODE_QUEUE) {
    1175           0 :               queue_.put(next_fragment);
    1176           0 :               synch_->work_available();
    1177             : 
    1178             :             } else {
    1179           0 :               next_fragment->data_dropped(true /* dropped by transport */);
    1180             :             }
    1181           0 :           } else if (mode_ == MODE_QUEUE) {
    1182             :             // Background thread handles packets in progress
    1183           0 :             synch_->work_available();
    1184             :           }
    1185             : 
    1186           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1187             :                 "Back from the direct_send() attempt.\n"));
    1188             : 
    1189           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1190             :                 "And we %C as a result of the direct_send() call.\n",
    1191             :                 ((mode_ == MODE_QUEUE) ? "flipped into MODE_QUEUE"
    1192             :                                              : "stayed in MODE_DIRECT")));
    1193             : 
    1194             :         } else {
    1195           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1196             :                 "Packet not sent. Send conditions weren't satisfied.\n"));
    1197           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1198             :                 "elems_.size(): %d, max_samples_: %d\n",
    1199             :                 int(elems_.size()), int(max_samples_)));
    1200           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1201             :                 "header_size_: %d, optimum_size_: %d\n",
    1202             :                 int(max_header_size_ + message_length),
    1203             :                 int(optimum_size_)));
    1204           0 :           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1205             :                 "element_requires_exclusive_packet: %d\n", int(exclusive)));
    1206             : 
    1207           0 :           if (mode_ == MODE_QUEUE) {
    1208           0 :             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1209             :                   "We flipped into MODE_QUEUE.\n"));
    1210             : 
    1211             :           } else {
    1212           0 :             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1213             :                   "We stayed in MODE_DIRECT.\n"));
    1214             :           }
    1215             :         }
    1216             :       }
    1217             :     }
    1218           0 :   }
    1219             : 
    1220           0 :   send_delayed_notifications();
    1221             : }
    1222             : // send_stop(): closes one send_start()/send_stop() bracket; when start_counter_ drops to zero it tries to direct_send() whatever is pending in elems_.
    1223             : void
    1224           0 : TransportSendStrategy::send_stop(GUID_t /*repoId*/) // the GUID_t argument is unnamed and unused
    1225             : {
    1226             :   DBG_ENTRY_LVL("TransportSendStrategy","send_stop",6);
    1227             :   { // every early return inside this scope leaves before send_delayed_notifications() is reached
    1228           0 :     GuardType guard(lock_);
    1229             :     // Nothing to do once link_released_ is set.
    1230           0 :     if (link_released_)
    1231           0 :       return;
    1232             : 
    1233           0 :     if (start_counter_ == 0) {
    1234             :       // This is an indication of a logic error.  This is more of an assert.
    1235           0 :       VDBG_LVL((LM_ERROR,
    1236             :                 "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5);
    1237           0 :       return;
    1238             :     }
    1239             : 
    1240           0 :     --start_counter_;
    1241             : 
    1242           0 :     if (start_counter_ != 0) {
    1243             :       // This wasn't the last send_stop() that we are expecting.  We only
    1244             :       // really honor the first send_start() and the last send_stop().
    1245             :       // We can return without doing anything else in this case.
    1246           0 :       return;
    1247             :     }
    1248             :     // A terminated strategy only flushes while graceful_disconnecting_ is set.
    1249           0 :     if (mode_ == MODE_TERMINATED && !graceful_disconnecting_) {
    1250           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1251             :             "TransportSendStrategy::send_stop: dont try to send current packet "
    1252             :             "since mode is MODE_TERMINATED and not in graceful disconnecting.\n"));
    1253           0 :       return;
    1254             :     }
    1255             : 
    1256           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1257             :           "This is an 'important' send_stop() event since our "
    1258             :           "start_counter_ is 0.\n"));
    1259             : 
    1260             :     // We just caused the start_counter_ to become zero.  This
    1261             :     // means that we aren't expecting another send() or send_stop() at any
    1262             :     // time in the near future (ie, it isn't imminent).
    1263             : 
    1264             :     // If our mode_ is currently MODE_QUEUE or MODE_SUSPEND, then we don't have
    1265             :     // anything to do here because samples have already been going to the
    1266             :     // queue.
    1267             : 
    1268             :     // Otherwise (MODE_DIRECT, or MODE_TERMINATED while graceful_disconnecting_
    1269             :     // is set) we may have some sample(s) in the current packet that have
    1270             :     // never been sent.  This is our opportunity to send the current packet
    1271             :     // directly if this is the case.
    1272           0 :     if (mode_ == MODE_QUEUE || mode_ == MODE_SUSPEND) {
    1273           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1274             :             "But since we are in %C, we don't have to do "
    1275             :             "anything more in this important send_stop().\n",
    1276             :             mode_as_str(mode_)));
    1277             :       // We don't do anything if we are in MODE_QUEUE or MODE_SUSPEND.  Just leave.
    1278           0 :       return;
    1279             :     }
    1280             :     // NOTE(review): header_length is a size_t but is logged with %d below; this file logs size_t with %B elsewhere - confirm the format.
    1281           0 :     size_t header_length = header_.length_;
    1282           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1283             :           "We are in MODE_DIRECT in an important send_stop() - "
    1284             :           "header_.length_ == [%d].\n", header_length));
    1285             : 
    1286             :     // Only attempt to send the current packet (directly) if the current
    1287             :     // packet actually contains something (it could be empty).
    1288           0 :     if ((header_length > 0) &&
    1289             :         //(elems_.size ()+not_yet_pac_q_->size() > 0))
    1290           0 :         (elems_.size() > 0)) {
    1291           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1292             :             "There is something in the current packet - attempt to send "
    1293             :             "it (directly) now.\n"));
    1294             :       // If a relink needs to be done for this packet to be sent, do it (do_relink == true).
    1295           0 :       direct_send(true);
    1296           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1297             :             "Back from the attempt to send leftover packet directly.\n"));
    1298             : 
    1299           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1300             :             "But we %C as a result.\n",
    1301             :             ((mode_ == MODE_QUEUE)? "flipped into MODE_QUEUE":
    1302             :                                           "stayed in MODE_DIRECT" )));
    1303           0 :       if (mode_ == MODE_QUEUE  && mode_ != MODE_SUSPEND) { // NOTE(review): the second test looks redundant once mode_ == MODE_QUEUE holds
    1304           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1305             :               "Notify Synch thread of work availability\n"));
    1306           0 :         synch_->work_available();
    1307             :       }
    1308             :     }
    1309           0 :   }
    1310             :   // Reached only when no early return fired above; runs after the guarded scope has closed.
    1311           0 :   send_delayed_notifications();
    1312             : }
    1313             : // remove_all_msgs(): calls send_delayed_notifications() with a pub_id matcher, then (under lock_) retains pub_id in send_buffer_ if one is set and runs do_remove_sample() with remove_all == true.
    1314             : void
    1315           0 : TransportSendStrategy::remove_all_msgs(const GUID_t& pub_id)
    1316             : {
    1317             :   DBG_ENTRY_LVL("TransportSendStrategy","remove_all_msgs",6);
    1318             :   // Matcher keyed on pub_id; it is handed to send_delayed_notifications() before lock_ is taken.
    1319           0 :   const TransportQueueElement::MatchOnPubId match(pub_id);
    1320           0 :   send_delayed_notifications(&match);
    1321             :   // Everything below runs with the guard on lock_ in scope (presumably a scoped lock guard - confirm GuardType).
    1322           0 :   GuardType guard(lock_);
    1323             : 
    1324           0 :   if (send_buffer_ != 0) {
    1325             :     // If a secondary send buffer is bound, removed samples must
    1326             :     // be retained in order to properly maintain the buffer:
    1327           0 :     send_buffer_->retain_all(pub_id);
    1328             :   }
    1329             :   // Third argument is remove_all; the returned RemoveResult is discarded.
    1330           0 :   do_remove_sample(pub_id, match, true);
    1331           0 : }
    1332             : // remove_sample(): removes one sample matched by its payload pointer; returns REMOVE_RELEASED when send_delayed_notifications() reports a match, otherwise the result of do_remove_sample().
    1333             : RemoveResult
    1334           0 : TransportSendStrategy::remove_sample(const DataSampleElement* sample)
    1335             : {
    1336             :   DBG_ENTRY_LVL("TransportSendStrategy", "remove_sample", 6);
    1337             : 
    1338           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t)  Removing sample: %@\n", sample->get_sample()), 5);
    1339             : 
    1340             :   // The sample to remove is either in the temporary delayed notification list or in an
    1341             :   // internal list (elems_ or queue_). If the transport thread is about to remove it from the
    1342             :   // delayed notification list, that thread needs the WriterDataContainer lock for the
    1343             :   // data_dropped/data_delivered callback, so it has to wait for this remove_sample() call to
    1344             :   // complete, because this call already holds that lock. It is therefore safe for this call
    1345             :   // to access the sample. If the thread calling remove_sample() removes it instead, it comes
    1346             :   // from either the delayed notification list or an internal list, and the element records
    1347             :   // whether the sample was released, so the datalinkset can stop asking the remaining
    1348             :   // datalinks to remove a sample that has already been released.
    1349             :   // NOTE(review): sample, get_sample() and cont() are dereferenced without null checks - confirm callers guarantee all three.
    1350           0 :   const char* const payload = sample->get_sample()->cont()->rd_ptr();
    1351           0 :   GUID_t pub_id = sample->get_pub_id();
    1352           0 :   const TransportQueueElement::MatchOnDataPayload modp(payload);
    1353           0 :   if (send_delayed_notifications(&modp)) {
    1354           0 :     return REMOVE_RELEASED;
    1355             :   }
    1356             :   // Not handled above: look in the internal lists with lock_ guarded; remove_all is left at its declared default (not visible here).
    1357           0 :   GuardType guard(lock_);
    1358           0 :   return do_remove_sample(pub_id, modp);
    1359           0 : }
    1360             : // do_remove_sample(): removes element(s) matching criteria, visiting elems_ (simple case), then queue_, then elems_ again with a PacketRemoveVisitor; returns the RemoveResult of the last visitor consulted.
    1361             : RemoveResult
    1362           0 : TransportSendStrategy::do_remove_sample(const GUID_t&, // unnamed, unused here
    1363             :   const TransportQueueElement::MatchCriteria& criteria, bool remove_all)
    1364             : {
    1365             :   DBG_ENTRY_LVL("TransportSendStrategy", "do_remove_sample", 6);
    1366             : 
    1367             :   //ciju: Tim had the idea that we could do the following check
    1368             :   // if ((mode_ == MODE_DIRECT) ||
    1369             :   //     ((pkt_chain_ == 0) && (queue_ == empty)))
    1370             :   // then we can assume that the sample can be safely removed (no need for
    1371             :   // replacement) from the elems_ queue.
    1372           0 :   if ((mode_ == MODE_DIRECT)
    1373           0 :       || ((pkt_chain_ == 0) && (queue_.size() == 0))) {
    1374             :     //ciju: I believe this is the only mode where a safe
    1375             :     // assumption can be made that the samples
    1376             :     // in the elems_ queue aren't part of a packet.
    1377           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1378             :           "The mode is MODE_DIRECT, or the queue is empty and no "
    1379             :           "transport packet is in progress.\n"));
    1380             : 
    1381           0 :     QueueRemoveVisitor simple_rem_vis(criteria, remove_all);
    1382           0 :     elems_.accept_remove_visitor(simple_rem_vis);
    1383             : 
    1384           0 :     const RemoveResult status = simple_rem_vis.status();
    1385             :     // When something was removed, subtract the visitor's removed_bytes() from header_.length_.
    1386           0 :     if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
    1387           0 :       header_.length_ -= simple_rem_vis.removed_bytes();
    1388             : 
    1389           0 :     } else if (status == REMOVE_NOT_FOUND) {
    1390           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1391             :             "Failed to find the sample to remove.\n"));
    1392             :     }
    1393             :     // Return now (whatever the status) unless this is a remove_all pass with non-unique criteria.
    1394           0 :     if (criteria.unique() || !remove_all) return status;
    1395           0 :   }
    1396             :   // Reached when the elems_-only shortcut did not apply, or when a non-unique remove_all pass continues.
    1397           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1398             :         "Visit the queue_ with the RemoveElementVisitor.\n"));
    1399             : 
    1400           0 :   QueueRemoveVisitor simple_rem_vis(criteria, remove_all);
    1401           0 :   queue_.accept_remove_visitor(simple_rem_vis);
    1402             : 
    1403           0 :   RemoveResult status = simple_rem_vis.status();
    1404             : 
    1405           0 :   if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
    1406           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1407             :           "The sample was removed from the queue_.\n"));
    1408             :     // This means that the visitor did not encounter any fatal error
    1409             :     // along the way, *AND* the sample was found in the queue_,
    1410             :     // and has now been removed.  We are done unless removing all matches.
    1411           0 :     if (criteria.unique() || !remove_all) return status;
    1412             :   }
    1413             : 
    1414           0 :   if (status == REMOVE_ERROR) {
    1415           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1416             :           "The RemoveElementVisitor encountered a fatal error in queue_.\n"));
    1417             :     // This means that the visitor encountered some fatal error along
    1418             :     // the way (and it already reported something to the log).
    1419             :     // Return our failure code.
    1420           0 :     return status;
    1421             :   }
    1422             :   // NOTE(review): this message is also logged when a non-unique remove_all pass did remove something from queue_.
    1423           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1424             :         "The RemoveElementVisitor did not find the sample in queue_.\n"));
    1425             : 
    1426             :   // We get here if the visitor did not encounter any fatal error and either
    1427             :   // it didn't find the sample in queue_, or remove_all is set with non-unique
    1428             :   // criteria (so removal continues after a successful queue_ pass).
    1429             : 
    1430             :   // Now we need to turn our attention to the current transport packet,
    1431             :   // since the packet is likely in a "partially sent" state, and the
    1432             :   // sample may still be contributing unsent bytes in the pkt_chain_.
    1433             : 
    1434           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1435             :         "Visit our elems_ with the PacketRemoveVisitor.\n"));
    1436             :   // The visitor receives pkt_chain_, header_block_ and both replaced-element allocators, and is applied to elems_ via accept_replace_visitor().
    1437             :   PacketRemoveVisitor pac_rem_vis(criteria,
    1438           0 :                                   pkt_chain_,
    1439             :                                   header_block_,
    1440           0 :                                   replaced_element_mb_allocator_,
    1441           0 :                                   replaced_element_db_allocator_,
    1442           0 :                                   remove_all);
    1443             : 
    1444           0 :   elems_.accept_replace_visitor(pac_rem_vis);
    1445             : 
    1446           0 :   status = pac_rem_vis.status();
    1447             :   // The branches below only log; the visitor's status is returned unchanged.
    1448           0 :   if (status == REMOVE_ERROR) {
    1449           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1450             :           "The PacketRemoveVisitor encountered a fatal error.\n"));
    1451             : 
    1452           0 :   } else if (status == REMOVE_NOT_FOUND) {
    1453           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1454             :           "The PacketRemoveVisitor didn't find the sample.\n"));
    1455             : 
    1456             :   } else {
    1457           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1458             :           "The PacketRemoveVisitor found the sample and removed it.\n"));
    1459             :   }
    1460             : 
    1461           0 :   return status;
    1462           0 : }
    1463             : // direct_send(): prepares the current packet and calls send_packet(); sets mode_ to MODE_QUEUE or MODE_SUSPEND according to the outcome and retries only when do_relink is set and relink() leaves mode_ neither MODE_SUSPEND nor MODE_TERMINATED.
    1464             : void
    1465           0 : TransportSendStrategy::direct_send(bool do_relink)
    1466             : {
    1467             :   DBG_ENTRY_LVL("TransportSendStrategy", "direct_send", 6);
    1468             : 
    1469           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1470             :         "Prepare the current packet for a direct send attempt.\n"));
    1471             : 
    1472             :   // Prepare the packet for sending (done once; a retry below re-sends without calling this again).
    1473           0 :   prepare_packet();
    1474             : 
    1475           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1476             :         "Now attempt to send the packet.\n"));
    1477             : 
    1478             :   // We will try to resend the packet if the send fails and the connection
    1479             :   // is then re-established.  Only loops if the "continue" line is hit.
    1480             :   while (true) {
    1481             :     // Attempt to send the packet
    1482           0 :     const SendPacketOutcome outcome = send_packet();
    1483             : 
    1484           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1485             :           "The outcome of the send_packet() was %d.\n", outcome));
    1486             : 
    1487           0 :     if ((outcome == OUTCOME_BACKPRESSURE) ||
    1488             :         (outcome == OUTCOME_PARTIAL_SEND)) {
    1489           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1490             :                 "The outcome of the send_packet() was either "
    1491             :                 "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5);
    1492             : 
    1493           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1494             :                 "Flip into the MODE_QUEUE mode_.\n"), 5);
    1495             : 
    1496             :       // We encountered backpressure, or only sent part of the packet.
    1497           0 :       mode_ = MODE_QUEUE;
    1498             : 
    1499           0 :     } else if ((outcome == OUTCOME_PEER_LOST) ||
    1500             :                (outcome == OUTCOME_SEND_ERROR)) {
    1501           0 :       if (outcome == OUTCOME_SEND_ERROR) {
    1502           0 :         VDBG_LVL((LM_WARNING,
    1503             :                   "(%P|%t) WARNING: Problem detected in "
    1504             :                   "send buffer management: %p.\n",
    1505             :                   "send_bytes"), 1);
    1506             :         // With transport debugging enabled, dump the transport if it can still be locked from transport_.
    1507           0 :         if (Transport_debug_level > 0) {
    1508           0 :           TransportImpl_rch transport = transport_.lock();
    1509           0 :           if (transport) {
    1510           0 :             transport->dump();
    1511             :           }
    1512           0 :         }
    1513             :       } else {
    1514           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1515             :               "The outcome of the send_packet() was "
    1516             :               "OUTCOME_PEER_LOST.\n"));
    1517             :       }
    1518             : 
    1519           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1520             :                 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
    1521             :       // Record mode_before_suspend_ only when not already in MODE_SUSPEND, so a retry does not overwrite it.
    1522           0 :       if (mode_ != MODE_SUSPEND) {
    1523           0 :         mode_before_suspend_ = mode_;
    1524           0 :         mode_ = MODE_SUSPEND;
    1525             :       }
    1526             :       // Without do_relink we stay in MODE_SUSPEND and fall through to the break below.
    1527           0 :       if (do_relink) {
    1528           0 :         bool do_suspend = false;
    1529           0 :         relink(do_suspend);
    1530             :         // relink() is expected to have updated mode_; the branches below only inspect it.
    1531           0 :         if (mode_ == MODE_SUSPEND) {
    1532           0 :           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1533             :                     "The reconnect has not done yet and we are "
    1534             :                     "still in MODE_SUSPEND.\n"), 5);
    1535             : 
    1536           0 :         } else if (mode_ == MODE_TERMINATED) {
    1537           0 :           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1538             :                     "Reconnect failed, we are in MODE_TERMINATED\n"), 5);
    1539             : 
    1540             :         } else {
    1541           0 :           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1542             :                     "Try send the packet again since the connection "
    1543             :                     "is re-established.\n"), 5);
    1544             : 
    1545             :           // Try to send the packet again now that the connection is re-established.
    1546           0 :           continue;
    1547           0 :         }
    1548             :       }
    1549             : 
    1550           0 :     } else {
    1551           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1552             :             "The outcome of the send_packet() must have been "
    1553             :             "OUTCOME_COMPLETE_SEND.\n"));
    1554           0 :       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1555             :             "So, we will just stay in MODE_DIRECT.\n"));
    1556             :     }
    1557             :     // Every path except the "continue" above ends the loop after one pass.
    1558           0 :     break;
    1559           0 :   }
    1560             : 
    1561             :   // mode_ is left untouched when send_packet() reported none of the outcomes handled above.
    1562           0 : }
    1563             : // get_packet_elems_from_queue(): moves elements from the head of queue_ into elems_ (fragmenting the head when it does not fit and fragmentation is enabled), updating header_.length_, until a stop condition is met or queue_ is empty.
    1564             : void
    1565           0 : TransportSendStrategy::get_packet_elems_from_queue()
    1566             : {
    1567             :   DBG_ENTRY_LVL("TransportSendStrategy", "get_packet_elems_from_queue", 6);
    1568             :   // peek() leaves the element on queue_; it is only taken off by get() or replace_head() below.
    1569           0 :   for (TransportQueueElement* element = queue_.peek(); element != 0;
    1570           0 :        element = queue_.peek()) {
    1571             : 
    1572             :     // Total number of bytes in the current element's message block chain.
    1573           0 :     size_t element_length = element->msg()->total_length();
    1574             : 
    1575             :     // Flag used to determine if the element requires a packet all to itself.
    1576           0 :     const bool exclusive_packet = element->requires_exclusive_packet();
    1577             : 
    1578           0 :     const size_t avail = current_space_available();
    1579             : 
    1580           0 :     bool frag = false;
    1581           0 :     if (element_length > avail) {
    1582             :       // The current element won't fit into the current packet
    1583           0 :       if (max_message_size()) { // fragmentation enabled
    1584           0 :         header_.first_fragment_ = !element->is_fragment();
    1585           0 :         VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Fragmenting from queue\n"), 0);
    1586           0 :         const TqePair ep = element->fragment(avail);
    1587           0 :         if (ep == null_tqe_pair) {
    1588           0 :           ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::get_packet_elems_from_queue: "
    1589             :             "Element Fragmentation Failed\n"));
    1590           0 :           return;
    1591             :         }
    1592           0 :         element = ep.first; // head fragment goes into this packet
    1593           0 :         element_length = element->msg()->total_length();
    1594           0 :         queue_.replace_head(ep.second); // remainder takes the original element's place at the head of queue_
    1595           0 :         frag = true; // queue_ is already taken care of, don't get() later
    1596             :       } else {
    1597           0 :         break; // fragmentation disabled: leave the element on queue_ for a later packet
    1598             :       }
    1599             :     }
    1600             :     // NOTE(review): if frag is true but the element is exclusive and elems_ is non-empty, ep.first is put in neither elems_ nor queue_ - confirm this combination cannot occur.
    1601             :     // If exclusive and the current packet is empty, we won't violate the
    1602             :     // exclusive_packet requirement by put()'ing the element
    1603             :     // into the elems_ collection.
    1604           0 :     if ((exclusive_packet && elems_.size() == 0)
    1605           0 :         || !exclusive_packet) {
    1606             :       // At this point, we have passed all of the pre-conditions and we can
    1607             :       // now extract the current element from the queue_, put it into the
    1608             :       // packet elems_, and adjust the packet header_.length_.
    1609           0 :       elems_.put(frag ? element : queue_.get());
    1610           0 :       if (header_.length_ == 0) { // first element of this packet decides last_fragment_
    1611           0 :         header_.last_fragment_ = !frag && element->is_fragment();
    1612             :       }
    1613           0 :       header_.length_ += static_cast<ACE_UINT32>(element_length);
    1614           0 :       VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Packetizing from queue\n"), 0);
    1615             :     }
    1616             : 
    1617             :     // With exclusive and (elems_.size() != 0), we don't use the current
    1618             :     // element as part of the packet.  We know that there is already
    1619             :     // at least one element in the packet, and the current element
    1620             :     // is going to need its own (exclusive) packet.  We will just
    1621             :     // use the packet elems_ as it is now.  Always break once
    1622             :     // we've encountered and dealt with the exclusive_packet case.
    1623             :     // Also break if fragmentation was required.
    1624           0 :     if (exclusive_packet || frag
    1625             :         // If the current number of packet elems_ has reached the maximum
    1626             :         // number of samples per packet, then we are done.
    1627           0 :         || elems_.size() == max_samples_
    1628             :         // If the current value of the header_.length_ exceeds (or equals)
    1629             :         // the optimum_size_ for a packet, then we are done.
    1630           0 :         || header_.length_ >= optimum_size_) {
    1631           0 :       break;
    1632             :     }
    1633             :   }
    1634             : }
    1635             : 
    1636             : void
    1637           0 : TransportSendStrategy::prepare_header()
    1638             : {
                         // Assigns the next packet sequence number to header_ and then calls
                         // prepare_header_i() so that further header fields can be filled in.
    1639             :   DBG_ENTRY_LVL("TransportSendStrategy", "prepare_header", 6);
    1640             : 
    1641             :   // Pre-increment: header_sequence_ is advanced first and the new value
                         // becomes the sequence number of the packet being prepared.
    1642           0 :   header_.sequence_ = ++header_sequence_;
    1643             : 
    1644             :   // Allow the specific implementation the opportunity to set
    1645             :   // values in the packet header.
    1646           0 :   prepare_header_i();
    1647           0 : }
    1648             : 
    1649             : void
    1650           0 : TransportSendStrategy::prepare_header_i()
    1651             : {
                         // Called by prepare_header() once header_.sequence_ has been assigned.
                         // Presumably overridden by concrete strategies (declaration not visible
                         // here - confirm); this version only emits the trace entry below.
    1652             :   DBG_ENTRY_LVL("TransportSendStrategy","prepare_header_i",6);
    1653             : 
    1654             :   // Default implementation does nothing.
    1655           0 : }
    1656             : 
    1657             : void
    1658           0 : TransportSendStrategy::prepare_packet()
    1659             : {
                         // Builds pkt_chain_ for the elements currently held in elems_:
                         //   1. prepare_header() finalizes header_,
                         //   2. a fresh header_block_ is allocated and header_ is marshaled into it,
                         //   3. pkt_chain_ becomes a duplicate() of header_block_ followed by the
                         //      block chain produced by a BuildChainVisitor over elems_,
                         //   4. prepare_packet_i() is called and header_complete_ is reset.
    1660             :   DBG_ENTRY_LVL("TransportSendStrategy", "prepare_packet", 6);
    1661             : 
    1662             :   // Prepare the header for sending.
    1663           0 :   prepare_header();
    1664             : 
    1665           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1666             :         "Marshal the packet header.\n"));
    1667             : 
                         // Drop this object's reference to the previous header block (if any)
                         // before header_block_ is overwritten by the allocation below.
    1668           0 :   if (header_block_ != 0) {
    1669           0 :     header_block_->release();
    1670             :   }
    1671             : 
                         // The message block object itself comes from header_mb_allocator_; the
                         // remaining allocators and the lock are handed to the ACE_Message_Block
                         // constructor. Allocation-failure behavior is whatever ACE_NEW_MALLOC
                         // defines - NOTE(review): confirm the failure path is acceptable here.
    1672           0 :   ACE_NEW_MALLOC(header_block_,
    1673             :     static_cast<ACE_Message_Block*>(header_mb_allocator_->malloc()),
    1674             :     ACE_Message_Block(max_header_size_,
    1675             :                       ACE_Message_Block::MB_DATA,
    1676             :                       0, // cont
    1677             :                       0, // data
    1678             :                       header_data_allocator_.get(),
    1679             :                       header_db_lock_pool_->get_lock(),
    1680             :                       ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
    1681             :                       ACE_Time_Value::zero,
    1682             :                       ACE_Time_Value::max_time,
    1683             :                       header_db_allocator_.get(),
    1684             :                       header_mb_allocator_.get()));
    1685             : 
                         // NOTE(review): marshal_transport_header() returns bool but the result
                         // is discarded here, so a failed marshal goes unnoticed - confirm.
    1686           0 :   marshal_transport_header(header_block_);
    1687             : 
    1688           0 :   pkt_chain_ = header_block_->duplicate();
    1689             : 
    1690           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1691             :         "Use a BuildChainVisitor to visit the packet elems_.\n"));
    1692             : 
    1693             :   // Build up a chain of blocks by duplicating the message block chain
    1694             :   // held by each element (in elems_), and then chaining the new duplicate
    1695             :   // blocks together to form one long chain.
    1696           0 :   BuildChainVisitor visitor;
    1697           0 :   elems_.accept_visitor(visitor);
    1698             : 
    1699           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1700             :         "Attach the visitor's chain of blocks to the lone (packet "
    1701             :         "header) block currently in the pkt_chain_.\n"));
    1702             : 
    1703             :   // Attach the visitor's chain of blocks to the packet header block.
    1704           0 :   pkt_chain_->cont(visitor.chain());
    1705             : 
                         // NOTE(review): the debug message below looks stale. No sequence number
                         // is incremented at this point; that happens in prepare_header().
    1706           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1707             :         "Increment header sequence for next packet.\n"));
    1708             : 
    1709             :   // Allow the specific implementation the opportunity to process the
    1710             :   // newly prepared packet.
    1711           0 :   prepare_packet_i();
    1712             : 
    1713           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1714             :         "Set the header_complete_ flag to false.\n"));
    1715             : 
    1716             :   // Set the header_complete_ to false to indicate
    1717             :   // that the first block in the pkt_chain_ is the packet header block
    1718             :   // (actually a duplicate() of the packet header_block_).
    1719           0 :   header_complete_ = false;
    1720           0 : }
    1721             : 
    1722             : bool
    1723           0 : TransportSendStrategy::marshal_transport_header(ACE_Message_Block* mb)
    1724             : {
                         // Writes header_ into *mb using the stream-insertion operator and
                         // returns the result of that operation converted to bool.
                         // mb is dereferenced unconditionally, so it must not be null.
    1725           0 :   return *mb << header_;
    1726             : }
    1727             : 
    1728             : void
    1729           0 : TransportSendStrategy::prepare_packet_i()
    1730             : {
                         // Called by prepare_packet() after pkt_chain_ has been assembled.
                         // Presumably overridden by concrete strategies (declaration not visible
                         // here - confirm); this version only emits the trace entry below.
    1731             :   DBG_ENTRY_LVL("TransportSendStrategy","prepare_packet_i",6);
    1732             : 
    1733             :   // Default implementation does nothing.
    1734           0 : }
    1735             : 
    1736             : void
    1737           0 : TransportSendStrategy::set_graceful_disconnecting(bool flag)
    1738             : {
                         // Plain setter: records the requested value in graceful_disconnecting_.
    1739           0 :   graceful_disconnecting_ = flag;
    1740           0 : }
    1741             : 
    1742             : ssize_t
    1743           0 : TransportSendStrategy::do_send_packet(const ACE_Message_Block* packet, int& bp)
    1744             : {
                         // Converts the packet chain into an iovec array and passes it to
                         // send_bytes().
                         //
                         // packet: chain to send; dereferenced unconditionally, must not be null.
                         // bp:     forwarded to send_bytes(), which reports backpressure through it.
                         // Returns the value from send_bytes(), except in security builds where a
                         // substitute packet was sent (or the packet was dropped); in those cases
                         // the total length of the original packet is returned instead.
                         //
                         // NOTE(review): the trace below formats the pointer 'packet' with 0x%x;
                         // ACE documents %@ for pointers - confirm on 64-bit builds.
    1745           0 :   if (Transport_debug_level > 9) {
    1746           0 :     ACE_DEBUG((LM_DEBUG,
    1747             :                ACE_TEXT("(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ")
    1748             :                ACE_TEXT("sending data at 0x%x.\n"),
    1749             :                id(), packet));
    1750             :   }
    1751             :   DBG_ENTRY_LVL("TransportSendStrategy", "do_send_packet", 6);
    1752             : 
    1753             : #ifdef OPENDDS_SECURITY
    1754           0 :   Message_Block_Ptr substitute;
    1755           0 :   if (security_config()) {
    1756           0 :     const DDS::Security::CryptoTransform_var crypto = security_config()->get_crypto_transform();
    1757             :     // pre_send_packet may provide different data that takes the place of the
    1758             :     // original "packet" (used for security encryption/authentication)
    1759           0 :     if (crypto) {
    1760           0 :       substitute.reset(pre_send_packet(packet));
    1761           0 :       if (!substitute) {
    1762           0 :         VDBG((LM_DEBUG, "(%P|%t) DBG:   pre_send_packet returned NULL, dropping.\n"));
                               // Nothing is sent, but the full length is reported so the caller
                               // accounts for the packet as if it had been sent completely.
    1763           0 :         return packet->total_length();
    1764             :       }
    1765             :     }
    1766           0 :   }
    1767             : #endif
    1768             : 
    1769           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1770             :             "Populate the iovec array using the packet.\n"), 5);
    1771             : 
    1772             :   iovec iov[MAX_SEND_BLOCKS];
    1773             : 
                         // mb_to_iov() fills at most MAX_SEND_BLOCKS entries; any blocks further
                         // along the chain are not part of this send.
    1774             : #ifdef OPENDDS_SECURITY
    1775           0 :   const int num_blocks = mb_to_iov(substitute ? *substitute : *packet, iov);
    1776             : #else
    1777             :   const int num_blocks = mb_to_iov(*packet, iov);
    1778             : #endif
    1779             : 
    1780           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1781             :             "There are [%d] number of entries in the iovec array.\n",
    1782             :             num_blocks), 5);
    1783             : 
    1784           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1785             :             "Attempt to send_bytes() now.\n"), 5);
    1786             : 
    1787           0 :   const ssize_t num_bytes_sent = send_bytes(iov, num_blocks, bp);
    1788             : 
    1789           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1790             :             "The send_bytes() said that num_bytes_sent == [%d].\n",
    1791             :             num_bytes_sent), 5);
    1792             : 
    1793             : #ifdef OPENDDS_SECURITY
                         // Only applies when the substitute really is different data (its data
                         // block differs from the one of the original packet).
    1794           0 :   if (num_bytes_sent > 0 && substitute && packet->data_block() != substitute->data_block()) {
    1795             :     // Although the "substitute" data took the place of "packet", the rest
    1796             :     // of the framework needs to account for the bytes in "packet" being taken
    1797             :     // care of, as if they were actually sent.
    1798             :     // Since this is done with datagram sockets, partial sends aren't possible.
    1799           0 :     return packet->total_length();
    1800             :   }
    1801             : #endif
    1802             : 
    1803           0 :   return num_bytes_sent;
    1804           0 : }
    1805             : 
    1806             : TransportSendStrategy::SendPacketOutcome
    1807           0 : TransportSendStrategy::send_packet()
    1808             : {
                         // Attempts to send pkt_chain_ once and classifies the result:
                         //   0 bytes sent                 -> OUTCOME_PEER_LOST
                         //   negative, backpressure flag  -> OUTCOME_BACKPRESSURE
                         //   negative, no backpressure    -> OUTCOME_SEND_ERROR
                         //   positive, nothing left over  -> OUTCOME_COMPLETE_SEND
                         //   positive, data left over     -> OUTCOME_PARTIAL_SEND
    1809             :   DBG_ENTRY_LVL("TransportSendStrategy", "send_packet", 6);
    1810             : 
                         // Initialized to 0 so the flag has a defined value even if the send
                         // path returns without writing to it.
    1811           0 :   int bp_flag = 0;
    1812             :   const ssize_t num_bytes_sent =
    1813           0 :     do_send_packet(pkt_chain_, bp_flag);
    1814             : 
    1815           0 :   if (num_bytes_sent == 0) {
    1816           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1817             :               "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5);
    1818             :     // This means that the peer has disconnected.
    1819           0 :     return OUTCOME_PEER_LOST;
    1820             :   }
    1821             : 
    1822           0 :   if (num_bytes_sent < 0) {
    1823           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1824             :               "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5);
    1825             : 
    1826             :     // Check for backpressure...
    1827           0 :     if (bp_flag == 1) {
    1828           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1829             :                 "Since backpressure flag is true, return "
    1830             :                 "OUTCOME_BACKPRESSURE.\n"), 5);
    1831             :       // Ok.  Not really an error - just backpressure.
    1832           0 :       return OUTCOME_BACKPRESSURE;
    1833             :     }
    1834             : 
    1835           0 :     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1836             :               "Since backpressure flag is false, return "
    1837             :               "OUTCOME_SEND_ERROR.\n"), 5);
    1838             : 
    1839             :     // Not backpressure - it's a real error.
    1840             :     // Note: moved this to send_bytes so the errno msg could be written.
    1841             :     //ACE_ERROR((LM_ERROR,
    1842             :     //           "(%P|%t) ERROR: Call to peer().send() failed with negative "
    1843             :     //           "return code.\n"));
    1844             : 
    1845           0 :     return OUTCOME_SEND_ERROR;
    1846             :   }
    1847             : 
                         // From here on num_bytes_sent is strictly positive.
    1848           0 :   if (send_buffer_ != 0) {
    1849             :     // If a secondary send buffer is bound, sent samples must
    1850             :     // be inserted in order to properly maintain the buffer:
    1851           0 :     send_buffer_->insert(header_.sequence_,
    1852             :       &elems_, pkt_chain_);
    1853             :   }
    1854             : 
    1855           0 :   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
    1856             :             "Since num_bytes_sent > 0, adjust the packet to account for "
    1857             :             "the bytes that did get sent.\n"),5);
    1858             : 
    1859             :   // We sent some bytes - adjust the current packet (elems_ and pkt_chain_)
    1860             :   // to account for the bytes that have been sent.
    1861             :   const int result =
    1862           0 :     adjust_packet_after_send(num_bytes_sent);
    1863             : 
                         // A zero result is treated as the whole packet having been sent; any
                         // other value is treated as a partial send.
    1864           0 :   if (result == 0) {
    1865           0 :     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1866             :           "The adjustment logic says that the complete packet was "
    1867             :           "sent.  Return OUTCOME_COMPLETE_SEND.\n"));
    1868           0 :     return OUTCOME_COMPLETE_SEND;
    1869             :   }
    1870             : 
    1871           0 :   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
    1872             :         "The adjustment logic says that only a part of the packet was "
    1873             :         "sent. Return OUTCOME_PARTIAL_SEND.\n"));
    1874             : 
    1875           0 :   return OUTCOME_PARTIAL_SEND;
    1876             : }
    1877             : 
    1878             : ssize_t
    1879           0 : TransportSendStrategy::non_blocking_send(const iovec iov[], int n, int& bp)
    1880             : {
                         // Sends the iovec array through send_bytes_i() with the handle switched
                         // to non-blocking mode for the duration of the call.
                         //
                         // iov, n: buffers to send and their count.
                         // bp:     set to 1 when the send failed with EWOULDBLOCK or ENOBUFS,
                         //         otherwise set to 0.
                         // Returns the result of send_bytes_i(), or -1 if there is no valid handle.
    1881           0 :   int val = 0;
    1882           0 :   ACE_HANDLE handle = get_handle();
    1883             : 
                         // NOTE(review): bp is not written on this early return, so the caller
                         // must have initialized it - confirm for every caller.
    1884           0 :   if (handle == ACE_INVALID_HANDLE)
    1885           0 :     return -1;
    1886             : 
                         // val receives the previous mode so it can be restored before returning.
    1887           0 :   ACE::record_and_set_non_blocking_mode(handle, val);
    1888             : 
    1889             :   // Set the back-pressure flag to false.
    1890           0 :   bp = 0;
    1891             : 
    1892             :   // Clear errno
    1893           0 :   errno = 0;
    1894             : 
    1895           0 :   ssize_t result = send_bytes_i(iov, n);
    1896             : 
    1897           0 :   if (result == -1) {
    1898           0 :     if ((errno == EWOULDBLOCK) || (errno == ENOBUFS)) {
    1899           0 :       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
    1900             :             "Backpressure encountered.\n"));
    1901             :       // Set the back-pressure flag to true
    1902           0 :       bp = 1;
    1903             : 
    1904           0 :     } else {
    1905           0 :       VDBG_LVL((LM_ERROR, "(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n",
    1906             :                 ACE_TEXT("sendv"), n),1);
    1907             : 
    1908             :       // try to get the application to core when "Bad Address" is returned
    1909             :       // by looking at the iovec
                             // NOTE(review): iov_len and iov_base are logged with %d and %X; these
                             // may not match the actual field types on 64-bit targets - confirm.
    1910           0 :       for (int ii = 0; ii < n; ii++) {
    1911           0 :         ACE_DEBUG((LM_DEBUG, "(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n",
    1912             :                    ii, iov[ii].iov_len, iov[ii].iov_base));
    1913             :       }
    1914             :     }
    1915             :   }
    1916             : 
    1917           0 :   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
    1918             :             "The sendv() returned [%d].\n", result), 5);
    1919             : 
    1920           0 :   ACE::restore_non_blocking_mode(handle, val);
    1921             : 
    1922           0 :   return result;
    1923             : }
    1924             : 
    1925             : void
    1926           0 : TransportSendStrategy::add_delayed_notification(TransportQueueElement* element)
    1927             : {
                         // Appends (element, current value of mode_) to
                         // delayed_delivered_notification_queue_. When transport debugging is on,
                         // a message is logged each time the queue size before the append is a
                         // non-zero multiple of max_samples_.
                         // NOTE(review): size % max_samples_ is undefined if max_samples_ is 0,
                         // and size (a size_t) is logged with %d - confirm both.
    1928           0 :   if (Transport_debug_level) {
    1929           0 :     size_t size = delayed_delivered_notification_queue_.size();
    1930           0 :     if ((size > 0) && (size % max_samples_ == 0)) {
    1931           0 :       ACE_DEBUG((LM_DEBUG,
    1932             :                  "(%P|%t) Transport send strategy notification queue threshold, size=%d\n",
    1933             :                  size));
    1934             :     }
    1935             :   }
    1936             : 
    1937           0 :   delayed_delivered_notification_queue_.push_back(std::make_pair(element, mode_.load()));
    1938           0 : }
    1939             : 
    1940           0 : void TransportSendStrategy::deliver_ack_request(TransportQueueElement* element)
    1941             : {
                         // Removes the given element via do_remove_sample() and then reports it
                         // as delivered. element is dereferenced, so it must not be null.
    1942           0 :   const TransportQueueElement::MatchOnElement moe(element);
    1943             :   {
                           // lock_ is held only for the removal; the inner scope ends before
                           // data_delivered() is called below.
    1944           0 :     GuardType guard(lock_);
    1945           0 :     do_remove_sample(GUID_UNKNOWN, moe);
    1946           0 :   }
    1947             : 
    1948           0 :   element->data_delivered();
    1949           0 : }
    1950             : 
    1951           0 : size_t TransportSendStrategy::space_available(size_t already_used) const
    1952             : {
                         // Returns how many more bytes fit into a packet once max_header_size_ and
                         // already_used are accounted for. The limit is max_size_, further capped
                         // by max_message_size() when that value is non-zero.
                         // NOTE(review): the subtractions are unsigned; if used exceeds the limit
                         // the result wraps to a very large value instead of 0 - confirm callers
                         // never reach that state.
    1953           0 :   const size_t used = max_header_size_ + already_used;
    1954           0 :   const size_t max_msg = max_message_size();
                         // A max_message_size() of 0 is treated as no additional limit.
    1955           0 :   if (max_msg) {
    1956           0 :     return std::min(static_cast<size_t>(max_size_), max_msg) - used;
    1957             :   }
    1958           0 :   return max_size_ - used;
    1959             : }
    1960             : 
    1961           0 : size_t TransportSendStrategy::current_space_available() const
    1962             : {
                         // Space left in the packet being assembled: space_available() evaluated
                         // with the payload length recorded so far in header_.length_.
    1963           0 :   return space_available(header_.length_);
    1964             : }
    1965             : 
    1966             : int
    1967           0 : TransportSendStrategy::mb_to_iov(const ACE_Message_Block& msg, iovec* iov)
    1968             : {
                         // Fills iov with one entry (rd_ptr, length) per block of the chain that
                         // starts at msg and is linked through cont().
                         //
                         // iov: output array; up to MAX_SEND_BLOCKS entries are written, so it
                         //      must have room for that many.
                         // Returns the number of entries written. Blocks beyond MAX_SEND_BLOCKS
                         // are silently left out; the return value alone does not reveal this.
    1969           0 :   int num_blocks = 0;
    1970             : #ifdef _MSC_VER
    1971             : #pragma warning(push)
    1972             : // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
    1973             : // since on other platforms iov_len is 64-bit
    1974             : #pragma warning(disable : 4267)
    1975             : #endif
    1976           0 :   for (const ACE_Message_Block* block = &msg;
    1977           0 :        block && num_blocks < MAX_SEND_BLOCKS;
    1978           0 :        block = block->cont()) {
    1979           0 :     iov[num_blocks].iov_len = block->length();
                           // Post-increment: the same entry gets its base pointer, then the
                           // count advances to the next slot.
    1980           0 :     iov[num_blocks++].iov_base = block->rd_ptr();
    1981             :   }
    1982             : #ifdef _MSC_VER
    1983             : #pragma warning(pop)
    1984             : #endif
    1985           0 :   return num_blocks;
    1986             : }
    1987             : 
    1988           0 : bool TransportSendStrategy::fragmentation_helper(
    1989             :   TransportQueueElement* original_element, TqeVector& elements_to_send)
    1990             : {
                         // Splits original_element into pieces that each fit into space_available()
                         // and appends them, in order, to elements_to_send. An element that already
                         // fits is appended unchanged.
                         //
                         // Returns true on success, false if fragment() fails. On failure the
                         // pieces appended so far remain in elements_to_send.
                         // NOTE(review): the loan taken below is not undone on the failure path
                         // in this function - confirm the caller handles that.
    1991           0 :   original_element->increment_loan();
                         // Computed once; every fragment is sized against the same limit.
    1992           0 :   const size_t space = space_available();
                         // e is the remainder still to be split; the loop ends when e becomes null.
    1993           0 :   for (TransportQueueElement* e = original_element; e;) {
    1994           0 :     const size_t esize = e->msg()->total_length();
    1995           0 :     if (esize > space) {
    1996           0 :       VDBG_LVL((LM_DEBUG, "(%P|%t) TransportSendStrategy::fragmentation_helper: "
    1997             :           "message size %B > space %B: Fragmenting\n", esize, space), 0);
    1998           0 :       const TqePair pair = e->fragment(space);
    1999           0 :       if (pair == null_tqe_pair) {
    2000           0 :         ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::fragmentation_helper: "
    2001             :           "Element Fragmentation Failed\n"));
    2002           0 :         return false;
    2003             :       }
                             // pair.first is queued for sending; pair.second is what remains and is
                             // examined again on the next iteration.
    2004           0 :       elements_to_send.push_back(pair.first);
    2005           0 :       e = pair.second;
    2006             :     } else {
    2007           0 :       elements_to_send.push_back(e);
    2008           0 :       e = 0;
    2009             :     }
    2010             :   }
    2011           0 :   return true;
    2012             : }
    2013             : 
    2014             : } // namespace DCPS
    2015             : } // namespace OpenDDS
    2016             : 
    2017             : OPENDDS_END_VERSIONED_NAMESPACE_DECL

Generated by: LCOV version 1.16