TransportSendStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "DCPS/DdsDcps_pch.h" //Only the _pch include should start with DCPS/
00009 #include "TransportSendStrategy.h"
00010 #include "RemoveAllVisitor.h"
00011 #include "TransportInst.h"
00012 #include "ThreadSynchStrategy.h"
00013 #include "ThreadSynchResource.h"
00014 #include "TransportQueueElement.h"
00015 #include "TransportSendElement.h"
00016 #include "TransportSendBuffer.h"
00017 #include "BuildChainVisitor.h"
00018 #include "QueueRemoveVisitor.h"
00019 #include "PacketRemoveVisitor.h"
00020 #include "TransportDefs.h"
00021 #include "DirectPriorityMapper.h"
00022 #include "dds/DCPS/DataSampleHeader.h"
00023 #include "dds/DCPS/DataSampleElement.h"
00024 #include "dds/DCPS/Service_Participant.h"
00025 #include "EntryExit.h"
00026 
00027 #include "ace/Reverse_Lock_T.h"
00028 
00029 #if !defined (__ACE_INLINE__)
00030 #include "TransportSendStrategy.inl"
00031 #endif /* __ACE_INLINE__ */
00032 
00033 namespace OpenDDS {
00034 namespace DCPS {
00035 
00036 //TBD: The number of chunks of the replace element allocator
00037 //     is hard coded for now. This will be configurable when
00038 //     we implement the dds configurations. This value should
00039 //     be the number of marshalled DataSampleHeader that a
00040 //     packet could contain.
00041 #define NUM_REPLACED_ELEMENT_CHUNKS 20
00042 
namespace {
  /// Lower bound on the amount of payload data carried by a single
  /// fragment.  The value is an arbitrary small constant.  Here "payload
  /// data" also covers the content-filtering GUID sequence, which is why
  /// the constant has the form 4 + (16 * N); with N == 4 that is 68.
  const size_t MIN_FRAG = 4 + (16 * 4);
}
00050 
00051 // I think 2 chunks for the header message block is enough
00052 // - one for the original copy and one for duplicate which
00053 // occurs every packet and is released after packet is sent.
00054 // The data block only needs 1 chunk since the duplicate()
00055 // just increases the ref count.
00056 TransportSendStrategy::TransportSendStrategy(
00057   std::size_t id,
00058   const TransportInst_rch& transport_inst,
00059   ThreadSynchResource* synch_resource,
00060   Priority priority,
00061   const ThreadSynchStrategy_rch& thread_sync_strategy)
00062   : ThreadSynchWorker(id),
00063     max_samples_(transport_inst->max_samples_per_packet_),
00064     optimum_size_(transport_inst->optimum_packet_size_),
00065     max_size_(transport_inst->max_packet_size_),
00066     queue_(new QueueType(transport_inst->queue_messages_per_pool_,
00067                          transport_inst->queue_initial_pools_)),
00068     max_header_size_(0),
00069     header_block_(0),
00070     elems_(new QueueType(1, transport_inst->max_samples_per_packet_)),
00071     pkt_chain_(0),
00072     header_complete_(false),
00073     start_counter_(0),
00074     mode_(MODE_DIRECT),
00075     mode_before_suspend_(MODE_NOT_SET),
00076     header_mb_allocator_(0),
00077     header_db_allocator_(0),
00078     synch_(0),
00079     lock_(),
00080     replaced_element_allocator_(NUM_REPLACED_ELEMENT_CHUNKS),
00081     replaced_element_mb_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
00082     replaced_element_db_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
00083     retained_element_allocator_(0),
00084     transport_inst_(transport_inst),
00085     graceful_disconnecting_(false),
00086     link_released_(true),
00087     send_buffer_(0),
00088     transport_shutdown_(false)
00089 {
00090   DBG_ENTRY_LVL("TransportSendStrategy","TransportSendStrategy",6);
00091 
00092   // Create a ThreadSynch object just for us.
00093   DirectPriorityMapper mapper(priority);
00094   this->synch_ = thread_sync_strategy->create_synch_object(
00095                    synch_resource,
00096 #ifdef ACE_WIN32
00097                    ACE_DEFAULT_THREAD_PRIORITY,
00098 #else
00099                    mapper.thread_priority(),
00100 #endif
00101                    TheServiceParticipant->scheduler());
00102 
00103   // We cache this value in data member since it doesn't change, and we
00104   // don't want to keep asking for it over and over.
00105   this->max_header_size_ = TransportHeader::max_marshaled_size();
00106 
00107   if (Transport_debug_level >= 2) {
00108     ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportSendStrategy replaced_element_allocator %x with %d chunks\n",
00109                &replaced_element_allocator_, NUM_REPLACED_ELEMENT_CHUNKS));
00110   }
00111 
00112   delayed_delivered_notification_queue_.reserve(this->max_samples_);
00113 }
00114 
00115 TransportSendStrategy::~TransportSendStrategy()
00116 {
00117   DBG_ENTRY_LVL("TransportSendStrategy","~TransportSendStrategy",6);
00118 
00119   delete this->synch_;
00120 
00121   this->delayed_delivered_notification_queue_.clear();
00122 
00123   delete this->elems_;
00124   delete this->queue_;
00125 }
00126 
00127 void
00128 TransportSendStrategy::send_buffer(TransportSendBuffer* send_buffer)
00129 {
00130   this->send_buffer_ = send_buffer;
00131 
00132   if (this->send_buffer_ != 0) {
00133     this->send_buffer_->bind(this);
00134   }
00135 }
00136 
00137 ThreadSynchWorker::WorkOutcome
00138 TransportSendStrategy::perform_work()
00139 {
00140   DBG_ENTRY_LVL("TransportSendStrategy","perform_work",6);
00141 
00142   SendPacketOutcome outcome;
00143   bool no_more_work = false;
00144 
00145   { // scope for the guard(this->lock_);
00146     GuardType guard(this->lock_);
00147 
00148     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: perform_work mode: %C\n", mode_as_str(this->mode_)), 5);
00149 
00150     if (this->mode_ == MODE_TERMINATED) {
00151       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00152                 "Entered perform_work() and mode_ is MODE_TERMINATED - "
00153                 "we lost connection and could not reconnect, just return "
00154                 "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
00155       return WORK_OUTCOME_BROKEN_RESOURCE;
00156     }
00157 
00158     // The perform_work() is called by our synch_ object using
00159     // a thread designated to call this method when it thinks
00160     // we need to be called in order to "service" the queue_ and/or
00161     // deal with a partially sent current packet.
00162     //
00163     // We will return a 0 if we don't see a need to have our perform_work()
00164     // called again, and we will return a 1 if we do see the need to have our
00165     // perform_work() method called again.
00166 
00167     // First, make sure that the mode_ indicates that we are, indeed, in
00168     // the MODE_QUEUE mode.  If we are not in MODE_QUEUE mode (meaning we are
00169     // in MODE_DIRECT), then it means we didn't need to have this perform_work()
00170     // method called - in this case, do nothing other than return
00171     // WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we really don't
00172     // see a need for it to call our perform_work() again (at least not
00173     // right now).
00174     if (this->mode_ != MODE_QUEUE && this->mode_ != MODE_SUSPEND) {
00175       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00176                 "Entered perform_work() and mode_ is %C - just return "
00177                 "WORK_OUTCOME_NO_MORE_TO_DO.\n", mode_as_str(this->mode_)), 5);
00178       return WORK_OUTCOME_NO_MORE_TO_DO;
00179     }
00180 
00181     // Check the "state" of the current packet.  We will either find that the
00182     // current packet is in a state of being "partially sent", or we will find
00183     // it in a state of being "empty".  When the current packet is "empty", it
00184     // means that it is time to build up the current packet using elements
00185     // extracted from the queue_, and then we will attempt to send the
00186     // packet.  When we find the current packet in the "partially sent" state,
00187     // we will not touch the queue_ - we will just try to send the unsent
00188     // bytes in the current (partially sent) packet.
00189     const size_t header_length = this->header_.length_;
00190 
00191     if (header_length == 0) {
00192       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00193                 "The current packet doesn't have any unsent bytes - we "
00194                 "need to 'populate' the current packet with elems from "
00195                 "the queue.\n"), 5);
00196 
00197       // The current packet is "empty".  Build up the current packet using
00198       // elements from the queue_, and prepare the current packet to be sent.
00199 
00200       // Before we build the packet from the queue_, let's make sure that
00201       // there is actually something on the queue_ to build from.
00202       if (this->queue_->size() == 0) {
00203         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00204                   "But the queue is empty.  We have cleared the "
00205                   "backpressure situation.\n"),5);
00206         // We are here because the queue_ is empty, and there isn't
00207         // any "partial packet" bytes left to send.  We have overcome
00208         // the backpressure situation and don't have anything to do
00209         // right now.
00210 
00211         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00212                   "Flip mode to MODE_DIRECT, and return "
00213                   "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
00214 
00215         // Flip the mode back to MODE_DIRECT.
00216         this->mode_ = MODE_DIRECT;
00217 
00218         // And return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that
00219         // perform_work() doesn't need to be called again (at this time).
00220         return WORK_OUTCOME_NO_MORE_TO_DO;
00221       }
00222 
00223       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00224                 "There is at least one elem in the queue - get the packet "
00225                 "elems from the queue.\n"), 5);
00226 
00227       // There is stuff in the queue_ if we get to this point in the logic.
00228       // Build-up the current packet using element(s) from the queue_.
00229       this->get_packet_elems_from_queue();
00230 
00231       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00232                 "Prepare the packet from the packet elems_.\n"), 5);
00233 
00234       // Now we can prepare the new packet to be sent.
00235       this->prepare_packet();
00236 
00237       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00238                 "Packet has been prepared from packet elems_.\n"), 5);
00239 
00240     } else {
00241       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00242                 "We have a current packet that still has unsent bytes.\n"), 5);
00243     }
00244 
00245     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00246               "Attempt to send the current packet.\n"), 5);
00247 
00248     // Now we can attempt to send the current packet - whether it is
00249     // a "partially sent" packet or one that we just built-up using elements
00250     // from the queue_ (and subsequently prepared for sending) - it doesn't
00251     // matter.  Just attempt to send as many of the "unsent" bytes in the
00252     // packet as possible.
00253     outcome = this->send_packet();
00254 
00255     // If we sent the whole packet (eg, partial_send is false), and the queue_
00256     // is now empty, then we've cleared the backpressure situation.
00257     if ((outcome == OUTCOME_COMPLETE_SEND) && (this->queue_->size() == 0)) {
00258       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00259                 "Flip the mode to MODE_DIRECT, and then return "
00260                 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
00261 
00262       // Revert back to MODE_DIRECT mode.
00263       this->mode_ = MODE_DIRECT;
00264       no_more_work = true;
00265     }
00266   } // End of scope for guard(this->lock_);
00267 
00268   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00269             "The outcome of the send_packet() was %d.\n", outcome), 5);
00270 
00271   send_delayed_notifications();
00272 
00273   // If we sent the whole packet (eg, partial_send is false), and the queue_
00274   // is now empty, then we've cleared the backpressure situation.
00275   if (no_more_work) {
00276     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00277               "We sent the whole packet, and there is nothing left on "
00278               "the queue now.\n"), 5);
00279 
00280     // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
00281     // don't desire another call to this perform_work() method.
00282     return WORK_OUTCOME_NO_MORE_TO_DO;
00283   }
00284 
00285   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00286             "We still have unsent bytes in the current packet AND/OR there "
00287             "are still elements in the queue.\n"), 5);
00288 
00289   if ((outcome == OUTCOME_PEER_LOST) || (outcome == OUTCOME_SEND_ERROR)) {
00290     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00291               "We lost our connection, or had some fatal connection "
00292               "error.  Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
00293 
00294     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00295               "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
00296 
00297     bool do_suspend = true;
00298     this->relink(do_suspend);
00299 
00300     if (this->mode_ == MODE_SUSPEND) {
00301       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00302                 "The reconnect has not done yet and we are still in MODE_SUSPEND. "
00303                 "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
00304       // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
00305       // don't desire another call to this perform_work() method.
00306       return WORK_OUTCOME_NO_MORE_TO_DO;
00307 
00308     } else if (this->mode_ == MODE_TERMINATED) {
00309       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00310                 "Reconnect failed, now we are in MODE_TERMINATED\n"), 5);
00311       return WORK_OUTCOME_BROKEN_RESOURCE;
00312 
00313     } else {
00314       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00315                 "Reconnect succeeded, Notify synch thread of work "
00316                 "availability.\n"), 5);
00317       // If the datalink is re-established then notify the synch
00318       // thread to perform work.  We do not hold the object lock at
00319       // this point.
00320       this->synch_->work_available();
00321     }
00322   }
00323 
00324   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00325             "We still have an 'unbroken' connection.\n"), 5);
00326 
00327   if (outcome == OUTCOME_BACKPRESSURE) {
00328     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00329               "We experienced backpressure on our attempt to send the "
00330               "packet.  Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
00331     // We have a "clogged resource".
00332     return WORK_OUTCOME_CLOGGED_RESOURCE;
00333   }
00334 
00335   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00336             "We may have sent the whole current packet, but still have "
00337             "elements on the queue.\n"), 5);
00338   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00339             "Or, we may have only partially sent the current packet.\n"), 5);
00340 
00341   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00342             "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5);
00343 
00344   // We may have had an OUTCOME_COMPLETE_SEND, but there is still stuff
00345   // in the queue_ to be sent.  *OR* we have have had an OUTCOME_PARTIAL_SEND,
00346   // which equates to the same thing - we still have work to do.
00347 
00348   // We are still in MODE_QUEUE mode, thus there is still work to be
00349   // done to service the queue_ and/or a partially sent current packet.
00350   // Return WORK_OUTCOME_MORE_TO_DO so that our caller knows that we still
00351   // want it to call this perform_work() method.
00352   return WORK_OUTCOME_MORE_TO_DO;
00353 }
00354 
00355 // Now we need to "peel off" those message blocks that were fully
00356 // sent, adjust the first message block with an unsent byte to have
00357 // its rd_ptr() pointing to that first unsent byte, and set the
00358 // pkt_chain_ to that first message block with an unsent byte.
00359 // As we "peel off" fully sent message blocks, we need to also deal with
00360 // fully sent elements by removing them from the elems_ and
00361 // calling their data_delivered() method.  In addition, as we peel off
00362 // the message blocks that are fully sent, we need to untie them from
00363 // the chain and release them.
00364 // And finally, don't forget to adjust the header_.length_ to
00365 // account for the num_bytes_sent (beware that some of the num_bytes_sent
00366 // may be packet header bytes and shouldn't affect the header_.length_
00367 // which doesn't include the packet header bytes.
00368 int
00369 TransportSendStrategy::adjust_packet_after_send(ssize_t num_bytes_sent)
00370 {
00371   DBG_ENTRY_LVL("TransportSendStrategy", "adjust_packet_after_send", 6);
00372 
00373   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00374         "Adjusting the current packet because %d bytes of the packet "
00375         "have been sent.\n", num_bytes_sent));
00376 
00377   ssize_t num_bytes_left = num_bytes_sent;
00378   ssize_t num_non_header_bytes_sent = 0;
00379 
00380   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00381         "Set num_bytes_left to %d.\n", num_bytes_left));
00382   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00383         "Set num_non_header_bytes_sent to %d.\n",
00384         num_non_header_bytes_sent));
00385 
00386   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00387         "Peek at the element at the front of the packet elems_.\n"));
00388 
00389   // This is the element currently at the front of elems_.
00390   TransportQueueElement* element = this->elems_->peek();
00391 
00392   if(!element){
00393     ACE_DEBUG((LM_INFO, "(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n"));
00394   } else {
00395     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00396           "Use the element's msg() to find the last block in "
00397           "the msg() chain.\n"));
00398 
00399     // Get a pointer to the last message block in the element.
00400     const ACE_Message_Block* elem_tail_block = element->msg();
00401 
00402     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00403           "Start with tail block == element->msg().\n"));
00404 
00405     while (elem_tail_block->cont() != 0) {
00406       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00407             "Set tail block to its cont() block (next in chain).\n"));
00408       elem_tail_block = elem_tail_block->cont();
00409     }
00410 
00411     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00412           "Tail block now set (because tail block's cont() is 0).\n"));
00413 
00414     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00415           "Start the 'while (num_bytes_left > 0)' loop.\n"));
00416 
00417     while (num_bytes_left > 0) {
00418       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00419             "At top of 'num bytes left' loop.  num_bytes_left == [%d].\n",
00420             num_bytes_left));
00421 
00422       const int block_length = static_cast<int>(this->pkt_chain_->length());
00423 
00424       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00425             "Length of block at front of pkt_chain_ is [%d].\n",
00426             block_length));
00427 
00428       if (block_length <= num_bytes_left) {
00429         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00430               "The whole block at the front of pkt_chain_ was sent.\n"));
00431 
00432         // The entire message block at the front of the chain has been sent.
00433         // Detach the head message block from the chain and adjust
00434         // the pkt_chain_ to point to the next block (if any) in
00435         // the chain.
00436         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00437               "Extract the fully sent block from the pkt_chain_.\n"));
00438 
00439         ACE_Message_Block* fully_sent_block = this->pkt_chain_;
00440 
00441         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00442               "Set pkt_chain_ to pkt_chain_->cont().\n"));
00443 
00444         this->pkt_chain_ = this->pkt_chain_->cont();
00445 
00446         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00447               "Set the fully sent block's cont() to 0.\n"));
00448 
00449         fully_sent_block->cont(0);
00450 
00451         // Update the num_bytes_left to indicate that we have
00452         // processed the entire length of the block.
00453         num_bytes_left -= block_length;
00454 
00455         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00456               "Updated num_bytes_left to account for fully sent "
00457               "block (block_length == [%d]).\n", block_length));
00458         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00459               "Now, num_bytes_left == [%d].\n", num_bytes_left));
00460 
00461         if (!this->header_complete_) {
00462           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00463                 "Since the header_complete_ flag is false, it means "
00464                 "that the packet header block was still in the "
00465                 "pkt_chain_.\n"));
00466 
00467           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00468                 "Not anymore...  Set the header_complete_ flag "
00469                 "to true.\n"));
00470 
00471           // That was the packet header block.  And now we know that it
00472           // has been completely sent.
00473           this->header_complete_ = true;
00474 
00475           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00476                 "Release the fully sent block.\n"));
00477 
00478           // Release the fully_sent_block
00479           fully_sent_block->release();
00480 
00481         } else {
00482           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00483                 "Since the header_complete_ flag is true, it means "
00484                 "that the packet header block was not in the "
00485                 "pkt_chain_.\n"));
00486           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00487                 "So, the fully sent block was part of an element.\n"));
00488 
00489           // That wasn't the packet header block.  It was from the
00490           // element currently at the front of the elems_
00491           // collection.  If it was the last block from the
00492           // element, then we need to extract the element from the
00493           // elems_ collection and invoke data_delivered() on it.
00494           num_non_header_bytes_sent += block_length;
00495 
00496           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00497                 "Updated num_non_header_bytes_sent to account for "
00498                 "fully sent block (block_length == [%d]).\n",
00499                 block_length));
00500 
00501           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00502                 "Now, num_non_header_bytes_sent == [%d].\n",
00503                 num_non_header_bytes_sent));
00504 
00505           if (fully_sent_block->base() == elem_tail_block->base()) {
00506             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00507                   "Ok.  The fully sent block was a duplicate of "
00508                   "the tail block of the element that is at the "
00509                   "front of the packet elems_.\n"));
00510 
00511             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00512                   "This means that we have completely sent the "
00513                   "element at the front of the packet elems_.\n"));
00514 
00515             // This means that we have completely sent the element
00516             // that is currently at the front of the elems_ collection.
00517 
00518             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00519                   "We can release the fully sent block now.\n"));
00520 
00521             // Release the fully_sent_block
00522             fully_sent_block->release();
00523 
00524             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00525                   "We can extract the element from the front of "
00526                   "the packet elems_ (we were just peeking).\n"));
00527 
00528             // Extract the element from the elems_ collection
00529             element = this->elems_->get();
00530 
00531             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00532                   "Tell the element that a decision has been made "
00533                   "regarding its fate - data_delivered().\n"));
00534 
00535             // Inform the element that the data has been delivered.
00536             this->add_delayed_notification(element);
00537 
00538             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00539                   "Peek at the next element in the packet "
00540                   "elems_.\n"));
00541 
00542             // Set up for the next element in elems_ by peek()'ing.
00543             element = this->elems_->peek();
00544 
00545             if (element != 0) {
00546               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00547                     "The is an element still in the packet "
00548                     "elems_ (we are peeking at it now).\n"));
00549 
00550               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00551                     "We are going to find the tail block for the "
00552                     "current element (we are peeking at).\n"));
00553 
00554               // There was a "next element".  Determine the
00555               // elem_tail_block for it.
00556               elem_tail_block = element->msg();
00557 
00558               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00559                     "Start w/tail block == element->msg().\n"));
00560 
00561               while (elem_tail_block->cont() != 0) {
00562                 VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00563                       "Set tail block to next in chain.\n"));
00564                 elem_tail_block = elem_tail_block->cont();
00565               }
00566 
00567               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00568                     "Done finding tail block.\n"));
00569             }
00570 
00571           } else {
00572             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00573                   "Ok.  The fully sent block is *not* a "
00574                   "duplicate of the tail block of the element "
00575                   "at the front of the packet elems_.\n"));
00576 
00577             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00578                   "Thus, we have not completely sent the "
00579                   "element yet.\n"));
00580 
00581             // We didn't completely send the element - it has more
00582             // message blocks that haven't been sent (that we know of).
00583 
00584             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00585                   "We can release the fully_sent_block now.\n"));
00586 
00587             // Release the fully_sent_block
00588             fully_sent_block->release();
00589           }
00590         }
00591 
00592       } else {
00593         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00594               "Only part of the block at the front of pkt_chain_ "
00595               "was sent.\n"));
00596 
00597         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00598               "Advance the rd_ptr() of the front block (of pkt_chain_) "
00599               "by the num_bytes_left (%d).\n", num_bytes_left));
00600 
00601         // Only part of the current block was sent.
00602         this->pkt_chain_->rd_ptr(num_bytes_left);
00603 
00604         if (this->header_complete_) {
00605           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00606                 "And since the packet header block has already been "
00607                 "completely sent, add num_bytes_left to the "
00608                 "num_non_header_bytes_sent.\n"));
00609 
00610           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00611                 "Before, num_non_header_bytes_sent == %d.\n",
00612                 num_non_header_bytes_sent));
00613 
00614           // We know that the current block isn't the packet header
00615           // block because the packet header block has already been
00616           // completely sent.  We need to count these bytes in the
00617           // num_non_header_bytes_sent.
00618           num_non_header_bytes_sent += num_bytes_left;
00619 
00620           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00621                 "After, num_non_header_bytes_sent == %d.\n",
00622                 num_non_header_bytes_sent));
00623         }
00624 
00625         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00626               "Set the num_bytes_left to 0 now.\n"));
00627 
00628         num_bytes_left = 0;
00629       }
00630     }
00631   }
00632 
00633   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00634         "The 'num_bytes_left' loop has completed.\n"));
00635 
00636   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00637         "Adjust the header_.length_ to account for the "
00638         "num_non_header_bytes_sent.\n"));
00639   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00640         "Before, header_.length_ == %d.\n",
00641         this->header_.length_));
00642 
00643   // Adjust the packet header_.length_ to indicate how many non header
00644   // bytes are left to send.
00645   this->header_.length_ -= static_cast<ACE_UINT32>(num_non_header_bytes_sent);
00646 
00647   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00648         "After, header_.length_ == %d.\n",
00649         this->header_.length_));
00650 
00651   // Returns 0 if the entire packet was sent, and returns 1 otherwise.
00652   int rc = (this->header_.length_ == 0) ? 0 : 1;
00653 
00654   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00655         "Adjustments all done.  Returning [%d].  0 means entire packet "
00656         "has been sent.  1 means otherwise.\n",
00657         rc));
00658 
00659   return rc;
00660 }
00661 
00662 bool
00663 TransportSendStrategy::send_delayed_notifications(const TransportQueueElement::MatchCriteria* match)
00664 {
00665   DBG_ENTRY_LVL("TransportSendStrategy","send_delayed_notifications",6);
00666   TransportQueueElement* sample = 0;
00667   SendMode mode = MODE_NOT_SET;
00668 
00669   OPENDDS_VECTOR(TQESendModePair) samples;
00670 
00671   size_t num_delayed_notifications = 0;
00672   bool found_element = false;
00673 
00674   {
00675     GuardType guard(lock_);
00676 
00677     num_delayed_notifications = delayed_delivered_notification_queue_.size();
00678 
00679     if (num_delayed_notifications == 0) {
00680       return false;
00681 
00682     } else if (num_delayed_notifications == 1) {
00683       // Optimization for the most common case (doesn't need vectors)
00684 
00685       if (!match || match->matches(*delayed_delivered_notification_queue_[0].first)) {
00686         found_element = true;
00687         sample = delayed_delivered_notification_queue_[0].first;
00688         mode = delayed_delivered_notification_queue_[0].second;
00689 
00690         delayed_delivered_notification_queue_.clear();
00691       }
00692 
00693     } else {
00694       OPENDDS_VECTOR(TQESendModePair)::iterator iter;
00695       for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) {
00696         sample = iter->first;
00697         mode = iter->second;
00698         if (!match || match->matches(*sample)) {
00699           found_element = true;
00700           samples.push_back(*iter);
00701           iter = delayed_delivered_notification_queue_.erase(iter);
00702         } else {
00703           ++iter;
00704         }
00705       }
00706     }
00707   }
00708 
00709   if (!found_element)
00710     return false;
00711 
00712   if (num_delayed_notifications == 1) {
00713     // optimization for the common case
00714     if (mode == MODE_TERMINATED) {
00715       if (!transport_shutdown_ || sample->owned_by_transport()) {
00716         sample->data_dropped(true);
00717       }
00718     } else {
00719       if (!transport_shutdown_ || sample->owned_by_transport()) {
00720         sample->data_delivered();
00721       }
00722     }
00723 
00724   } else {
00725     for (size_t i = 0; i < samples.size(); ++i) {
00726       if (samples[i].second == MODE_TERMINATED) {
00727         if (!transport_shutdown_ || samples[i].first->owned_by_transport()) {
00728           samples[i].first->data_dropped(true);
00729         }
00730       } else {
00731         if (!transport_shutdown_ || samples[i].first->owned_by_transport()) {
00732           samples[i].first->data_delivered();
00733         }
00734       }
00735     }
00736   }
00737   return true;
00738 }
00739 
00740 /// Remove all samples in the backpressure queue and packet queue.
00741 void
00742 TransportSendStrategy::terminate_send(bool graceful_disconnecting)
00743 {
00744   DBG_ENTRY_LVL("TransportSendStrategy","terminate_send",6);
00745 
00746   bool reset_flag = true;
00747 
00748   {
00749     GuardType guard(this->lock_);
00750 
00751     // If the terminate_send call due to a non-graceful disconnection before
00752     // a datalink shutdown then we will not try to send the graceful disconnect
00753     // message.
00754     if ((this->mode_ == MODE_TERMINATED || this->mode_ == MODE_SUSPEND)
00755         && !this->graceful_disconnecting_) {
00756       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00757             "It was already terminated non gracefully, will not set to graceful disconnecting \n"));
00758       reset_flag = false;
00759     }
00760   }
00761 
00762   VDBG((LM_DEBUG, "(%P|%t) DBG:  Now flip to MODE_TERMINATED \n"));
00763 
00764   this->clear(MODE_TERMINATED);
00765 
00766   if (reset_flag) {
00767     GuardType guard(this->lock_);
00768     this->graceful_disconnecting_ = graceful_disconnecting;
00769   }
00770 }
00771 
00772 void
00773 TransportSendStrategy::clear(SendMode mode)
00774 {
00775   DBG_ENTRY_LVL("TransportSendStrategy","clear",6);
00776 
00777   send_delayed_notifications();
00778   QueueType* elems = 0;
00779   QueueType* queue = 0;
00780   {
00781     GuardType guard(this->lock_);
00782 
00783     if (this->header_.length_ > 0) {
00784       // Clear the messages in the pkt_chain_ that is partially sent.
00785       // We just reuse these functions for normal partial send except actual sending.
00786       int num_bytes_left = static_cast<int>(this->pkt_chain_->total_length());
00787       int result = this->adjust_packet_after_send(num_bytes_left);
00788 
00789       if (result == 0) {
00790         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00791               "The adjustment logic says that the packet is cleared.\n"));
00792 
00793       } else {
00794         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00795               "The adjustment returned partial sent.\n"));
00796       }
00797     }
00798 
00799     elems = this->elems_;
00800     this->elems_ = new QueueType(1, this->transport_inst_->max_samples_per_packet_);
00801     queue = this->queue_;
00802     this->queue_ = new QueueType(this->transport_inst_->queue_messages_per_pool_,
00803                                  this->transport_inst_->queue_initial_pools_);
00804 
00805     this->header_.length_ = 0;
00806     this->pkt_chain_ = 0;
00807     this->header_complete_ = false;
00808     this->start_counter_ = 0;
00809     this->mode_ = mode;
00810     this->mode_before_suspend_ = MODE_NOT_SET;
00811   }
00812 
00813   // We need remove the queued elements outside the lock,
00814   // otherwise we have a deadlock situation when remove vistor
00815   // calls the data_droped on each dropped elements.
00816 
00817   // Clear all samples in queue.
00818   RemoveAllVisitor remove_all_visitor;
00819 
00820   elems->accept_remove_visitor(remove_all_visitor);
00821   queue->accept_remove_visitor(remove_all_visitor);
00822 
00823   delete elems;
00824   delete queue;
00825 }
00826 
00827 int
00828 TransportSendStrategy::start()
00829 {
00830   DBG_ENTRY_LVL("TransportSendStrategy","start",6);
00831 
00832   {
00833     GuardType guard(this->lock_);
00834 
00835     if (!this->start_i()) {
00836       return -1;
00837     }
00838   }
00839 
00840   size_t header_chunks(1);
00841 
00842   // If a secondary send buffer is bound, sent headers should
00843   // be cached to properly maintain the buffer:
00844   if (this->send_buffer_ != 0) {
00845     header_chunks += this->send_buffer_->capacity();
00846 
00847   } else {
00848     header_chunks += 1;
00849   }
00850 
00851   ACE_NEW_RETURN(this->header_db_allocator_,
00852                  TransportDataBlockAllocator(header_chunks),
00853                  -1);
00854 
00855   ACE_NEW_RETURN(this->header_mb_allocator_,
00856                  TransportMessageBlockAllocator(header_chunks),
00857                  -1);
00858 
00859   // Since we (the TransportSendStrategy object) are a reference-counted
00860   // object, but the synch_ object doesn't necessarily know this, we need
00861   // to give a "copy" of a reference to ourselves to the synch_ object here.
00862   // We will do the reverse when we unregister ourselves (as a worker) from
00863   // the synch_ object.
00864   //MJM: The synch thingie knows to not "delete" us, right?
00865   this->_add_ref();
00866 
00867   if (this->synch_->register_worker(this) == -1) {
00868     // Take back our "copy".
00869     this->_remove_ref();
00870     ACE_ERROR_RETURN((LM_ERROR,
00871                       "(%P|%t) ERROR: TransportSendStrategy failed to register "
00872                       "as a worker with the ThreadSynch object.\n"),
00873                      -1);
00874   }
00875 
00876   return 0;
00877 }
00878 
00879 void
00880 TransportSendStrategy::stop()
00881 {
00882   DBG_ENTRY_LVL("TransportSendStrategy","stop",6);
00883 
00884   if (this->header_block_ != 0) {
00885     this->header_block_->release ();
00886     this->header_block_ = 0;
00887   }
00888 
00889   this->synch_->unregister_worker();
00890 
00891   // Since we gave the synch_ a "copy" of a reference to ourselves, we need
00892   // to take it back now.
00893   this->_remove_ref();
00894 
00895   {
00896     GuardType guard(this->lock_);
00897 
00898     if (this->pkt_chain_ != 0) {
00899       size_t size = this->pkt_chain_->total_length();
00900 
00901       if (size > 0) {
00902         this->pkt_chain_->release();
00903         ACE_DEBUG((LM_WARNING,
00904                    ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
00905                    ACE_TEXT("terminating with %d unsent bytes.\n"),
00906                    size));
00907       }
00908     }
00909   }
00910 
00911   delete this->header_mb_allocator_;
00912   delete this->header_db_allocator_;
00913 
00914   {
00915     GuardType guard(this->lock_);
00916 
00917     this->stop_i();
00918   }
00919 
00920   // TBD SOON - What about all of the samples that may still be stuck in
00921   //            our queue_ and/or elems_?
00922 }
00923 
00924 void
00925 TransportSendStrategy::send(TransportQueueElement* element, bool relink)
00926 {
00927   if (Transport_debug_level > 9) {
00928     ACE_DEBUG((LM_DEBUG,
00929                ACE_TEXT("(%P|%t) TransportSendStrategy::send() [%d] - ")
00930                ACE_TEXT("sending data at 0x%x.\n"),
00931                id(), element));
00932   }
00933 
00934   DBG_ENTRY_LVL("TransportSendStrategy", "send", 6);
00935 
00936   {
00937     GuardType guard(this->lock_);
00938 
00939     if (this->link_released_) {
00940       this->add_delayed_notification(element);
00941 
00942     } else {
00943       if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) {
00944         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00945               "TransportSendStrategy::send: mode is MODE_TERMINATED and not in "
00946               "graceful disconnecting, so discard message.\n"));
00947         element->data_dropped(true);
00948         return;
00949       }
00950 
00951       size_t element_length = element->msg()->total_length();
00952 
00953       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00954             "Send element msg() has total_length() == [%d].\n",
00955             element_length));
00956 
00957       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00958             "this->max_header_size_ == [%d].\n",
00959             this->max_header_size_));
00960 
00961       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00962             "this->max_size_ == [%d].\n",
00963             this->max_size_));
00964 
00965       const size_t max_message_size = this->max_message_size();
00966 
00967       // Really an assert.  We can't accept any element that wouldn't fit into
00968       // a transport packet by itself (ie, it would be the only element in the
00969       // packet).  This max_size_ is the user-configurable maximum, not based
00970       // on the transport's inherent maximum message size.  If max_message_size
00971       // is non-zero, we will fragment so max_size_ doesn't apply per-element.
00972       if (max_message_size == 0 &&
00973           this->max_header_size_ + element_length > this->max_size_) {
00974         ACE_ERROR((LM_ERROR,
00975                    "(%P|%t) ERROR: Element too large (%Q) "
00976                    "- won't fit into packet.\n", ACE_UINT64(element_length)));
00977         return;
00978       }
00979 
00980       // Check the mode_ to see if we simply put the element on the queue.
00981       if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) {
00982         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00983                   "this->mode_ == %C, so queue elem and leave.\n",
00984                   mode_as_str(this->mode_)), 5);
00985 
00986         this->queue_->put(element);
00987 
00988         if (this->mode_ != MODE_SUSPEND) {
00989           this->synch_->work_available();
00990         }
00991 
00992         return;
00993       }
00994 
00995       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00996             "this->mode_ == MODE_DIRECT.\n"));
00997 
00998       // We are in the MODE_DIRECT send mode.  When in this mode, the send()
00999       // calls will "build up" the transport packet to be sent directly when it
01000       // reaches the optimal size, contains the maximum number of samples, etc.
01001 
01002       // We need to check if the current element (the arg passed-in to this
01003       // send() method) should be appended to the transport packet, or if the
01004       // transport packet should be sent (directly) first, dealing with the
01005       // current element afterwards.
01006 
01007       // We will decide to send the packet as it is now, under two circumstances:
01008       //
01009       //    Either:
01010       //
01011       //    (1) The current element won't fit into the current packet since it
01012       //        would violate the max_packet_size_.
01013       //
01014       //    -OR-
01015       //
01016       //    (2) There is at least one element already in the current packet,
01017       //        and the current element says that it must be sent in an
01018       //        exclusive packet (ie, in a packet all by itself).
01019       //
01020       const bool exclusive = element->requires_exclusive_packet();
01021 
01022       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01023             "The element %C require an exclusive packet.\n",
01024             (exclusive ? "DOES" : "does NOT")
01025           ));
01026 
01027       const size_t space_needed =
01028         (max_message_size > 0)
01029         ? /* fragmenting */ DataSampleHeader::max_marshaled_size() + MIN_FRAG
01030         : /* not fragmenting */ element_length;
01031 
01032       if ((exclusive && (this->elems_->size() != 0))
01033           || (this->space_available() < space_needed)) {
01034 
01035         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01036               "Element won't fit in current packet or requires exclusive"
01037               " - send current packet (directly) now.\n"));
01038 
01039         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01040               "max_header_size_: %d, header_.length_: %d, element_length: %d\n"
01041               , this->max_header_size_, this->header_.length_, element_length));
01042 
01043         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01044               "Tot possible length: %d, max_len: %d\n"
01045               , this->max_header_size_ + this->header_.length_ + element_length
01046               , this->max_size_));
01047         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01048               "current elem size: %d\n"
01049               , this->elems_->size()));
01050 
01051         // Send the current packet, and deal with the current element
01052         // afterwards.
01053         // The invocation's relink status should dictate the direct_send's
01054         // relink. We don't want a (relink == false) invocation to end up
01055         // doing a relink. Think of (relink == false) as a non-blocking call.
01056         this->direct_send(relink);
01057 
01058         // Now check to see if we flipped into MODE_QUEUE, which would mean
01059         // that the direct_send() experienced backpressure, and the
01060         // packet was only partially sent.  If this has happened, we deal with
01061         // the current element by placing it on the queue (and then we are done).
01062         //
01063         // Otherwise, if the mode_ is still MODE_DIRECT, we can just
01064         // "drop" through to the next step in the logic where we append the
01065         // current element to the current packet.
01066         if (this->mode_ == MODE_QUEUE) {
01067           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01068                     "We experienced backpressure on that direct send, as "
01069                     "the mode_ is now MODE_QUEUE or MODE_SUSPEND.  "
01070                     "Queue elem and leave.\n"), 5);
01071           this->queue_->put(element);
01072           this->synch_->work_available();
01073 
01074           return;
01075         }
01076       }
01077 
01078       // Loop for sending 'element', in fragments if needed
01079       bool first_pkt = true; // enter the loop 1st time through unconditionally
01080       for (TransportQueueElement* next_fragment = 0;
01081            (first_pkt || next_fragment)
01082            && (this->mode_ == MODE_DIRECT || this->mode_ == MODE_TERMINATED);) {
01083            // We do need to send in MODE_TERMINATED (GRACEFUL_DISCONNECT msg)
01084 
01085         if (next_fragment) {
01086           element = next_fragment;
01087           element_length = next_fragment->msg()->total_length();
01088           this->header_.first_fragment_ = false;
01089         }
01090 
01091         this->header_.last_fragment_ = false;
01092         if (max_message_size) { // fragmentation enabled
01093           const size_t avail = this->space_available();
01094           if (element_length > avail) {
01095             VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Fragmenting\n"), 0);
01096             ElementPair ep = element->fragment(avail);
01097             element = ep.first;
01098             element_length = element->msg()->total_length();
01099             next_fragment = ep.second;
01100             this->header_.first_fragment_ = first_pkt;
01101           } else if (next_fragment) {
01102             // We are sending the "tail" element of a previous fragment()
01103             // operation, and this element didn't itself require fragmentation
01104             this->header_.last_fragment_ = true;
01105             next_fragment = 0;
01106           }
01107         }
01108         first_pkt = false;
01109 
01110         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01111               "Start the 'append elem' to current packet logic.\n"));
01112 
01113         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01114               "Put element into current packet elems_.\n"));
01115 
01116         // Now that we know the current element should go into the current
01117         // packet, we can just go ahead and "append" the current element to
01118         // the current packet.
01119 
01120         // Add the current element to the collection of packet elements.
01121         this->elems_->put(element);
01122 
01123         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01124               "Before, the header_.length_ == [%d].\n",
01125               this->header_.length_));
01126 
01127         // Adjust the header_.length_ to account for the length of the element.
01128         this->header_.length_ += static_cast<ACE_UINT32>(element_length);
01129         const size_t message_length = this->header_.length_;
01130 
01131         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01132               "After adding element's length, the header_.length_ == [%d].\n",
01133               message_length));
01134 
01135         // The current packet now contains the current element.  We need to
01136         // check to see if the conditions are such that we should go ahead and
01137         // attempt to send the packet "directly" now, or if we can just leave
01138         // and send the current packet later (in another send() call or in a
01139         // send_stop() call).
01140 
01141         // There a few conditions that will cause us to attempt to send the
01142         // packet (directly) right now:
01143         // - Fragmentation was needed
01144         // - The current packet has the maximum number of samples per packet.
01145         // - The current packet's total length exceeds the optimum packet size.
01146         // - The current element (currently part of the packet elems_)
01147         //   requires an exclusive packet.
01148         //
01149         if (next_fragment || (this->elems_->size() >= this->max_samples_)
01150             || (this->max_header_size_ + message_length > this->optimum_size_)
01151             || exclusive) {
01152           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01153                 "Now the current packet looks full - send it (directly).\n"));
01154 
01155           this->direct_send(relink);
01156 
01157           if (next_fragment && this->mode_ != MODE_DIRECT) {
01158             if (this->mode_ == MODE_QUEUE) {
01159               this->queue_->put(next_fragment);
01160               this->synch_->work_available();
01161 
01162             } else {
01163               next_fragment->data_dropped(true /* dropped by transport */);
01164             }
01165           } else if (mode_ == MODE_QUEUE) {
01166             // Background thread handles packets in progress
01167             this->synch_->work_available();
01168           }
01169 
01170           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01171                 "Back from the direct_send() attempt.\n"));
01172 
01173           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01174                 "And we %C as a result of the direct_send() call.\n",
01175                 ((this->mode_ == MODE_QUEUE) ? "flipped into MODE_QUEUE"
01176                                              : "stayed in MODE_DIRECT")));
01177 
01178         } else {
01179           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01180                 "Packet not sent. Send conditions weren't satisfied.\n"));
01181           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01182                 "elems_->size(): %d, max_samples_: %d\n",
01183                 int(this->elems_->size()), int(this->max_samples_)));
01184           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01185                 "header_size_: %d, optimum_size_: %d\n",
01186                 int(this->max_header_size_ + message_length),
01187                 int(this->optimum_size_)));
01188           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01189                 "element_requires_exclusive_packet: %d\n", int(exclusive)));
01190 
01191           if (this->mode_ == MODE_QUEUE) {
01192             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01193                   "We flipped into MODE_QUEUE.\n"));
01194 
01195           } else {
01196             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01197                   "We stayed in MODE_DIRECT.\n"));
01198           }
01199         }
01200       }
01201     }
01202   }
01203 
01204   send_delayed_notifications();
01205 }
01206 
01207 void
01208 TransportSendStrategy::send_stop(RepoId /*repoId*/)
01209 {
01210   DBG_ENTRY_LVL("TransportSendStrategy","send_stop",6);
01211   {
01212     GuardType guard(this->lock_);
01213 
01214     if (this->link_released_)
01215       return;
01216 
01217     if (this->start_counter_ == 0) {
01218       // This is an indication of a logic error.  This is more of an assert.
01219       VDBG_LVL((LM_ERROR,
01220                 "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5);
01221       return;
01222     }
01223 
01224     --this->start_counter_;
01225 
01226     if (this->start_counter_ != 0) {
01227       // This wasn't the last send_stop() that we are expecting.  We only
01228       // really honor the first send_start() and the last send_stop().
01229       // We can return without doing anything else in this case.
01230       return;
01231     }
01232 
01233     if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) {
01234       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01235             "TransportSendStrategy::send_stop: dont try to send current packet "
01236             "since mode is MODE_TERMINATED and not in graceful disconnecting.\n"));
01237       return;
01238     }
01239 
01240     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01241           "This is an 'important' send_stop() event since our "
01242           "start_counter_ is 0.\n"));
01243 
01244     // We just caused the start_counter_ to become zero.  This
01245     // means that we aren't expecting another send() or send_stop() at any
01246     // time in the near future (ie, it isn't imminent).
01247 
01248     // If our mode_ is currently MODE_QUEUE or MODE_SUSPEND, then we don't have
01249     // anything to do here because samples have already been going to the
01250     // queue.
01251 
01252     // We only need to do something if the mode_ is
01253     // MODE_DIRECT.  It means that we may have some sample(s) in the
01254     // current packet that have never been sent.  This is our
01255     // opportunity to send the current packet directly if this is the case.
01256     if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) {
01257       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01258             "But since we are in %C, we don't have to do "
01259             "anything more in this important send_stop().\n",
01260             mode_as_str(this->mode_)));
01261       // We don't do anything if we are in MODE_QUEUE.  Just leave.
01262       return;
01263     }
01264 
01265     size_t header_length = this->header_.length_;
01266     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01267           "We are in MODE_DIRECT in an important send_stop() - "
01268           "header_.length_ == [%d].\n", header_length));
01269 
01270     // Only attempt to send the current packet (directly) if the current
01271     // packet actually contains something (it could be empty).
01272     if ((header_length > 0) &&
01273         //(this->elems_->size ()+this->not_yet_pac_q_->size() > 0))
01274         (this->elems_->size() > 0)) {
01275       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01276             "There is something in the current packet - attempt to send "
01277             "it (directly) now.\n"));
01278       // If a relink needs to be done for this packet to be sent, do it.
01279       this->direct_send(true);
01280       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01281             "Back from the attempt to send leftover packet directly.\n"));
01282 
01283       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01284             "But we %C as a result.\n",
01285             ((this->mode_ == MODE_QUEUE)? "flipped into MODE_QUEUE":
01286                                           "stayed in MODE_DIRECT" )));
01287       if (this->mode_ == MODE_QUEUE  && this->mode_ != MODE_SUSPEND) {
01288         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01289               "Notify Synch thread of work availability\n"));
01290         this->synch_->work_available();
01291       }
01292     }
01293   }
01294 
01295   send_delayed_notifications();
01296 }
01297 
01298 void
01299 TransportSendStrategy::remove_all_msgs(RepoId pub_id)
01300 {
01301   DBG_ENTRY_LVL("TransportSendStrategy","remove_all_msgs",6);
01302 
01303   const TransportQueueElement::MatchOnPubId match(pub_id);
01304   send_delayed_notifications(&match);
01305 
01306   GuardType guard(this->lock_);
01307 
01308   if (this->send_buffer_ != 0) {
01309     // If a secondary send buffer is bound, removed samples must
01310     // be retained in order to properly maintain the buffer:
01311     this->send_buffer_->retain_all(pub_id);
01312   }
01313 
01314   do_remove_sample(pub_id, match);
01315 }
01316 
01317 RemoveResult
01318 TransportSendStrategy::remove_sample(const DataSampleElement* sample)
01319 {
01320   DBG_ENTRY_LVL("TransportSendStrategy", "remove_sample", 6);
01321 
01322   VDBG_LVL((LM_DEBUG, "(%P|%t)  Removing sample: %@\n", sample->get_sample()), 5);
01323 
01324   // The sample to remove is either in temporary delayed notification list or
01325   // internal list (elems_ or queue_). If it's going to be removed from temporary delayed
01326   // notification list by transport thread, it needs acquire WriterDataContainer lock for
01327   // data_dropped/data_delivered callback, then it needs wait for this remove_sample() call
01328   // complete as this call already hold the WriterContainer's lock. So this call is safe to
01329   // access the sample to remove. If it's going to be removed by this remove_sample() calling
01330   // thread, it will be removed either from delayed notification list or from internal list
01331   // in which case the element carry the info if the sample is released so the datalinkset
01332   // can stop calling rest datalinks to remove this sample if it's already released..
01333 
01334   const char* const payload = sample->get_sample()->cont()->rd_ptr();
01335   RepoId pub_id = sample->get_pub_id();
01336   const TransportQueueElement::MatchOnDataPayload modp(payload);
01337   if (send_delayed_notifications(&modp)) {
01338     return REMOVE_RELEASED;
01339   }
01340 
01341   GuardType guard(this->lock_);
01342   return do_remove_sample(pub_id, modp);
01343 }
01344 
01345 RemoveResult
01346 TransportSendStrategy::do_remove_sample(const RepoId&,
01347   const TransportQueueElement::MatchCriteria& criteria)
01348 {
01349   DBG_ENTRY_LVL("TransportSendStrategy", "do_remove_sample", 6);
01350 
01351   //ciju: Tim had the idea that we could do the following check
01352   // if ((this->mode_ == MODE_DIRECT) ||
01353   //     ((this->pkt_chain_ == 0) && (queue_ == empty)))
01354   // then we can assume that the sample can be safely removed (no need for
01355   // replacement) from the elems_ queue.
01356   if ((this->mode_ == MODE_DIRECT)
01357       || ((this->pkt_chain_ == 0) && (this->queue_->size() == 0))) {
01358     //ciju: I believe this is the only mode where a safe
01359     // assumption can be made that the samples
01360     // in the elems_ queue aren't part of a packet.
01361     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01362           "The mode is MODE_DIRECT, or the queue is empty and no "
01363           "transport packet is in progress.\n"));
01364 
01365     QueueRemoveVisitor simple_rem_vis(criteria);
01366     this->elems_->accept_remove_visitor(simple_rem_vis);
01367 
01368     const RemoveResult status = simple_rem_vis.status();
01369 
01370     if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
01371       this->header_.length_ -= simple_rem_vis.removed_bytes();
01372 
01373     } else if (status == REMOVE_NOT_FOUND) {
01374       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01375             "Failed to find the sample to remove.\n"));
01376     }
01377 
01378     return status;
01379   }
01380 
01381   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01382         "Visit the queue_ with the RemoveElementVisitor.\n"));
01383 
01384   QueueRemoveVisitor simple_rem_vis(criteria);
01385   this->queue_->accept_remove_visitor(simple_rem_vis);
01386 
01387   RemoveResult status = simple_rem_vis.status();
01388 
01389   if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
01390     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01391           "The sample was removed from the queue_.\n"));
01392     // This means that the visitor did not encounter any fatal error
01393     // along the way, *AND* the sample was found in the queue_,
01394     // and has now been removed.  We are done.
01395     return status;
01396   }
01397 
01398   if (status == REMOVE_ERROR) {
01399     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01400           "The RemoveElementVisitor encountered a fatal error in queue_.\n"));
01401     // This means that the visitor encountered some fatal error along
01402     // the way (and it already reported something to the log).
01403     // Return our failure code.
01404     return status;
01405   }
01406 
01407   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01408         "The RemoveElementVisitor did not find the sample in queue_.\n"));
01409 
01410   // We get here if the visitor did not encounter any fatal error, but it
01411   // also didn't find the sample - and hence it didn't perform any
01412   // "remove sample" logic.
01413 
01414   // Now we need to turn our attention to the current transport packet,
01415   // since the packet is likely in a "partially sent" state, and the
01416   // sample may still be contributing unsent bytes in the pkt_chain_.
01417 
01418   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01419         "Visit our elems_ with the PacketRemoveVisitor.\n"));
01420 
01421   PacketRemoveVisitor pac_rem_vis(criteria,
01422                                   this->pkt_chain_,
01423                                   this->header_block_,
01424                                   this->replaced_element_allocator_,
01425                                   this->replaced_element_mb_allocator_,
01426                                   this->replaced_element_db_allocator_);
01427 
01428   this->elems_->accept_replace_visitor(pac_rem_vis);
01429 
01430   status = pac_rem_vis.status();
01431 
01432   if (status == REMOVE_ERROR) {
01433     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01434           "The PacketRemoveVisitor encountered a fatal error.\n"));
01435 
01436   } else if (status == REMOVE_NOT_FOUND) {
01437     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01438           "The PacketRemoveVisitor didn't find the sample.\n"));
01439 
01440   } else {
01441     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01442           "The PacketRemoveVisitor found the sample and removed it.\n"));
01443   }
01444 
01445   return status;
01446 }
01447 
01448 void
01449 TransportSendStrategy::direct_send(bool relink)
01450 {
01451   DBG_ENTRY_LVL("TransportSendStrategy", "direct_send", 6);
01452 
01453   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01454         "Prepare the current packet for a direct send attempt.\n"));
01455 
01456   // Prepare the packet for sending.
01457   this->prepare_packet();
01458 
01459   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01460         "Now attempt to send the packet.\n"));
01461 
01462   // We will try resend the packet if the send() fails and then connection
01463   // is re-established.  Only loops if the "continue" line is hit.
01464   while (true) {
01465     // Attempt to send the packet
01466     const SendPacketOutcome outcome = this->send_packet();
01467 
01468     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01469           "The outcome of the send_packet() was %d.\n", outcome));
01470 
01471     if ((outcome == OUTCOME_BACKPRESSURE) ||
01472         (outcome == OUTCOME_PARTIAL_SEND)) {
01473       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01474                 "The outcome of the send_packet() was either "
01475                 "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5);
01476 
01477       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01478                 "Flip into the MODE_QUEUE mode_.\n"), 5);
01479 
01480       // We encountered backpressure, or only sent part of the packet.
01481       this->mode_ = MODE_QUEUE;
01482 
01483     } else if ((outcome == OUTCOME_PEER_LOST) ||
01484                (outcome == OUTCOME_SEND_ERROR)) {
01485       if (outcome == OUTCOME_SEND_ERROR) {
01486         ACE_ERROR((LM_WARNING,
01487                    ACE_TEXT("(%P|%t) WARNING: Problem detected in ")
01488                    ACE_TEXT("send buffer management: %p.\n"),
01489                    ACE_TEXT("send_bytes")));
01490 
01491         if (Transport_debug_level > 0) {
01492           this->transport_inst_->dump();
01493         }
01494       } else {
01495         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01496               "The outcome of the send_packet() was "
01497               "OUTCOME_PEER_LOST.\n"));
01498       }
01499 
01500       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01501                 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
01502 
01503       if (this->mode_ != MODE_SUSPEND) {
01504         this->mode_before_suspend_ = this->mode_;
01505         this->mode_ = MODE_SUSPEND;
01506       }
01507 
01508       if (relink) {
01509         bool do_suspend = false;
01510         this->relink(do_suspend);
01511 
01512         if (this->mode_ == MODE_SUSPEND) {
01513           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01514                     "The reconnect has not done yet and we are "
01515                     "still in MODE_SUSPEND.\n"), 5);
01516 
01517         } else if (this->mode_ == MODE_TERMINATED) {
01518           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01519                     "Reconnect failed, we are in MODE_TERMINATED\n"), 5);
01520 
01521         } else {
01522           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01523                     "Try send the packet again since the connection "
01524                     "is re-established.\n"), 5);
01525 
01526           // Try send the packet again since the connection is re-established.
01527           continue;
01528         }
01529       }
01530 
01531     } else {
01532       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01533             "The outcome of the send_packet() must have been "
01534             "OUTCOME_COMPLETE_SEND.\n"));
01535       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01536             "So, we will just stay in MODE_DIRECT.\n"));
01537     }
01538 
01539     break;
01540   }
01541 
01542   // We stay in MODE_DIRECT mode if we didn't encounter any backpressure.
01543 }
01544 
01545 void
01546 TransportSendStrategy::get_packet_elems_from_queue()
01547 {
01548   DBG_ENTRY_LVL("TransportSendStrategy", "get_packet_elems_from_queue", 6);
01549 
01550   for (TransportQueueElement* element = this->queue_->peek(); element != 0;
01551        element = this->queue_->peek()) {
01552 
01553     // Total number of bytes in the current element's message block chain.
01554     size_t element_length = element->msg()->total_length();
01555 
01556     // Flag used to determine if the element requires a packet all to itself.
01557     const bool exclusive_packet = element->requires_exclusive_packet();
01558 
01559     const size_t avail = this->space_available();
01560 
01561     bool frag = false;
01562     if (element_length > avail) {
01563       // The current element won't fit into the current packet
01564       if (this->max_message_size()) { // fragmentation enabled
01565         this->header_.first_fragment_ = !element->is_fragment();
01566         VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Fragmenting from queue\n"), 0);
01567         ElementPair ep = element->fragment(avail);
01568         element = ep.first;
01569         element_length = element->msg()->total_length();
01570         this->queue_->replace_head(ep.second);
01571         frag = true; // queue_ is already taken care of, don't get() later
01572       } else {
01573         break;
01574       }
01575     }
01576 
01577     // If exclusive and the current packet is empty, we won't violate the
01578     // exclusive_packet requirement by put()'ing the element
01579     // into the elems_ collection.
01580     if ((exclusive_packet && this->elems_->size() == 0)
01581         || !exclusive_packet) {
01582       // At this point, we have passed all of the pre-conditions and we can
01583       // now extract the current element from the queue_, put it into the
01584       // packet elems_, and adjust the packet header_.length_.
01585       this->elems_->put(frag ? element : this->queue_->get());
01586       if (this->header_.length_ == 0) {
01587         this->header_.last_fragment_ = !frag && element->is_fragment();
01588       }
01589       this->header_.length_ += static_cast<ACE_UINT32>(element_length);
01590       VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Packetizing from queue\n"), 0);
01591     }
01592 
01593     // With exclusive and (elems_.size() != 0), we don't use the current
01594     // element as part of the packet.  We know that there is already
01595     // at least one element in the packet, and the current element
01596     // is going to need its own (exclusive) packet.  We will just
01597     // use the packet elems_ as it is now.  Always break once
01598     // we've encountered and dealt with the exclusive_packet case.
01599     // Also break if fragmentation was required.
01600     if (exclusive_packet || frag
01601         // If the current number of packet elems_ has reached the maximum
01602         // number of samples per packet, then we are done.
01603         || this->elems_->size() == this->max_samples_
01604         // If the current value of the header_.length_ exceeds (or equals)
01605         // the optimum_size_ for a packet, then we are done.
01606         || this->header_.length_ >= this->optimum_size_) {
01607       break;
01608     }
01609   }
01610 }
01611 
01612 void
01613 TransportSendStrategy::prepare_header()
01614 {
01615   DBG_ENTRY_LVL("TransportSendStrategy", "prepare_header", 6);
01616 
01617   // Increment header sequence for packet:
01618   this->header_.sequence_ = ++this->header_sequence_;
01619 
01620   // Allow the specific implementation the opportunity to set
01621   // values in the packet header.
01622   this->prepare_header_i();
01623 }
01624 
01625 void
01626 TransportSendStrategy::prepare_header_i()
01627 {
01628   DBG_ENTRY_LVL("TransportSendStrategy","prepare_header_i",6);
01629 
01630   // Default implementation does nothing.
01631 }
01632 
01633 void
01634 TransportSendStrategy::prepare_packet()
01635 {
01636   DBG_ENTRY_LVL("TransportSendStrategy", "prepare_packet", 6);
01637 
01638   // Prepare the header for sending.
01639   this->prepare_header();
01640 
01641   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01642         "Marshal the packet header.\n"));
01643 
01644   if (this->header_block_ != 0) {
01645     this->header_block_->release();
01646   }
01647 
01648   ACE_NEW_MALLOC(this->header_block_,
01649     static_cast<ACE_Message_Block*>(this->header_mb_allocator_->malloc()),
01650     ACE_Message_Block(this->max_header_size_,
01651                       ACE_Message_Block::MB_DATA,
01652                       0,
01653                       0,
01654                       0,
01655                       0,
01656                       ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01657                       ACE_Time_Value::zero,
01658                       ACE_Time_Value::max_time,
01659                       this->header_db_allocator_,
01660                       this->header_mb_allocator_));
01661 
01662   marshal_transport_header(this->header_block_);
01663 
01664   this->pkt_chain_ = this->header_block_->duplicate();
01665 
01666   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01667         "Use a BuildChainVisitor to visit the packet elems_.\n"));
01668 
01669   // Build up a chain of blocks by duplicating the message block chain
01670   // held by each element (in elems_), and then chaining the new duplicate
01671   // blocks together to form one long chain.
01672   BuildChainVisitor visitor;
01673   this->elems_->accept_visitor(visitor);
01674 
01675   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01676         "Attach the visitor's chain of blocks to the lone (packet "
01677         "header) block currently in the pkt_chain_.\n"));
01678 
01679   // Attach the visitor's chain of blocks to the packet header block.
01680   this->pkt_chain_->cont(visitor.chain());
01681 
01682   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01683         "Increment header sequence for next packet.\n"));
01684 
01685   // Allow the specific implementation the opportunity to process the
01686   // newly prepared packet.
01687   this->prepare_packet_i();
01688 
01689   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01690         "Set the header_complete_ flag to false.\n"));
01691 
01692   // Set the header_complete_ to false to indicate
01693   // that the first block in the pkt_chain_ is the packet header block
01694   // (actually a duplicate() of the packet header_block_).
01695   this->header_complete_ = false;
01696 }
01697 
01698 void
01699 TransportSendStrategy::marshal_transport_header(ACE_Message_Block* mb)
01700 {
01701   *mb << this->header_;
01702 }
01703 
01704 void
01705 TransportSendStrategy::prepare_packet_i()
01706 {
01707   DBG_ENTRY_LVL("TransportSendStrategy","prepare_packet_i",6);
01708 
01709   // Default implementation does nothing.
01710 }
01711 
01712 void
01713 TransportSendStrategy::set_graceful_disconnecting(bool flag)
01714 {
01715   this->graceful_disconnecting_ = flag;
01716 }
01717 
01718 ssize_t
01719 TransportSendStrategy::do_send_packet(const ACE_Message_Block* packet, int& bp)
01720 {
01721   if (Transport_debug_level > 9) {
01722     ACE_DEBUG((LM_DEBUG,
01723                ACE_TEXT("(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ")
01724                ACE_TEXT("sending data at 0x%x.\n"),
01725                id(), packet));
01726   }
01727   DBG_ENTRY_LVL("TransportSendStrategy", "do_send_packet", 6);
01728 
01729   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01730             "Populate the iovec array using the packet.\n"), 5);
01731 
01732   iovec iov[MAX_SEND_BLOCKS];
01733 
01734   int num_blocks = mb_to_iov(*packet, iov);
01735 
01736   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01737             "There are [%d] number of entries in the iovec array.\n",
01738             num_blocks), 5);
01739 
01740   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01741             "Attempt to send_bytes() now.\n"), 5);
01742 
01743   const ssize_t num_bytes_sent = this->send_bytes(iov, num_blocks, bp);
01744 
01745   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01746             "The send_bytes() said that num_bytes_sent == [%d].\n",
01747             num_bytes_sent), 5);
01748 
01749   return num_bytes_sent;
01750 }
01751 
01752 TransportSendStrategy::SendPacketOutcome
01753 TransportSendStrategy::send_packet()
01754 {
01755   DBG_ENTRY_LVL("TransportSendStrategy", "send_packet", 6);
01756 
01757   int bp_flag = 0;
01758   const ssize_t num_bytes_sent =
01759     this->do_send_packet(this->pkt_chain_, bp_flag);
01760 
01761   if (num_bytes_sent == 0) {
01762     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01763               "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5);
01764     // This means that the peer has disconnected.
01765     return OUTCOME_PEER_LOST;
01766   }
01767 
01768   if (num_bytes_sent < 0) {
01769     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01770               "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5);
01771 
01772     // Check for backpressure...
01773     if (bp_flag == 1) {
01774       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01775                 "Since backpressure flag is true, return "
01776                 "OUTCOME_BACKPRESSURE.\n"), 5);
01777       // Ok.  Not really an error - just backpressure.
01778       return OUTCOME_BACKPRESSURE;
01779     }
01780 
01781     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01782               "Since backpressure flag is false, return "
01783               "OUTCOME_SEND_ERROR.\n"), 5);
01784 
01785     // Not backpressure - it's a real error.
01786     // Note: moved this to send_bytes so the errno msg could be written.
01787     //ACE_ERROR((LM_ERROR,
01788     //           "(%P|%t) ERROR: Call to peer().send() failed with negative "
01789     //           "return code.\n"));
01790 
01791     return OUTCOME_SEND_ERROR;
01792   }
01793 
01794   if (this->send_buffer_ != 0) {
01795     // If a secondary send buffer is bound, sent samples must
01796     // be inserted in order to properly maintain the buffer:
01797     this->send_buffer_->insert(this->header_.sequence_,
01798       this->elems_, this->pkt_chain_);
01799   }
01800 
01801   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01802             "Since num_bytes_sent > 0, adjust the packet to account for "
01803             "the bytes that did get sent.\n"),5);
01804 
01805   // We sent some bytes - adjust the current packet (elems_ and pkt_chain_)
01806   // to account for the bytes that have been sent.
01807   const int result =
01808     this->adjust_packet_after_send(num_bytes_sent);
01809 
01810   if (result == 0) {
01811     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01812           "The adjustment logic says that the complete packet was "
01813           "sent.  Return OUTCOME_COMPLETE_SEND.\n"));
01814     return OUTCOME_COMPLETE_SEND;
01815   }
01816 
01817   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01818         "The adjustment logic says that only a part of the packet was "
01819         "sent. Return OUTCOME_PARTIAL_SEND.\n"));
01820 
01821   return OUTCOME_PARTIAL_SEND;
01822 }
01823 
01824 ssize_t
01825 TransportSendStrategy::non_blocking_send(const iovec iov[], int n, int& bp)
01826 {
01827   int val = 0;
01828   ACE_HANDLE handle = this->get_handle();
01829 
01830   if (handle == ACE_INVALID_HANDLE)
01831     return -1;
01832 
01833   ACE::record_and_set_non_blocking_mode(handle, val);
01834 
01835   // Set the back-pressure flag to false.
01836   bp = 0;
01837 
01838   // Clear errno
01839   errno = 0;
01840 
01841   ssize_t result = this->send_bytes_i(iov, n);
01842 
01843   if (result == -1) {
01844     if ((errno == EWOULDBLOCK) || (errno == ENOBUFS)) {
01845       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
01846             "Backpressure encountered.\n"));
01847       // Set the back-pressure flag to true
01848       bp = 1;
01849 
01850     } else {
01851       VDBG_LVL((LM_ERROR, "(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n",
01852                 ACE_TEXT("sendv"), n),1);
01853 
01854       // try to get the application to core when "Bad Address" is returned
01855       // by looking at the iovec
01856       for (int ii = 0; ii < n; ii++) {
01857         ACE_DEBUG((LM_DEBUG, "(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n",
01858                    ii, iov[ii].iov_len, iov[ii].iov_base));
01859       }
01860     }
01861   }
01862 
01863   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
01864             "The sendv() returned [%d].\n", result), 5);
01865 
01866   ACE::restore_non_blocking_mode(handle, val);
01867 
01868   return result;
01869 }
01870 
01871 void
01872 TransportSendStrategy::add_delayed_notification(TransportQueueElement* element)
01873 {
01874   if (Transport_debug_level) {
01875     size_t size = this->delayed_delivered_notification_queue_.size();
01876     if ((size > 0) && (size % this->max_samples_ == 0)) {
01877       ACE_DEBUG((LM_DEBUG,
01878                  "(%P|%t) Transport send strategy notification queue threshold, size=%d\n",
01879                  size));
01880     }
01881   }
01882 
01883   this->delayed_delivered_notification_queue_.push_back(std::make_pair(element, this->mode_));
01884 }
01885 
01886 size_t
01887 TransportSendStrategy::space_available() const
01888 {
01889   const size_t used = this->max_header_size_ + this->header_.length_,
01890     max_msg = this->max_message_size();
01891   if (max_msg) {
01892     return std::min(this->max_size_ - used, max_msg - used);
01893   }
01894   return this->max_size_ - used;
01895 }
01896 
01897 int
01898 TransportSendStrategy::mb_to_iov(const ACE_Message_Block& msg, iovec* iov)
01899 {
01900   int num_blocks = 0;
01901 #ifdef _MSC_VER
01902 #pragma warning(push)
01903 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
01904 // since on other platforms iov_len is 64-bit
01905 #pragma warning(disable : 4267)
01906 #endif
01907   for (const ACE_Message_Block* block = &msg;
01908        block && num_blocks < MAX_SEND_BLOCKS;
01909        block = block->cont()) {
01910     iov[num_blocks].iov_len = block->length();
01911     iov[num_blocks++].iov_base = block->rd_ptr();
01912   }
01913 #ifdef _MSC_VER
01914 #pragma warning(pop)
01915 #endif
01916   return num_blocks;
01917 }
01918 
01919 // close namespaces
01920 }
01921 }

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7