OpenDDS::DCPS::TransportSendStrategy Class Reference

#include <TransportSendStrategy.h>

Inheritance diagram for OpenDDS::DCPS::TransportSendStrategy:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TransportSendStrategy:
Collaboration graph
[legend]

List of all members.

Public Types

enum  SendMode {
  MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND,
  MODE_TERMINATED
}
typedef BasicQueue
< TransportQueueElement
QueueType

Public Member Functions

virtual ~TransportSendStrategy ()
void send_buffer (TransportSendBuffer *send_buffer)
 Assigns an optional send buffer.
int start ()
void stop ()
void send_start ()
void send (TransportQueueElement *element, bool relink=true)
void send_stop (RepoId repoId)
RemoveResult remove_sample (const DataSampleElement *sample, void *context)
void remove_all_msgs (RepoId pub_id)
virtual WorkOutcome perform_work ()
virtual void relink (bool do_suspend=true)
void suspend_send ()
void resume_send ()
void terminate_send (bool graceful_disconnecting=false)
 Remove all samples in the backpressure queue and packet queue.
virtual void stop_i ()=0
 Let the subclass stop.
virtual bool start_i ()
 Let the subclass start.
void link_released (bool flag)
bool isDirectMode ()
virtual ACE_HANDLE get_handle ()
void deliver_ack_request (TransportQueueElement *element)
void clear (SendMode mode=MODE_DIRECT)
 Clear queued messages and messages in current packet.
SendMode mode () const
 Access the current sending mode.

Static Public Member Functions

static int mb_to_iov (const ACE_Message_Block &msg, iovec *iov)

Protected Member Functions

 TransportSendStrategy (std::size_t id, TransportImpl &transport, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy)
virtual ssize_t send_bytes (const iovec iov[], int n, int &bp)
virtual ssize_t non_blocking_send (const iovec iov[], int n, int &bp)
virtual ssize_t send_bytes_i (const iovec iov[], int n)=0
virtual void prepare_header_i ()
 Specific implementation processing of prepared packet header.
virtual void prepare_packet_i ()
 Specific implementation processing of prepared packet.
TransportQueueElementcurrent_packet_first_element () const
virtual size_t max_message_size () const
void set_graceful_disconnecting (bool flag)
 Set graceful disconnecting flag.
virtual void add_delayed_notification (TransportQueueElement *element)
bool send_delayed_notifications (const TransportQueueElement::MatchCriteria *match=0)
virtual RemoveResult do_remove_sample (const RepoId &pub_id, const TransportQueueElement::MatchCriteria &criteria, void *context)
 Implement framework chain visitations to remove a sample.
ThreadSynchsynch () const

Protected Attributes

TransportHeader header_
 Current transport packet header.

Static Protected Attributes

static const size_t UDP_MAX_MESSAGE_SIZE = 65466

Private Types

enum  SendPacketOutcome {
  OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_BACKPRESSURE, OUTCOME_PEER_LOST,
  OUTCOME_SEND_ERROR
}
typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockTypeGuardType
typedef std::pair
< TransportQueueElement
*, SendMode
TQESendModePair
 Used for delayed notifications when performing work.

Private Member Functions

void direct_send (bool relink)
void get_packet_elems_from_queue ()
void prepare_header ()
void prepare_packet ()
SendPacketOutcome send_packet ()
ssize_t do_send_packet (const ACE_Message_Block *packet, int &bp)
 Form an IOV and call the send_bytes() template method.
int adjust_packet_after_send (ssize_t num_bytes_sent)
size_t space_available () const
virtual bool marshal_transport_header (ACE_Message_Block *mb)
 OPENDDS_VECTOR (TQESendModePair) delayed_delivered_notification_queue_

Static Private Member Functions

static const char * mode_as_str (SendMode mode)
 Helper function to debugging.

Private Attributes

size_t max_samples_
 Configuration - max number of samples per transport packet.
ACE_UINT32 optimum_size_
 Configuration - optimum transport packet size (bytes).
ACE_UINT32 max_size_
 Configuration - max transport packet size (bytes).
QueueType queue_
size_t max_header_size_
 Maximum marshalled size of the transport packet header.
ACE_Message_Blockheader_block_
 Current transport packet header, marshalled.
SequenceNumber header_sequence_
 Current transport header sequence number.
QueueType elems_
ACE_Message_Blockpkt_chain_
bool header_complete_
unsigned start_counter_
SendMode mode_
 This mode determines how send() calls will be handled.
SendMode mode_before_suspend_
unique_ptr
< TransportMessageBlockAllocator
header_mb_allocator_
 Allocator for header data block.
unique_ptr
< TransportDataBlockAllocator
header_db_allocator_
 Allocator for header message block.
unique_ptr< ThreadSynchsynch_
 The thread synch object.
LockType lock_
MessageBlockAllocator replaced_element_mb_allocator_
 Cached allocator for TransportReplaceElement.
DataBlockAllocator replaced_element_db_allocator_
TransportImpltransport_
bool graceful_disconnecting_
bool link_released_
TransportSendBuffersend_buffer_

Friends

class TransportSendBuffer

Detailed Description

This class provides methods to fill packets with samples for sending and handles backpressure. It maintains the list of samples in current packets and also the list of samples queued during backpressure. A thread per connection is created to handle the queued samples.

Notes for the object ownership: 1) Owns ThreadSynch object, list of samples in current packet and list of samples in queue.

Definition at line 49 of file TransportSendStrategy.h.


Member Typedef Documentation

Definition at line 267 of file TransportSendStrategy.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TransportSendStrategy::LockType [private]

Definition at line 266 of file TransportSendStrategy.h.

Definition at line 129 of file TransportSendStrategy.h.

Used for delayed notifications when performing work.

Definition at line 367 of file TransportSendStrategy.h.


Member Enumeration Documentation

Enumerator:
MODE_NOT_SET 
MODE_DIRECT 
MODE_QUEUE 
MODE_SUSPEND 
MODE_TERMINATED 

Definition at line 270 of file TransportSendStrategy.h.

00270                 {
00271     // MODE_NOT_SET is used as the initial value of mode_before_suspend_ so
00272     // we can check if the resume_send is paired with suspend_send.
00273     MODE_NOT_SET,
00274     // Send out the sample with current packet.
00275     MODE_DIRECT,
00276     // The samples need be queued because of the backpressure or partial send.
00277     MODE_QUEUE,
00278     // The samples need be queued because the connection is lost and we are
00279     // trying to reconnect.
00280     MODE_SUSPEND,
00281     // The samples need be dropped since we lost connection and could not
00282     // reconnect.
00283     MODE_TERMINATED
00284   };

Enumerator:
OUTCOME_COMPLETE_SEND 
OUTCOME_PARTIAL_SEND 
OUTCOME_BACKPRESSURE 
OUTCOME_PEER_LOST 
OUTCOME_SEND_ERROR 

Definition at line 196 of file TransportSendStrategy.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::TransportSendStrategy::~TransportSendStrategy (  )  [virtual]

Definition at line 103 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL.

00104 {
00105   DBG_ENTRY_LVL("TransportSendStrategy","~TransportSendStrategy",6);
00106 
00107 
00108   this->delayed_delivered_notification_queue_.clear();
00109 }

OpenDDS::DCPS::TransportSendStrategy::TransportSendStrategy ( std::size_t  id,
TransportImpl transport,
ThreadSynchResource synch_resource,
Priority  priority,
const ThreadSynchStrategy_rch thread_sync_strategy 
) [protected]

Definition at line 58 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, max_header_size_, OpenDDS::DCPS::TransportHeader::max_marshaled_size(), max_samples_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), synch_, TheServiceParticipant, and OpenDDS::DCPS::DirectPriorityMapper::thread_priority().

00064   : ThreadSynchWorker(id),
00065     max_samples_(transport.config().max_samples_per_packet_),
00066     optimum_size_(transport.config().optimum_packet_size_),
00067     max_size_(transport.config().max_packet_size_),
00068     max_header_size_(0),
00069     header_block_(0),
00070     pkt_chain_(0),
00071     header_complete_(false),
00072     start_counter_(0),
00073     mode_(MODE_DIRECT),
00074     mode_before_suspend_(MODE_NOT_SET),
00075     lock_(),
00076     replaced_element_mb_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
00077     replaced_element_db_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
00078     transport_(transport),
00079     graceful_disconnecting_(false),
00080     link_released_(true),
00081     send_buffer_(0)
00082 {
00083   DBG_ENTRY_LVL("TransportSendStrategy","TransportSendStrategy",6);
00084 
00085   // Create a ThreadSynch object just for us.
00086   DirectPriorityMapper mapper(priority);
00087   this->synch_.reset(thread_sync_strategy->create_synch_object(
00088                    synch_resource,
00089 #ifdef ACE_WIN32
00090                    ACE_DEFAULT_THREAD_PRIORITY,
00091 #else
00092                    mapper.thread_priority(),
00093 #endif
00094                    TheServiceParticipant->scheduler()));
00095 
00096   // We cache this value in data member since it doesn't change, and we
00097   // don't want to keep asking for it over and over.
00098   this->max_header_size_ = TransportHeader::max_marshaled_size();
00099 
00100   delayed_delivered_notification_queue_.reserve(this->max_samples_);
00101 }

Here is the call graph for this function:


Member Function Documentation

void OpenDDS::DCPS::TransportSendStrategy::add_delayed_notification ( TransportQueueElement element  )  [protected, virtual]

Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy, and OpenDDS::DCPS::TcpSendStrategy.

Definition at line 1857 of file TransportSendStrategy.cpp.

References LM_DEBUG, max_samples_, mode_, size, and OpenDDS::DCPS::Transport_debug_level.

Referenced by adjust_packet_after_send(), and send().

01858 {
01859   if (Transport_debug_level) {
01860     size_t size = this->delayed_delivered_notification_queue_.size();
01861     if ((size > 0) && (size % this->max_samples_ == 0)) {
01862       ACE_DEBUG((LM_DEBUG,
01863                  "(%P|%t) Transport send strategy notification queue threshold, size=%d\n",
01864                  size));
01865     }
01866   }
01867 
01868   this->delayed_delivered_notification_queue_.push_back(std::make_pair(element, this->mode_));
01869 }

Here is the caller graph for this function:

int OpenDDS::DCPS::TransportSendStrategy::adjust_packet_after_send ( ssize_t  num_bytes_sent  )  [private]

This is called from the send_packet() method after it has sent at least one byte from the current packet. This method will update the current packet appropriately, as well as deal with all of the release()'ing of fully sent ACE_Message_Blocks, and the data_delivered() calls on the fully sent elements. Returns 0 if the entire packet was sent, and returns 1 if the entire packet was not sent.

Definition at line 353 of file TransportSendStrategy.cpp.

References add_delayed_notification(), ACE_Message_Block::base(), ACE_Message_Block::cont(), DBG_ENTRY_LVL, elems_, OpenDDS::DCPS::BasicQueue< T >::get(), header_, header_complete_, ACE_Message_Block::length(), OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, LM_INFO, OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::BasicQueue< T >::peek(), pkt_chain_, ACE_Message_Block::rd_ptr(), ACE_Message_Block::release(), and VDBG.

Referenced by clear(), and send_packet().

00354 {
00355   DBG_ENTRY_LVL("TransportSendStrategy", "adjust_packet_after_send", 6);
00356 
00357   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00358         "Adjusting the current packet because %d bytes of the packet "
00359         "have been sent.\n", num_bytes_sent));
00360 
00361   ssize_t num_bytes_left = num_bytes_sent;
00362   ssize_t num_non_header_bytes_sent = 0;
00363 
00364   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00365         "Set num_bytes_left to %d.\n", num_bytes_left));
00366   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00367         "Set num_non_header_bytes_sent to %d.\n",
00368         num_non_header_bytes_sent));
00369 
00370   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00371         "Peek at the element at the front of the packet elems_.\n"));
00372 
00373   // This is the element currently at the front of elems_.
00374   TransportQueueElement* element = this->elems_.peek();
00375 
00376   if(!element){
00377     ACE_DEBUG((LM_INFO, "(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n"));
00378   } else {
00379     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00380           "Use the element's msg() to find the last block in "
00381           "the msg() chain.\n"));
00382 
00383     // Get a pointer to the last message block in the element.
00384     const ACE_Message_Block* elem_tail_block = element->msg();
00385 
00386     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00387           "Start with tail block == element->msg().\n"));
00388 
00389     while (elem_tail_block->cont() != 0) {
00390       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00391             "Set tail block to its cont() block (next in chain).\n"));
00392       elem_tail_block = elem_tail_block->cont();
00393     }
00394 
00395     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00396           "Tail block now set (because tail block's cont() is 0).\n"));
00397 
00398     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00399           "Start the 'while (num_bytes_left > 0)' loop.\n"));
00400 
00401     while (num_bytes_left > 0) {
00402       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00403             "At top of 'num bytes left' loop.  num_bytes_left == [%d].\n",
00404             num_bytes_left));
00405 
00406       const int block_length = static_cast<int>(this->pkt_chain_->length());
00407 
00408       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00409             "Length of block at front of pkt_chain_ is [%d].\n",
00410             block_length));
00411 
00412       if (block_length <= num_bytes_left) {
00413         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00414               "The whole block at the front of pkt_chain_ was sent.\n"));
00415 
00416         // The entire message block at the front of the chain has been sent.
00417         // Detach the head message block from the chain and adjust
00418         // the pkt_chain_ to point to the next block (if any) in
00419         // the chain.
00420         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00421               "Extract the fully sent block from the pkt_chain_.\n"));
00422 
00423         ACE_Message_Block* fully_sent_block = this->pkt_chain_;
00424 
00425         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00426               "Set pkt_chain_ to pkt_chain_->cont().\n"));
00427 
00428         this->pkt_chain_ = this->pkt_chain_->cont();
00429 
00430         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00431               "Set the fully sent block's cont() to 0.\n"));
00432 
00433         fully_sent_block->cont(0);
00434 
00435         // Update the num_bytes_left to indicate that we have
00436         // processed the entire length of the block.
00437         num_bytes_left -= block_length;
00438 
00439         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00440               "Updated num_bytes_left to account for fully sent "
00441               "block (block_length == [%d]).\n", block_length));
00442         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00443               "Now, num_bytes_left == [%d].\n", num_bytes_left));
00444 
00445         if (!this->header_complete_) {
00446           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00447                 "Since the header_complete_ flag is false, it means "
00448                 "that the packet header block was still in the "
00449                 "pkt_chain_.\n"));
00450 
00451           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00452                 "Not anymore...  Set the header_complete_ flag "
00453                 "to true.\n"));
00454 
00455           // That was the packet header block.  And now we know that it
00456           // has been completely sent.
00457           this->header_complete_ = true;
00458 
00459           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00460                 "Release the fully sent block.\n"));
00461 
00462           // Release the fully_sent_block
00463           fully_sent_block->release();
00464 
00465         } else {
00466           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00467                 "Since the header_complete_ flag is true, it means "
00468                 "that the packet header block was not in the "
00469                 "pkt_chain_.\n"));
00470           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00471                 "So, the fully sent block was part of an element.\n"));
00472 
00473           // That wasn't the packet header block.  It was from the
00474           // element currently at the front of the elems_
00475           // collection.  If it was the last block from the
00476           // element, then we need to extract the element from the
00477           // elems_ collection and invoke data_delivered() on it.
00478           num_non_header_bytes_sent += block_length;
00479 
00480           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00481                 "Updated num_non_header_bytes_sent to account for "
00482                 "fully sent block (block_length == [%d]).\n",
00483                 block_length));
00484 
00485           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00486                 "Now, num_non_header_bytes_sent == [%d].\n",
00487                 num_non_header_bytes_sent));
00488 
00489           if (fully_sent_block->base() == elem_tail_block->base()) {
00490             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00491                   "Ok.  The fully sent block was a duplicate of "
00492                   "the tail block of the element that is at the "
00493                   "front of the packet elems_.\n"));
00494 
00495             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00496                   "This means that we have completely sent the "
00497                   "element at the front of the packet elems_.\n"));
00498 
00499             // This means that we have completely sent the element
00500             // that is currently at the front of the elems_ collection.
00501 
00502             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00503                   "We can release the fully sent block now.\n"));
00504 
00505             // Release the fully_sent_block
00506             fully_sent_block->release();
00507 
00508             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00509                   "We can extract the element from the front of "
00510                   "the packet elems_ (we were just peeking).\n"));
00511 
00512             // Extract the element from the elems_ collection
00513             element = this->elems_.get();
00514 
00515             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00516                   "Tell the element that a decision has been made "
00517                   "regarding its fate - data_delivered().\n"));
00518 
00519             // Inform the element that the data has been delivered.
00520             this->add_delayed_notification(element);
00521 
00522             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00523                   "Peek at the next element in the packet "
00524                   "elems_.\n"));
00525 
00526             // Set up for the next element in elems_ by peek()'ing.
00527             element = this->elems_.peek();
00528 
00529             if (element != 0) {
00530               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00531                     "The is an element still in the packet "
00532                     "elems_ (we are peeking at it now).\n"));
00533 
00534               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00535                     "We are going to find the tail block for the "
00536                     "current element (we are peeking at).\n"));
00537 
00538               // There was a "next element".  Determine the
00539               // elem_tail_block for it.
00540               elem_tail_block = element->msg();
00541 
00542               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00543                     "Start w/tail block == element->msg().\n"));
00544 
00545               while (elem_tail_block->cont() != 0) {
00546                 VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00547                       "Set tail block to next in chain.\n"));
00548                 elem_tail_block = elem_tail_block->cont();
00549               }
00550 
00551               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00552                     "Done finding tail block.\n"));
00553             }
00554 
00555           } else {
00556             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00557                   "Ok.  The fully sent block is *not* a "
00558                   "duplicate of the tail block of the element "
00559                   "at the front of the packet elems_.\n"));
00560 
00561             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00562                   "Thus, we have not completely sent the "
00563                   "element yet.\n"));
00564 
00565             // We didn't completely send the element - it has more
00566             // message blocks that haven't been sent (that we know of).
00567 
00568             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00569                   "We can release the fully_sent_block now.\n"));
00570 
00571             // Release the fully_sent_block
00572             fully_sent_block->release();
00573           }
00574         }
00575 
00576       } else {
00577         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00578               "Only part of the block at the front of pkt_chain_ "
00579               "was sent.\n"));
00580 
00581         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00582               "Advance the rd_ptr() of the front block (of pkt_chain_) "
00583               "by the num_bytes_left (%d).\n", num_bytes_left));
00584 
00585         // Only part of the current block was sent.
00586         this->pkt_chain_->rd_ptr(num_bytes_left);
00587 
00588         if (this->header_complete_) {
00589           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00590                 "And since the packet header block has already been "
00591                 "completely sent, add num_bytes_left to the "
00592                 "num_non_header_bytes_sent.\n"));
00593 
00594           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00595                 "Before, num_non_header_bytes_sent == %d.\n",
00596                 num_non_header_bytes_sent));
00597 
00598           // We know that the current block isn't the packet header
00599           // block because the packet header block has already been
00600           // completely sent.  We need to count these bytes in the
00601           // num_non_header_bytes_sent.
00602           num_non_header_bytes_sent += num_bytes_left;
00603 
00604           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00605                 "After, num_non_header_bytes_sent == %d.\n",
00606                 num_non_header_bytes_sent));
00607         }
00608 
00609         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00610               "Set the num_bytes_left to 0 now.\n"));
00611 
00612         num_bytes_left = 0;
00613       }
00614     }
00615   }
00616 
00617   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00618         "The 'num_bytes_left' loop has completed.\n"));
00619 
00620   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00621         "Adjust the header_.length_ to account for the "
00622         "num_non_header_bytes_sent.\n"));
00623   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00624         "Before, header_.length_ == %d.\n",
00625         this->header_.length_));
00626 
00627   // Adjust the packet header_.length_ to indicate how many non header
00628   // bytes are left to send.
00629   this->header_.length_ -= static_cast<ACE_UINT32>(num_non_header_bytes_sent);
00630 
00631   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00632         "After, header_.length_ == %d.\n",
00633         this->header_.length_));
00634 
00635   // Returns 0 if the entire packet was sent, and returns 1 otherwise.
00636   int rc = (this->header_.length_ == 0) ? 0 : 1;
00637 
00638   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00639         "Adjustments all done.  Returning [%d].  0 means entire packet "
00640         "has been sent.  1 means otherwise.\n",
00641         rc));
00642 
00643   return rc;
00644 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::clear ( SendMode  mode = MODE_DIRECT  ) 

Clear queued messages and messages in current packet.

Definition at line 759 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), adjust_packet_after_send(), DBG_ENTRY_LVL, elems_, header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, lock_, mode_, mode_before_suspend_, MODE_NOT_SET, pkt_chain_, queue_, send_delayed_notifications(), start_counter_, OpenDDS::DCPS::BasicQueue< T >::swap(), ACE_Message_Block::total_length(), and VDBG.

Referenced by OpenDDS::DCPS::TcpSendStrategy::reset(), and terminate_send().

00760 {
00761   DBG_ENTRY_LVL("TransportSendStrategy","clear",6);
00762 
00763   send_delayed_notifications();
00764   QueueType elems;
00765   QueueType queue;
00766   {
00767     GuardType guard(this->lock_);
00768 
00769     if (this->header_.length_ > 0) {
00770       // Clear the messages in the pkt_chain_ that is partially sent.
00771       // We just reuse these functions for normal partial send except actual sending.
00772       int num_bytes_left = static_cast<int>(this->pkt_chain_->total_length());
00773       int result = this->adjust_packet_after_send(num_bytes_left);
00774 
00775       if (result == 0) {
00776         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00777               "The adjustment logic says that the packet is cleared.\n"));
00778 
00779       } else {
00780         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00781               "The adjustment returned partial sent.\n"));
00782       }
00783     }
00784 
00785     elems.swap(this->elems_);
00786     queue.swap(this->queue_);
00787 
00788     this->header_.length_ = 0;
00789     this->pkt_chain_ = 0;
00790     this->header_complete_ = false;
00791     this->start_counter_ = 0;
00792     this->mode_ = mode;
00793     this->mode_before_suspend_ = MODE_NOT_SET;
00794   }
00795 
00796   // We need remove the queued elements outside the lock,
00797   // otherwise we have a deadlock situation when remove vistor
00798   // calls the data_droped on each dropped elements.
00799 
00800   // Clear all samples in queue.
00801   RemoveAllVisitor remove_all_visitor;
00802 
00803   elems.accept_remove_visitor(remove_all_visitor);
00804   queue.accept_remove_visitor(remove_all_visitor);
00805 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE OpenDDS::DCPS::TransportQueueElement * OpenDDS::DCPS::TransportSendStrategy::current_packet_first_element (  )  const [protected]

Definition at line 143 of file TransportSendStrategy.inl.

References elems_, and OpenDDS::DCPS::BasicQueue< T >::peek().

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper().

00144 {
00145   return this->elems_.peek();
00146 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::deliver_ack_request ( TransportQueueElement element  ) 

Definition at line 1872 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::TransportQueueElement::data_delivered(), and lock_.

01873 {
01874   GuardType guard(this->lock_);
01875   element->data_delivered();
01876 }

Here is the call graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::direct_send ( bool  relink  )  [private]

Called from send() when it is time to attempt to send our current packet to the socket while in MODE_DIRECT mode_. If backpressure occurs, our current packet will be adjusted to account for bytes that were sent, and the mode will be changed to MODE_QUEUE. If no backpressure occurs (ie, the entire packet is sent), then our current packet will be "reset" to be an empty packet following the send.

Definition at line 1414 of file TransportSendStrategy.cpp.

References ACE_TEXT(), OpenDDS::DCPS::TransportImpl::config(), DBG_ENTRY_LVL, OpenDDS::DCPS::TransportInst::dump(), LM_DEBUG, LM_WARNING, mode_, mode_before_suspend_, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OUTCOME_BACKPRESSURE, OUTCOME_PARTIAL_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, prepare_packet(), relink(), send_packet(), transport_, OpenDDS::DCPS::Transport_debug_level, VDBG, and VDBG_LVL.

Referenced by send(), and send_stop().

01415 {
01416   DBG_ENTRY_LVL("TransportSendStrategy", "direct_send", 6);
01417 
01418   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01419         "Prepare the current packet for a direct send attempt.\n"));
01420 
01421   // Prepare the packet for sending.
01422   this->prepare_packet();
01423 
01424   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01425         "Now attempt to send the packet.\n"));
01426 
01427   // We will try resend the packet if the send() fails and then connection
01428   // is re-established.  Only loops if the "continue" line is hit.
01429   while (true) {
01430     // Attempt to send the packet
01431     const SendPacketOutcome outcome = this->send_packet();
01432 
01433     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01434           "The outcome of the send_packet() was %d.\n", outcome));
01435 
01436     if ((outcome == OUTCOME_BACKPRESSURE) ||
01437         (outcome == OUTCOME_PARTIAL_SEND)) {
01438       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01439                 "The outcome of the send_packet() was either "
01440                 "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5);
01441 
01442       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01443                 "Flip into the MODE_QUEUE mode_.\n"), 5);
01444 
01445       // We encountered backpressure, or only sent part of the packet.
01446       this->mode_ = MODE_QUEUE;
01447 
01448     } else if ((outcome == OUTCOME_PEER_LOST) ||
01449                (outcome == OUTCOME_SEND_ERROR)) {
01450       if (outcome == OUTCOME_SEND_ERROR) {
01451         ACE_ERROR((LM_WARNING,
01452                    ACE_TEXT("(%P|%t) WARNING: Problem detected in ")
01453                    ACE_TEXT("send buffer management: %p.\n"),
01454                    ACE_TEXT("send_bytes")));
01455 
01456         if (Transport_debug_level > 0) {
01457           this->transport_.config().dump();
01458         }
01459       } else {
01460         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01461               "The outcome of the send_packet() was "
01462               "OUTCOME_PEER_LOST.\n"));
01463       }
01464 
01465       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01466                 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
01467 
01468       if (this->mode_ != MODE_SUSPEND) {
01469         this->mode_before_suspend_ = this->mode_;
01470         this->mode_ = MODE_SUSPEND;
01471       }
01472 
01473       if (relink) {
01474         bool do_suspend = false;
01475         this->relink(do_suspend);
01476 
01477         if (this->mode_ == MODE_SUSPEND) {
01478           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01479                     "The reconnect has not done yet and we are "
01480                     "still in MODE_SUSPEND.\n"), 5);
01481 
01482         } else if (this->mode_ == MODE_TERMINATED) {
01483           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01484                     "Reconnect failed, we are in MODE_TERMINATED\n"), 5);
01485 
01486         } else {
01487           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01488                     "Try send the packet again since the connection "
01489                     "is re-established.\n"), 5);
01490 
01491           // Try send the packet again since the connection is re-established.
01492           continue;
01493         }
01494       }
01495 
01496     } else {
01497       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01498             "The outcome of the send_packet() must have been "
01499             "OUTCOME_COMPLETE_SEND.\n"));
01500       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01501             "So, we will just stay in MODE_DIRECT.\n"));
01502     }
01503 
01504     break;
01505   }
01506 
01507   // We stay in MODE_DIRECT mode if we didn't encounter any backpressure.
01508 }

Here is the call graph for this function:

Here is the caller graph for this function:

RemoveResult OpenDDS::DCPS::TransportSendStrategy::do_remove_sample ( const RepoId pub_id,
const TransportQueueElement::MatchCriteria criteria,
void *  context 
) [protected, virtual]

Implement framework chain visitations to remove a sample.

Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.

Definition at line 1311 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), OpenDDS::DCPS::BasicQueue< T >::accept_replace_visitor(), DBG_ENTRY_LVL, elems_, header_, header_block_, OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, mode_, MODE_DIRECT, pkt_chain_, queue_, OpenDDS::DCPS::REMOVE_ERROR, OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, OpenDDS::DCPS::QueueRemoveVisitor::removed_bytes(), replaced_element_db_allocator_, replaced_element_mb_allocator_, OpenDDS::DCPS::BasicQueue< T >::size(), OpenDDS::DCPS::PacketRemoveVisitor::status(), OpenDDS::DCPS::QueueRemoveVisitor::status(), status, and VDBG.

Referenced by remove_all_msgs(), and remove_sample().

01314 {
01315   DBG_ENTRY_LVL("TransportSendStrategy", "do_remove_sample", 6);
01316 
01317   //ciju: Tim had the idea that we could do the following check
01318   // if ((this->mode_ == MODE_DIRECT) ||
01319   //     ((this->pkt_chain_ == 0) && (queue_ == empty)))
01320   // then we can assume that the sample can be safely removed (no need for
01321   // replacement) from the elems_ queue.
01322   if ((this->mode_ == MODE_DIRECT)
01323       || ((this->pkt_chain_ == 0) && (this->queue_.size() == 0))) {
01324     //ciju: I believe this is the only mode where a safe
01325     // assumption can be made that the samples
01326     // in the elems_ queue aren't part of a packet.
01327     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01328           "The mode is MODE_DIRECT, or the queue is empty and no "
01329           "transport packet is in progress.\n"));
01330 
01331     QueueRemoveVisitor simple_rem_vis(criteria);
01332     this->elems_.accept_remove_visitor(simple_rem_vis);
01333 
01334     const RemoveResult status = simple_rem_vis.status();
01335 
01336     if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
01337       this->header_.length_ -= simple_rem_vis.removed_bytes();
01338 
01339     } else if (status == REMOVE_NOT_FOUND) {
01340       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01341             "Failed to find the sample to remove.\n"));
01342     }
01343 
01344     return status;
01345   }
01346 
01347   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01348         "Visit the queue_ with the RemoveElementVisitor.\n"));
01349 
01350   QueueRemoveVisitor simple_rem_vis(criteria);
01351   this->queue_.accept_remove_visitor(simple_rem_vis);
01352 
01353   RemoveResult status = simple_rem_vis.status();
01354 
01355   if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
01356     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01357           "The sample was removed from the queue_.\n"));
01358     // This means that the visitor did not encounter any fatal error
01359     // along the way, *AND* the sample was found in the queue_,
01360     // and has now been removed.  We are done.
01361     return status;
01362   }
01363 
01364   if (status == REMOVE_ERROR) {
01365     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01366           "The RemoveElementVisitor encountered a fatal error in queue_.\n"));
01367     // This means that the visitor encountered some fatal error along
01368     // the way (and it already reported something to the log).
01369     // Return our failure code.
01370     return status;
01371   }
01372 
01373   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01374         "The RemoveElementVisitor did not find the sample in queue_.\n"));
01375 
01376   // We get here if the visitor did not encounter any fatal error, but it
01377   // also didn't find the sample - and hence it didn't perform any
01378   // "remove sample" logic.
01379 
01380   // Now we need to turn our attention to the current transport packet,
01381   // since the packet is likely in a "partially sent" state, and the
01382   // sample may still be contributing unsent bytes in the pkt_chain_.
01383 
01384   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01385         "Visit our elems_ with the PacketRemoveVisitor.\n"));
01386 
01387   PacketRemoveVisitor pac_rem_vis(criteria,
01388                                   this->pkt_chain_,
01389                                   this->header_block_,
01390                                   this->replaced_element_mb_allocator_,
01391                                   this->replaced_element_db_allocator_);
01392 
01393   this->elems_.accept_replace_visitor(pac_rem_vis);
01394 
01395   status = pac_rem_vis.status();
01396 
01397   if (status == REMOVE_ERROR) {
01398     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01399           "The PacketRemoveVisitor encountered a fatal error.\n"));
01400 
01401   } else if (status == REMOVE_NOT_FOUND) {
01402     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01403           "The PacketRemoveVisitor didn't find the sample.\n"));
01404 
01405   } else {
01406     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01407           "The PacketRemoveVisitor found the sample and removed it.\n"));
01408   }
01409 
01410   return status;
01411 }

Here is the call graph for this function:

Here is the caller graph for this function:

ssize_t OpenDDS::DCPS::TransportSendStrategy::do_send_packet ( const ACE_Message_Block packet,
int &  bp 
) [private]

Form an IOV and call the send_bytes() template method.

Definition at line 1684 of file TransportSendStrategy.cpp.

References ACE_TEXT(), DBG_ENTRY_LVL, LM_DEBUG, OpenDDS::DCPS::MAX_SEND_BLOCKS, mb_to_iov(), send_bytes(), OpenDDS::DCPS::Transport_debug_level, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TransportSendBuffer::resend_one(), and send_packet().

01685 {
01686   if (Transport_debug_level > 9) {
01687     ACE_DEBUG((LM_DEBUG,
01688                ACE_TEXT("(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ")
01689                ACE_TEXT("sending data at 0x%x.\n"),
01690                id(), packet));
01691   }
01692   DBG_ENTRY_LVL("TransportSendStrategy", "do_send_packet", 6);
01693 
01694 #if defined(OPENDDS_SECURITY)
01695   // pre_send_packet may provide different data that takes the place of the
01696   // original "packet" (used for security encryption/authentication)
01697   Message_Block_Ptr substitute(pre_send_packet(packet));
01698 #endif
01699 
01700   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01701             "Populate the iovec array using the packet.\n"), 5);
01702 
01703   iovec iov[MAX_SEND_BLOCKS];
01704 
01705 #if defined(OPENDDS_SECURITY)
01706   const int num_blocks = mb_to_iov(substitute ? *substitute : *packet, iov);
01707 #else
01708   const int num_blocks = mb_to_iov(*packet, iov);
01709 #endif
01710 
01711   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01712             "There are [%d] number of entries in the iovec array.\n",
01713             num_blocks), 5);
01714 
01715   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01716             "Attempt to send_bytes() now.\n"), 5);
01717 
01718   const ssize_t num_bytes_sent = this->send_bytes(iov, num_blocks, bp);
01719 
01720   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01721             "The send_bytes() said that num_bytes_sent == [%d].\n",
01722             num_bytes_sent), 5);
01723 
01724 #if defined(OPENDDS_SECURITY)
01725   if (substitute && num_bytes_sent > 0) {
01726     // Although the "substitute" data took the place of "packet", the rest
01727     // of the framework needs to account for the bytes in "packet" being taken
01728     // care of, as if they were actually sent.
01729     // Since this is done with datagram sockets, partial sends aren't possible.
01730     return packet->total_length();
01731   }
01732 #endif
01733 
01734   return num_bytes_sent;
01735 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE ACE_HANDLE OpenDDS::DCPS::TransportSendStrategy::get_handle ( void   )  [virtual]

Implements OpenDDS::DCPS::ThreadSynchWorker.

Reimplemented in OpenDDS::DCPS::TcpSendStrategy.

Definition at line 130 of file TransportSendStrategy.inl.

Referenced by OpenDDS::DCPS::ScheduleOutputHandler::handle_exception(), and non_blocking_send().

00131 {
00132   return ACE_INVALID_HANDLE;
00133 }

Here is the caller graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::get_packet_elems_from_queue (  )  [private]

This method is used while in MODE_QUEUE mode, and a new packet needs to be formulated using elements from the queue_. This is the first step of formulating the new packet. It will extract elements from the queue_ and insert those elements into the pkt_elems_ collection.

After this step has been done, the prepare_packet() step can be performed, followed by the actual send_packet() call.

Definition at line 1511 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, elems_, OpenDDS::DCPS::TransportHeader::first_fragment_, OpenDDS::DCPS::BasicQueue< T >::get(), header_, OpenDDS::DCPS::TransportHeader::last_fragment_, OpenDDS::DCPS::TransportHeader::length_, LM_TRACE, max_message_size(), max_samples_, optimum_size_, OpenDDS::DCPS::BasicQueue< T >::peek(), OpenDDS::DCPS::BasicQueue< T >::put(), queue_, OpenDDS::DCPS::BasicQueue< T >::replace_head(), OpenDDS::DCPS::BasicQueue< T >::size(), space_available(), and VDBG_LVL.

Referenced by perform_work().

01512 {
01513   DBG_ENTRY_LVL("TransportSendStrategy", "get_packet_elems_from_queue", 6);
01514 
01515   for (TransportQueueElement* element = this->queue_.peek(); element != 0;
01516        element = this->queue_.peek()) {
01517 
01518     // Total number of bytes in the current element's message block chain.
01519     size_t element_length = element->msg()->total_length();
01520 
01521     // Flag used to determine if the element requires a packet all to itself.
01522     const bool exclusive_packet = element->requires_exclusive_packet();
01523 
01524     const size_t avail = this->space_available();
01525 
01526     bool frag = false;
01527     if (element_length > avail) {
01528       // The current element won't fit into the current packet
01529       if (this->max_message_size()) { // fragmentation enabled
01530         this->header_.first_fragment_ = !element->is_fragment();
01531         VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Fragmenting from queue\n"), 0);
01532         ElementPair ep = element->fragment(avail);
01533         element = ep.first;
01534         element_length = element->msg()->total_length();
01535         this->queue_.replace_head(ep.second);
01536         frag = true; // queue_ is already taken care of, don't get() later
01537       } else {
01538         break;
01539       }
01540     }
01541 
01542     // If exclusive and the current packet is empty, we won't violate the
01543     // exclusive_packet requirement by put()'ing the element
01544     // into the elems_ collection.
01545     if ((exclusive_packet && this->elems_.size() == 0)
01546         || !exclusive_packet) {
01547       // At this point, we have passed all of the pre-conditions and we can
01548       // now extract the current element from the queue_, put it into the
01549       // packet elems_, and adjust the packet header_.length_.
01550       this->elems_.put(frag ? element : this->queue_.get());
01551       if (this->header_.length_ == 0) {
01552         this->header_.last_fragment_ = !frag && element->is_fragment();
01553       }
01554       this->header_.length_ += static_cast<ACE_UINT32>(element_length);
01555       VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Packetizing from queue\n"), 0);
01556     }
01557 
01558     // With exclusive and (elems_.size() != 0), we don't use the current
01559     // element as part of the packet.  We know that there is already
01560     // at least one element in the packet, and the current element
01561     // is going to need its own (exclusive) packet.  We will just
01562     // use the packet elems_ as it is now.  Always break once
01563     // we've encountered and dealt with the exclusive_packet case.
01564     // Also break if fragmentation was required.
01565     if (exclusive_packet || frag
01566         // If the current number of packet elems_ has reached the maximum
01567         // number of samples per packet, then we are done.
01568         || this->elems_.size() == this->max_samples_
01569         // If the current value of the header_.length_ exceeds (or equals)
01570         // the optimum_size_ for a packet, then we are done.
01571         || this->header_.length_ >= this->optimum_size_) {
01572       break;
01573     }
01574   }
01575 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE bool OpenDDS::DCPS::TransportSendStrategy::isDirectMode (  ) 

Definition at line 116 of file TransportSendStrategy.inl.

References mode_, and MODE_DIRECT.

00117 {
00118   return this->mode_ == MODE_DIRECT;
00119 }

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::link_released ( bool  flag  ) 

Definition at line 42 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL, link_released_, and lock_.

Referenced by OpenDDS::DCPS::MulticastSendStrategy::MulticastSendStrategy().

00043 {
00044   DBG_ENTRY_LVL("TransportSendStrategy","link_released",6);
00045 
00046   GuardType guard(this->lock_);
00047   this->link_released_ = flag;
00048 }

Here is the caller graph for this function:

bool OpenDDS::DCPS::TransportSendStrategy::marshal_transport_header ( ACE_Message_Block mb  )  [private, virtual]

Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.

Definition at line 1664 of file TransportSendStrategy.cpp.

References header_.

Referenced by prepare_packet().

01665 {
01666   return *mb << this->header_;
01667 }

Here is the caller graph for this function:

ACE_INLINE size_t OpenDDS::DCPS::TransportSendStrategy::max_message_size ( void   )  const [protected, virtual]

The maximum size of a message allowed by the this TransportImpl, or 0 if there is no such limit. This is expected to be a constant, for example UDP/IPv4 can send messages of up to 65466 bytes. The transport framework will use the returned value (if > 0) to fragment larger messages. This fragmentation and reassembly will be transparent to the user.

Reimplemented in OpenDDS::DCPS::MulticastSendStrategy, OpenDDS::DCPS::RtpsUdpSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.

Definition at line 137 of file TransportSendStrategy.inl.

Referenced by get_packet_elems_from_queue(), send(), and space_available().

00138 {
00139   return 0;
00140 }

Here is the caller graph for this function:

int OpenDDS::DCPS::TransportSendStrategy::mb_to_iov ( const ACE_Message_Block msg,
iovec iov 
) [static]

Convert ACE_Message_Block chain into iovec[] entries for send(), returns number of iovec[] entries used (up to MAX_SEND_BLOCKS). Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater.

Definition at line 1890 of file TransportSendStrategy.cpp.

References iovec::iov_base, iovec::iov_len, and OpenDDS::DCPS::MAX_SEND_BLOCKS.

Referenced by do_send_packet(), OpenDDS::DCPS::UdpDataLink::open(), and OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control().

01891 {
01892   int num_blocks = 0;
01893 #ifdef _MSC_VER
01894 #pragma warning(push)
01895 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
01896 // since on other platforms iov_len is 64-bit
01897 #pragma warning(disable : 4267)
01898 #endif
01899   for (const ACE_Message_Block* block = &msg;
01900        block && num_blocks < MAX_SEND_BLOCKS;
01901        block = block->cont()) {
01902     iov[num_blocks].iov_len = block->length();
01903     iov[num_blocks++].iov_base = block->rd_ptr();
01904   }
01905 #ifdef _MSC_VER
01906 #pragma warning(pop)
01907 #endif
01908   return num_blocks;
01909 }

Here is the caller graph for this function:

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL ACE_INLINE OpenDDS::DCPS::TransportSendStrategy::SendMode OpenDDS::DCPS::TransportSendStrategy::mode ( void   )  const

Access the current sending mode.

Definition at line 14 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL, and mode_.

Referenced by OpenDDS::DCPS::ScheduleOutputHandler::handle_exception(), OpenDDS::DCPS::TcpSendStrategy::schedule_output(), OpenDDS::DCPS::ScheduleOutputHandler::schedule_output(), and send_delayed_notifications().

00015 {
00016   DBG_ENTRY_LVL("TransportSendStrategy","mode",6);
00017 
00018   return mode_;
00019 }

Here is the caller graph for this function:

ACE_INLINE const char * OpenDDS::DCPS::TransportSendStrategy::mode_as_str ( SendMode  mode  )  [static, private]

Helper function to debugging.

Definition at line 102 of file TransportSendStrategy.inl.

Referenced by perform_work(), send(), and send_stop().

00103 {
00104   static const char* SendModeStr[] = { "MODE_NOT_SET",
00105                                        "MODE_DIRECT",
00106                                        "MODE_QUEUE",
00107                                        "MODE_SUSPEND",
00108                                        "MODE_TERMINATED",
00109                                        "UNKNOWN"
00110                                      };
00111 
00112   return SendModeStr [mode];
00113 }

Here is the caller graph for this function:

ssize_t OpenDDS::DCPS::TransportSendStrategy::non_blocking_send ( const iovec  iov[],
int  n,
int &  bp 
) [protected, virtual]

Definition at line 1810 of file TransportSendStrategy.cpp.

References ACE_TEXT(), get_handle(), LM_DEBUG, LM_ERROR, ACE::record_and_set_non_blocking_mode(), ACE::restore_non_blocking_mode(), send_bytes_i(), VDBG, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TcpSendStrategy::send_bytes().

01811 {
01812   int val = 0;
01813   ACE_HANDLE handle = this->get_handle();
01814 
01815   if (handle == ACE_INVALID_HANDLE)
01816     return -1;
01817 
01818   ACE::record_and_set_non_blocking_mode(handle, val);
01819 
01820   // Set the back-pressure flag to false.
01821   bp = 0;
01822 
01823   // Clear errno
01824   errno = 0;
01825 
01826   ssize_t result = this->send_bytes_i(iov, n);
01827 
01828   if (result == -1) {
01829     if ((errno == EWOULDBLOCK) || (errno == ENOBUFS)) {
01830       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
01831             "Backpressure encountered.\n"));
01832       // Set the back-pressure flag to true
01833       bp = 1;
01834 
01835     } else {
01836       VDBG_LVL((LM_ERROR, "(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n",
01837                 ACE_TEXT("sendv"), n),1);
01838 
01839       // try to get the application to core when "Bad Address" is returned
01840       // by looking at the iovec
01841       for (int ii = 0; ii < n; ii++) {
01842         ACE_DEBUG((LM_DEBUG, "(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n",
01843                    ii, iov[ii].iov_len, iov[ii].iov_base));
01844       }
01845     }
01846   }
01847 
01848   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
01849             "The sendv() returned [%d].\n", result), 5);
01850 
01851   ACE::restore_non_blocking_mode(handle, val);
01852 
01853   return result;
01854 }

Here is the call graph for this function:

Here is the caller graph for this function:

OpenDDS::DCPS::TransportSendStrategy::OPENDDS_VECTOR ( TQESendModePair   )  [private]

Referenced by send_delayed_notifications().

Here is the caller graph for this function:

ThreadSynchWorker::WorkOutcome OpenDDS::DCPS::TransportSendStrategy::perform_work ( void   )  [virtual]

Called by our ThreadSynch object when we should be able to start sending any partial packet bytes and/or compose a new packet using elements from the queue_.

Returns 0 to indicate that the ThreadSynch object doesn't need to call perform_work() again since the queue (and any unsent packet bytes) has been drained, and the mode_ has been switched to MODE_DIRECT.

Returns 1 to indicate that there is more work to do, and the ThreadSynch object should have this perform_work() method called again.

Implements OpenDDS::DCPS::ThreadSynchWorker.

Definition at line 122 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, get_packet_elems_from_queue(), header_, OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, lock_, mode_, mode_as_str(), MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OUTCOME_BACKPRESSURE, OUTCOME_COMPLETE_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, prepare_packet(), queue_, relink(), send_delayed_notifications(), send_packet(), OpenDDS::DCPS::BasicQueue< T >::size(), synch_, VDBG_LVL, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO, and OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO.

00123 {
00124   DBG_ENTRY_LVL("TransportSendStrategy","perform_work",6);
00125 
00126   SendPacketOutcome outcome;
00127   bool no_more_work = false;
00128 
00129   { // scope for the guard(this->lock_);
00130     GuardType guard(this->lock_);
00131 
00132     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: perform_work mode: %C\n", mode_as_str(this->mode_)), 5);
00133 
00134     if (this->mode_ == MODE_TERMINATED) {
00135       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00136                 "Entered perform_work() and mode_ is MODE_TERMINATED - "
00137                 "we lost connection and could not reconnect, just return "
00138                 "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
00139       return WORK_OUTCOME_BROKEN_RESOURCE;
00140     }
00141 
00142     // The perform_work() is called by our synch_ object using
00143     // a thread designated to call this method when it thinks
00144     // we need to be called in order to "service" the queue_ and/or
00145     // deal with a partially sent current packet.
00146     //
00147     // We will return a 0 if we don't see a need to have our perform_work()
00148     // called again, and we will return a 1 if we do see the need to have our
00149     // perform_work() method called again.
00150 
00151     // First, make sure that the mode_ indicates that we are, indeed, in
00152     // the MODE_QUEUE mode.  If we are not in MODE_QUEUE mode (meaning we are
00153     // in MODE_DIRECT), then it means we didn't need to have this perform_work()
00154     // method called - in this case, do nothing other than return
00155     // WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we really don't
00156     // see a need for it to call our perform_work() again (at least not
00157     // right now).
00158     if (this->mode_ != MODE_QUEUE && this->mode_ != MODE_SUSPEND) {
00159       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00160                 "Entered perform_work() and mode_ is %C - just return "
00161                 "WORK_OUTCOME_NO_MORE_TO_DO.\n", mode_as_str(this->mode_)), 5);
00162       return WORK_OUTCOME_NO_MORE_TO_DO;
00163     }
00164 
00165     // Check the "state" of the current packet.  We will either find that the
00166     // current packet is in a state of being "partially sent", or we will find
00167     // it in a state of being "empty".  When the current packet is "empty", it
00168     // means that it is time to build up the current packet using elements
00169     // extracted from the queue_, and then we will attempt to send the
00170     // packet.  When we find the current packet in the "partially sent" state,
00171     // we will not touch the queue_ - we will just try to send the unsent
00172     // bytes in the current (partially sent) packet.
00173     const size_t header_length = this->header_.length_;
00174 
00175     if (header_length == 0) {
00176       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00177                 "The current packet doesn't have any unsent bytes - we "
00178                 "need to 'populate' the current packet with elems from "
00179                 "the queue.\n"), 5);
00180 
00181       // The current packet is "empty".  Build up the current packet using
00182       // elements from the queue_, and prepare the current packet to be sent.
00183 
00184       // Before we build the packet from the queue_, let's make sure that
00185       // there is actually something on the queue_ to build from.
00186       if (this->queue_.size() == 0) {
00187         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00188                   "But the queue is empty.  We have cleared the "
00189                   "backpressure situation.\n"),5);
00190         // We are here because the queue_ is empty, and there isn't
00191         // any "partial packet" bytes left to send.  We have overcome
00192         // the backpressure situation and don't have anything to do
00193         // right now.
00194 
00195         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00196                   "Flip mode to MODE_DIRECT, and return "
00197                   "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
00198 
00199         // Flip the mode back to MODE_DIRECT.
00200         this->mode_ = MODE_DIRECT;
00201 
00202         // And return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that
00203         // perform_work() doesn't need to be called again (at this time).
00204         return WORK_OUTCOME_NO_MORE_TO_DO;
00205       }
00206 
00207       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00208                 "There is at least one elem in the queue - get the packet "
00209                 "elems from the queue.\n"), 5);
00210 
00211       // There is stuff in the queue_ if we get to this point in the logic.
00212       // Build-up the current packet using element(s) from the queue_.
00213       this->get_packet_elems_from_queue();
00214 
00215       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00216                 "Prepare the packet from the packet elems_.\n"), 5);
00217 
00218       // Now we can prepare the new packet to be sent.
00219       this->prepare_packet();
00220 
00221       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00222                 "Packet has been prepared from packet elems_.\n"), 5);
00223 
00224     } else {
00225       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00226                 "We have a current packet that still has unsent bytes.\n"), 5);
00227     }
00228 
00229     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00230               "Attempt to send the current packet.\n"), 5);
00231 
00232     // Now we can attempt to send the current packet - whether it is
00233     // a "partially sent" packet or one that we just built-up using elements
00234     // from the queue_ (and subsequently prepared for sending) - it doesn't
00235     // matter.  Just attempt to send as many of the "unsent" bytes in the
00236     // packet as possible.
00237     outcome = this->send_packet();
00238 
00239     // If we sent the whole packet (eg, partial_send is false), and the queue_
00240     // is now empty, then we've cleared the backpressure situation.
00241     if ((outcome == OUTCOME_COMPLETE_SEND) && (this->queue_.size() == 0)) {
00242       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00243                 "Flip the mode to MODE_DIRECT, and then return "
00244                 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
00245 
00246       // Revert back to MODE_DIRECT mode.
00247       this->mode_ = MODE_DIRECT;
00248       no_more_work = true;
00249     }
00250   } // End of scope for guard(this->lock_);
00251 
00252   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00253             "The outcome of the send_packet() was %d.\n", outcome), 5);
00254 
00255   send_delayed_notifications();
00256 
00257   // If we sent the whole packet (eg, partial_send is false), and the queue_
00258   // is now empty, then we've cleared the backpressure situation.
00259   if (no_more_work) {
00260     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00261               "We sent the whole packet, and there is nothing left on "
00262               "the queue now.\n"), 5);
00263 
00264     // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
00265     // don't desire another call to this perform_work() method.
00266     return WORK_OUTCOME_NO_MORE_TO_DO;
00267   }
00268 
00269   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00270             "We still have unsent bytes in the current packet AND/OR there "
00271             "are still elements in the queue.\n"), 5);
00272 
00273   if ((outcome == OUTCOME_PEER_LOST) || (outcome == OUTCOME_SEND_ERROR)) {
00274     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00275               "We lost our connection, or had some fatal connection "
00276               "error.  Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
00277 
00278     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00279               "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
00280 
00281     bool do_suspend = true;
00282     this->relink(do_suspend);
00283 
00284     if (this->mode_ == MODE_SUSPEND) {
00285       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00286                 "The reconnect has not done yet and we are still in MODE_SUSPEND. "
00287                 "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
00288       // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
00289       // don't desire another call to this perform_work() method.
00290       return WORK_OUTCOME_NO_MORE_TO_DO;
00291 
00292     } else if (this->mode_ == MODE_TERMINATED) {
00293       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00294                 "Reconnect failed, now we are in MODE_TERMINATED\n"), 5);
00295       return WORK_OUTCOME_BROKEN_RESOURCE;
00296 
00297     } else {
00298       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00299                 "Reconnect succeeded, Notify synch thread of work "
00300                 "availability.\n"), 5);
00301       // If the datalink is re-established then notify the synch
00302       // thread to perform work.  We do not hold the object lock at
00303       // this point.
00304       this->synch_->work_available();
00305     }
00306   }
00307 
00308   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00309             "We still have an 'unbroken' connection.\n"), 5);
00310 
00311   if (outcome == OUTCOME_BACKPRESSURE) {
00312     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00313               "We experienced backpressure on our attempt to send the "
00314               "packet.  Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
00315     // We have a "clogged resource".
00316     return WORK_OUTCOME_CLOGGED_RESOURCE;
00317   }
00318 
00319   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00320             "We may have sent the whole current packet, but still have "
00321             "elements on the queue.\n"), 5);
00322   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00323             "Or, we may have only partially sent the current packet.\n"), 5);
00324 
00325   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00326             "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5);
00327 
00328   // We may have had an OUTCOME_COMPLETE_SEND, but there is still stuff
00329   // in the queue_ to be sent.  *OR* we have have had an OUTCOME_PARTIAL_SEND,
00330   // which equates to the same thing - we still have work to do.
00331 
00332   // We are still in MODE_QUEUE mode, thus there is still work to be
00333   // done to service the queue_ and/or a partially sent current packet.
00334   // Return WORK_OUTCOME_MORE_TO_DO so that our caller knows that we still
00335   // want it to call this perform_work() method.
00336   return WORK_OUTCOME_MORE_TO_DO;
00337 }

Here is the call graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::prepare_header (  )  [private]

This method is responsible for updating the packet header. Called exclusively by prepare_packet.

Definition at line 1578 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, header_, header_sequence_, prepare_header_i(), and OpenDDS::DCPS::TransportHeader::sequence_.

Referenced by prepare_packet().

01579 {
01580   DBG_ENTRY_LVL("TransportSendStrategy", "prepare_header", 6);
01581 
01582   // Increment header sequence for packet:
01583   this->header_.sequence_ = ++this->header_sequence_;
01584 
01585   // Allow the specific implementation the opportunity to set
01586   // values in the packet header.
01587   this->prepare_header_i();
01588 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::prepare_header_i (  )  [protected, virtual]

Specific implementation processing of prepared packet header.

Reimplemented in OpenDDS::DCPS::MulticastSendStrategy.

Definition at line 1591 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL.

Referenced by prepare_header().

01592 {
01593   DBG_ENTRY_LVL("TransportSendStrategy","prepare_header_i",6);
01594 
01595   // Default implementation does nothing.
01596 }

Here is the caller graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::prepare_packet (  )  [private]

This method is responsible for actually "creating" the current send packet using the packet header and the collection of packet elements that are to make-up the packet's contents.

Definition at line 1599 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), OpenDDS::DCPS::BuildChainVisitor::chain(), ACE_Message_Block::cont(), DBG_ENTRY_LVL, ACE_Message_Block::duplicate(), elems_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header_block_, header_complete_, header_db_allocator_, header_mb_allocator_, LM_DEBUG, marshal_transport_header(), max_header_size_, ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, pkt_chain_, prepare_header(), prepare_packet_i(), ACE_Message_Block::release(), VDBG, and ACE_Time_Value::zero.

Referenced by direct_send(), and perform_work().

01600 {
01601   DBG_ENTRY_LVL("TransportSendStrategy", "prepare_packet", 6);
01602 
01603   // Prepare the header for sending.
01604   this->prepare_header();
01605 
01606   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01607         "Marshal the packet header.\n"));
01608 
01609   if (this->header_block_ != 0) {
01610     this->header_block_->release();
01611   }
01612 
01613   ACE_NEW_MALLOC(this->header_block_,
01614     static_cast<ACE_Message_Block*>(this->header_mb_allocator_->malloc()),
01615     ACE_Message_Block(this->max_header_size_,
01616                       ACE_Message_Block::MB_DATA,
01617                       0,
01618                       0,
01619                       0,
01620                       0,
01621                       ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01622                       ACE_Time_Value::zero,
01623                       ACE_Time_Value::max_time,
01624                       this->header_db_allocator_.get(),
01625                       this->header_mb_allocator_.get()));
01626 
01627   marshal_transport_header(this->header_block_);
01628 
01629   this->pkt_chain_ = this->header_block_->duplicate();
01630 
01631   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01632         "Use a BuildChainVisitor to visit the packet elems_.\n"));
01633 
01634   // Build up a chain of blocks by duplicating the message block chain
01635   // held by each element (in elems_), and then chaining the new duplicate
01636   // blocks together to form one long chain.
01637   BuildChainVisitor visitor;
01638   this->elems_.accept_visitor(visitor);
01639 
01640   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01641         "Attach the visitor's chain of blocks to the lone (packet "
01642         "header) block currently in the pkt_chain_.\n"));
01643 
01644   // Attach the visitor's chain of blocks to the packet header block.
01645   this->pkt_chain_->cont(visitor.chain());
01646 
01647   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01648         "Increment header sequence for next packet.\n"));
01649 
01650   // Allow the specific implementation the opportunity to process the
01651   // newly prepared packet.
01652   this->prepare_packet_i();
01653 
01654   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01655         "Set the header_complete_ flag to false.\n"));
01656 
01657   // Set the header_complete_ to false to indicate
01658   // that the first block in the pkt_chain_ is the packet header block
01659   // (actually a duplicate() of the packet header_block_).
01660   this->header_complete_ = false;
01661 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::prepare_packet_i (  )  [protected, virtual]

Specific implementation processing of prepared packet.

Definition at line 1670 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL.

Referenced by prepare_packet().

01671 {
01672   DBG_ENTRY_LVL("TransportSendStrategy","prepare_packet_i",6);
01673 
01674   // Default implementation does nothing.
01675 }

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::relink ( bool  do_suspend = true  )  [virtual]

The subclass needs to provide the implementation for re-establishing the datalink. This is called when send returns an error.

Reimplemented in OpenDDS::DCPS::TcpSendStrategy.

Definition at line 51 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL.

Referenced by direct_send(), and perform_work().

00052 {
00053   DBG_ENTRY_LVL("TransportSendStrategy","relink",6);
00054   // The subsclass needs implement this function for re-establishing
00055   // the link upon send failure.
00056 }

Here is the caller graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::remove_all_msgs ( RepoId  pub_id  ) 

Definition at line 1264 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, do_remove_sample(), lock_, OpenDDS::DCPS::TransportSendBuffer::retain_all(), send_buffer_, and send_delayed_notifications().

01265 {
01266   DBG_ENTRY_LVL("TransportSendStrategy","remove_all_msgs",6);
01267 
01268   const TransportQueueElement::MatchOnPubId match(pub_id);
01269   send_delayed_notifications(&match);
01270 
01271   GuardType guard(this->lock_);
01272 
01273   if (this->send_buffer_ != 0) {
01274     // If a secondary send buffer is bound, removed samples must
01275     // be retained in order to properly maintain the buffer:
01276     this->send_buffer_->retain_all(pub_id);
01277   }
01278 
01279   do_remove_sample(pub_id, match, 0);
01280 }

Here is the call graph for this function:

RemoveResult OpenDDS::DCPS::TransportSendStrategy::remove_sample ( const DataSampleElement sample,
void *  context 
)

Our DataLink has been requested by some particular TransportClient to remove the supplied sample (basically, an "unsend" attempt) from this strategy object.

Definition at line 1283 of file TransportSendStrategy.cpp.

References ACE_Message_Block::cont(), DBG_ENTRY_LVL, do_remove_sample(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_sample(), LM_DEBUG, lock_, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::REMOVE_RELEASED, send_delayed_notifications(), and VDBG_LVL.

01284 {
01285   DBG_ENTRY_LVL("TransportSendStrategy", "remove_sample", 6);
01286 
01287   VDBG_LVL((LM_DEBUG, "(%P|%t)  Removing sample: %@\n", sample->get_sample()), 5);
01288 
01289   // The sample to remove is either in temporary delayed notification list or
01290   // internal list (elems_ or queue_). If it's going to be removed from temporary delayed
01291   // notification list by transport thread, it needs acquire WriterDataContainer lock for
01292   // data_dropped/data_delivered callback, then it needs wait for this remove_sample() call
01293   // complete as this call already hold the WriterContainer's lock. So this call is safe to
01294   // access the sample to remove. If it's going to be removed by this remove_sample() calling
01295   // thread, it will be removed either from delayed notification list or from internal list
01296   // in which case the element carry the info if the sample is released so the datalinkset
01297   // can stop calling rest datalinks to remove this sample if it's already released..
01298 
01299   const char* const payload = sample->get_sample()->cont()->rd_ptr();
01300   RepoId pub_id = sample->get_pub_id();
01301   const TransportQueueElement::MatchOnDataPayload modp(payload);
01302   if (send_delayed_notifications(&modp)) {
01303     return REMOVE_RELEASED;
01304   }
01305 
01306   GuardType guard(this->lock_);
01307   return do_remove_sample(pub_id, modp, context);
01308 }

Here is the call graph for this function:

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::resume_send (  ) 

This is called when connection is lost and reconnect succeeds. The send mode is set to the mode before suspend which is either MODE_QUEUE or MODE_DIRECT.

Definition at line 71 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL, header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, LM_ERROR, lock_, mode_, mode_before_suspend_, MODE_DIRECT, MODE_NOT_SET, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, pkt_chain_, queue_, OpenDDS::DCPS::BasicQueue< T >::size(), start_counter_, and synch_.

00072 {
00073   DBG_ENTRY_LVL("TransportSendStrategy","resume_send",6);
00074   GuardType guard(this->lock_);
00075 
00076   // If this send strategy is reused when the connection is reestablished, then
00077   // we need re-initialize the mode_ and mode_before_suspend_.
00078   if (this->mode_ == MODE_TERMINATED) {
00079     this->header_.length_ = 0;
00080     this->pkt_chain_ = 0;
00081     this->header_complete_ = false;
00082     this->start_counter_ = 0;
00083     this->mode_ = MODE_DIRECT;
00084     this->mode_before_suspend_ = MODE_NOT_SET;
00085     this->delayed_delivered_notification_queue_.clear();
00086 
00087   } else if (this->mode_ == MODE_SUSPEND) {
00088     this->mode_ = this->mode_before_suspend_;
00089     this->mode_before_suspend_ = MODE_NOT_SET;
00090     if (this->queue_.size() > 0) {
00091       this->mode_ = MODE_QUEUE;
00092       this->synch_->work_available();
00093     }
00094 
00095   } else {
00096     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::resume_send  The suspend or terminate"
00097                " is not called previously.\n"));
00098   }
00099 }

Here is the call graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::send ( TransportQueueElement element,
bool  relink = true 
)

Our DataLink has been requested by some particular TransportClient to send the element.

Definition at line 890 of file TransportSendStrategy.cpp.

References ACE_TEXT(), add_delayed_notification(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, direct_send(), elems_, OpenDDS::DCPS::TransportHeader::first_fragment_, OpenDDS::DCPS::TransportQueueElement::fragment(), graceful_disconnecting_, header_, OpenDDS::DCPS::TransportHeader::last_fragment_, OpenDDS::DCPS::TransportHeader::length_, link_released_, LM_DEBUG, LM_ERROR, LM_TRACE, lock_, max_header_size_, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), max_message_size(), max_samples_, max_size_, mode_, mode_as_str(), MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OpenDDS::DCPS::TransportQueueElement::msg(), optimum_size_, OpenDDS::DCPS::BasicQueue< T >::put(), queue_, OpenDDS::DCPS::TransportQueueElement::requires_exclusive_packet(), send_delayed_notifications(), OpenDDS::DCPS::BasicQueue< T >::size(), space_available(), synch_, ACE_Message_Block::total_length(), OpenDDS::DCPS::Transport_debug_level, VDBG, and VDBG_LVL.

00891 {
00892   if (Transport_debug_level > 9) {
00893     ACE_DEBUG((LM_DEBUG,
00894                ACE_TEXT("(%P|%t) TransportSendStrategy::send() [%d] - ")
00895                ACE_TEXT("sending data at 0x%x.\n"),
00896                id(), element));
00897   }
00898 
00899   DBG_ENTRY_LVL("TransportSendStrategy", "send", 6);
00900 
00901   {
00902     GuardType guard(this->lock_);
00903 
00904     if (this->link_released_) {
00905       this->add_delayed_notification(element);
00906 
00907     } else {
00908       if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) {
00909         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00910               "TransportSendStrategy::send: mode is MODE_TERMINATED and not in "
00911               "graceful disconnecting, so discard message.\n"));
00912         element->data_dropped(true);
00913         return;
00914       }
00915 
00916       size_t element_length = element->msg()->total_length();
00917 
00918       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00919             "Send element msg() has total_length() == [%d].\n",
00920             element_length));
00921 
00922       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00923             "this->max_header_size_ == [%d].\n",
00924             this->max_header_size_));
00925 
00926       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00927             "this->max_size_ == [%d].\n",
00928             this->max_size_));
00929 
00930       const size_t max_message_size = this->max_message_size();
00931 
00932       // Really an assert.  We can't accept any element that wouldn't fit into
00933       // a transport packet by itself (ie, it would be the only element in the
00934       // packet).  This max_size_ is the user-configurable maximum, not based
00935       // on the transport's inherent maximum message size.  If max_message_size
00936       // is non-zero, we will fragment so max_size_ doesn't apply per-element.
00937       if (max_message_size == 0 &&
00938           this->max_header_size_ + element_length > this->max_size_) {
00939         ACE_ERROR((LM_ERROR,
00940                    "(%P|%t) ERROR: Element too large (%Q) "
00941                    "- won't fit into packet.\n", ACE_UINT64(element_length)));
00942         return;
00943       }
00944 
00945       // Check the mode_ to see if we simply put the element on the queue.
00946       if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) {
00947         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00948                   "this->mode_ == %C, so queue elem and leave.\n",
00949                   mode_as_str(this->mode_)), 5);
00950 
00951         this->queue_.put(element);
00952 
00953         if (this->mode_ != MODE_SUSPEND) {
00954           this->synch_->work_available();
00955         }
00956 
00957         return;
00958       }
00959 
00960       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00961             "this->mode_ == MODE_DIRECT.\n"));
00962 
00963       // We are in the MODE_DIRECT send mode.  When in this mode, the send()
00964       // calls will "build up" the transport packet to be sent directly when it
00965       // reaches the optimal size, contains the maximum number of samples, etc.
00966 
00967       // We need to check if the current element (the arg passed-in to this
00968       // send() method) should be appended to the transport packet, or if the
00969       // transport packet should be sent (directly) first, dealing with the
00970       // current element afterwards.
00971 
00972       // We will decide to send the packet as it is now, under two circumstances:
00973       //
00974       //    Either:
00975       //
00976       //    (1) The current element won't fit into the current packet since it
00977       //        would violate the max_packet_size_.
00978       //
00979       //    -OR-
00980       //
00981       //    (2) There is at least one element already in the current packet,
00982       //        and the current element says that it must be sent in an
00983       //        exclusive packet (ie, in a packet all by itself).
00984       //
00985       const bool exclusive = element->requires_exclusive_packet();
00986 
00987       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00988             "The element %C require an exclusive packet.\n",
00989             (exclusive ? "DOES" : "does NOT")
00990           ));
00991 
00992       const size_t space_needed =
00993         (max_message_size > 0)
00994         ? /* fragmenting */ DataSampleHeader::max_marshaled_size() + MIN_FRAG
00995         : /* not fragmenting */ element_length;
00996 
00997       if ((exclusive && (this->elems_.size() != 0))
00998           || (this->space_available() < space_needed)) {
00999 
01000         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01001               "Element won't fit in current packet or requires exclusive"
01002               " - send current packet (directly) now.\n"));
01003 
01004         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01005               "max_header_size_: %d, header_.length_: %d, element_length: %d\n"
01006               , this->max_header_size_, this->header_.length_, element_length));
01007 
01008         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01009               "Tot possible length: %d, max_len: %d\n"
01010               , this->max_header_size_ + this->header_.length_ + element_length
01011               , this->max_size_));
01012         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01013               "current elem size: %d\n"
01014               , this->elems_.size()));
01015 
01016         // Send the current packet, and deal with the current element
01017         // afterwards.
01018         // The invocation's relink status should dictate the direct_send's
01019         // relink. We don't want a (relink == false) invocation to end up
01020         // doing a relink. Think of (relink == false) as a non-blocking call.
01021         this->direct_send(relink);
01022 
01023         // Now check to see if we flipped into MODE_QUEUE, which would mean
01024         // that the direct_send() experienced backpressure, and the
01025         // packet was only partially sent.  If this has happened, we deal with
01026         // the current element by placing it on the queue (and then we are done).
01027         //
01028         // Otherwise, if the mode_ is still MODE_DIRECT, we can just
01029         // "drop" through to the next step in the logic where we append the
01030         // current element to the current packet.
01031         if (this->mode_ == MODE_QUEUE) {
01032           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01033                     "We experienced backpressure on that direct send, as "
01034                     "the mode_ is now MODE_QUEUE or MODE_SUSPEND.  "
01035                     "Queue elem and leave.\n"), 5);
01036           this->queue_.put(element);
01037           this->synch_->work_available();
01038 
01039           return;
01040         }
01041       }
01042 
01043       // Loop for sending 'element', in fragments if needed
01044       bool first_pkt = true; // enter the loop 1st time through unconditionally
01045       for (TransportQueueElement* next_fragment = 0;
01046            (first_pkt || next_fragment)
01047            && (this->mode_ == MODE_DIRECT || this->mode_ == MODE_TERMINATED);) {
01048            // We do need to send in MODE_TERMINATED (GRACEFUL_DISCONNECT msg)
01049 
01050         if (next_fragment) {
01051           element = next_fragment;
01052           element_length = next_fragment->msg()->total_length();
01053           this->header_.first_fragment_ = false;
01054         }
01055 
01056         this->header_.last_fragment_ = false;
01057         if (max_message_size) { // fragmentation enabled
01058           const size_t avail = this->space_available();
01059           if (element_length > avail) {
01060             VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Fragmenting\n"), 0);
01061             ElementPair ep = element->fragment(avail);
01062             element = ep.first;
01063             element_length = element->msg()->total_length();
01064             next_fragment = ep.second;
01065             this->header_.first_fragment_ = first_pkt;
01066           } else if (next_fragment) {
01067             // We are sending the "tail" element of a previous fragment()
01068             // operation, and this element didn't itself require fragmentation
01069             this->header_.last_fragment_ = true;
01070             next_fragment = 0;
01071           }
01072         }
01073         first_pkt = false;
01074 
01075         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01076               "Start the 'append elem' to current packet logic.\n"));
01077 
01078         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01079               "Put element into current packet elems_.\n"));
01080 
01081         // Now that we know the current element should go into the current
01082         // packet, we can just go ahead and "append" the current element to
01083         // the current packet.
01084 
01085         // Add the current element to the collection of packet elements.
01086         this->elems_.put(element);
01087 
01088         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01089               "Before, the header_.length_ == [%d].\n",
01090               this->header_.length_));
01091 
01092         // Adjust the header_.length_ to account for the length of the element.
01093         this->header_.length_ += static_cast<ACE_UINT32>(element_length);
01094         const size_t message_length = this->header_.length_;
01095 
01096         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01097               "After adding element's length, the header_.length_ == [%d].\n",
01098               message_length));
01099 
01100         // The current packet now contains the current element.  We need to
01101         // check to see if the conditions are such that we should go ahead and
01102         // attempt to send the packet "directly" now, or if we can just leave
01103         // and send the current packet later (in another send() call or in a
01104         // send_stop() call).
01105 
01106         // There a few conditions that will cause us to attempt to send the
01107         // packet (directly) right now:
01108         // - Fragmentation was needed
01109         // - The current packet has the maximum number of samples per packet.
01110         // - The current packet's total length exceeds the optimum packet size.
01111         // - The current element (currently part of the packet elems_)
01112         //   requires an exclusive packet.
01113         //
01114         if (next_fragment || (this->elems_.size() >= this->max_samples_)
01115             || (this->max_header_size_ + message_length > this->optimum_size_)
01116             || exclusive) {
01117           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01118                 "Now the current packet looks full - send it (directly).\n"));
01119 
01120           this->direct_send(relink);
01121 
01122           if (next_fragment && this->mode_ != MODE_DIRECT) {
01123             if (this->mode_ == MODE_QUEUE) {
01124               this->queue_.put(next_fragment);
01125               this->synch_->work_available();
01126 
01127             } else {
01128               next_fragment->data_dropped(true /* dropped by transport */);
01129             }
01130           } else if (mode_ == MODE_QUEUE) {
01131             // Background thread handles packets in progress
01132             this->synch_->work_available();
01133           }
01134 
01135           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01136                 "Back from the direct_send() attempt.\n"));
01137 
01138           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01139                 "And we %C as a result of the direct_send() call.\n",
01140                 ((this->mode_ == MODE_QUEUE) ? "flipped into MODE_QUEUE"
01141                                              : "stayed in MODE_DIRECT")));
01142 
01143         } else {
01144           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01145                 "Packet not sent. Send conditions weren't satisfied.\n"));
01146           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01147                 "elems_.size(): %d, max_samples_: %d\n",
01148                 int(this->elems_.size()), int(this->max_samples_)));
01149           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01150                 "header_size_: %d, optimum_size_: %d\n",
01151                 int(this->max_header_size_ + message_length),
01152                 int(this->optimum_size_)));
01153           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01154                 "element_requires_exclusive_packet: %d\n", int(exclusive)));
01155 
01156           if (this->mode_ == MODE_QUEUE) {
01157             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01158                   "We flipped into MODE_QUEUE.\n"));
01159 
01160           } else {
01161             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01162                   "We stayed in MODE_DIRECT.\n"));
01163           }
01164         }
01165       }
01166     }
01167   }
01168 
01169   send_delayed_notifications();
01170 }

Here is the call graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::send_buffer ( TransportSendBuffer send_buffer  ) 

Assigns an optional send buffer.

Definition at line 112 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::TransportSendBuffer::bind(), and send_buffer_.

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::open().

00113 {
00114   this->send_buffer_ = send_buffer;
00115 
00116   if (this->send_buffer_ != 0) {
00117     this->send_buffer_->bind(this);
00118   }
00119 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE ssize_t OpenDDS::DCPS::TransportSendStrategy::send_bytes ( const iovec  iov[],
int  n,
int &  bp 
) [protected, virtual]

Reimplemented in OpenDDS::DCPS::TcpSendStrategy.

Definition at line 122 of file TransportSendStrategy.inl.

References send_bytes_i().

Referenced by do_send_packet().

00125 {
00126   return send_bytes_i(iov, n);
00127 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual ssize_t OpenDDS::DCPS::TransportSendStrategy::send_bytes_i ( const iovec  iov[],
int  n 
) [protected, pure virtual]
bool OpenDDS::DCPS::TransportSendStrategy::send_delayed_notifications ( const TransportQueueElement::MatchCriteria match = 0  )  [protected]

If delayed notifications were queued up, issue those callbacks here. The default match is "match all", otherwise match can be used to specify either a certain individual packet or a publication id. Returns true if anything in the delayed notification list matched.

Definition at line 647 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, OpenDDS::DCPS::TransportImpl::is_shut_down(), lock_, OpenDDS::DCPS::TransportQueueElement::MatchCriteria::matches(), mode(), MODE_NOT_SET, MODE_TERMINATED, OPENDDS_VECTOR(), OpenDDS::DCPS::TransportQueueElement::owned_by_transport(), and transport_.

Referenced by clear(), perform_work(), remove_all_msgs(), remove_sample(), send(), and send_stop().

00648 {
00649   DBG_ENTRY_LVL("TransportSendStrategy","send_delayed_notifications",6);
00650   TransportQueueElement* sample = 0;
00651   SendMode mode = MODE_NOT_SET;
00652 
00653   OPENDDS_VECTOR(TQESendModePair) samples;
00654 
00655   size_t num_delayed_notifications = 0;
00656   bool found_element = false;
00657 
00658   {
00659     GuardType guard(lock_);
00660 
00661     num_delayed_notifications = delayed_delivered_notification_queue_.size();
00662 
00663     if (num_delayed_notifications == 0) {
00664       return false;
00665 
00666     } else if (num_delayed_notifications == 1) {
00667       // Optimization for the most common case (doesn't need vectors)
00668 
00669       if (!match || match->matches(*delayed_delivered_notification_queue_[0].first)) {
00670         found_element = true;
00671         sample = delayed_delivered_notification_queue_[0].first;
00672         mode = delayed_delivered_notification_queue_[0].second;
00673 
00674         delayed_delivered_notification_queue_.clear();
00675       }
00676 
00677     } else {
00678       OPENDDS_VECTOR(TQESendModePair)::iterator iter;
00679       for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) {
00680         sample = iter->first;
00681         mode = iter->second;
00682         if (!match || match->matches(*sample)) {
00683           found_element = true;
00684           samples.push_back(*iter);
00685           iter = delayed_delivered_notification_queue_.erase(iter);
00686         } else {
00687           ++iter;
00688         }
00689       }
00690     }
00691   }
00692 
00693   if (!found_element)
00694     return false;
00695 
00696   bool transport_shutdown = this->transport_.is_shut_down();
00697 
00698   if (num_delayed_notifications == 1) {
00699     // optimization for the common case
00700     if (mode == MODE_TERMINATED) {
00701       if (!transport_shutdown || sample->owned_by_transport()) {
00702         sample->data_dropped(true);
00703       }
00704     } else {
00705       if (!transport_shutdown || sample->owned_by_transport()) {
00706         sample->data_delivered();
00707       }
00708     }
00709 
00710   } else {
00711     for (size_t i = 0; i < samples.size(); ++i) {
00712       if (samples[i].second == MODE_TERMINATED) {
00713         if (!transport_shutdown || samples[i].first->owned_by_transport()) {
00714           samples[i].first->data_dropped(true);
00715         }
00716       } else {
00717         if (!transport_shutdown || samples[i].first->owned_by_transport()) {
00718           samples[i].first->data_delivered();
00719         }
00720       }
00721     }
00722   }
00723   return true;
00724 }

Here is the call graph for this function:

Here is the caller graph for this function:

TransportSendStrategy::SendPacketOutcome OpenDDS::DCPS::TransportSendStrategy::send_packet (  )  [private]

This is called to send the current packet. The current packet will either be a "partially sent" packet, or a packet that has just been prepared via a call to prepare_packet().

Definition at line 1738 of file TransportSendStrategy.cpp.

References adjust_packet_after_send(), DBG_ENTRY_LVL, do_send_packet(), header_, OpenDDS::DCPS::TransportSendBuffer::insert(), LM_DEBUG, OUTCOME_BACKPRESSURE, OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, pkt_chain_, send_buffer_, OpenDDS::DCPS::TransportHeader::sequence_, VDBG, and VDBG_LVL.

Referenced by direct_send(), and perform_work().

01739 {
01740   DBG_ENTRY_LVL("TransportSendStrategy", "send_packet", 6);
01741 
01742   int bp_flag = 0;
01743   const ssize_t num_bytes_sent =
01744     this->do_send_packet(this->pkt_chain_, bp_flag);
01745 
01746   if (num_bytes_sent == 0) {
01747     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01748               "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5);
01749     // This means that the peer has disconnected.
01750     return OUTCOME_PEER_LOST;
01751   }
01752 
01753   if (num_bytes_sent < 0) {
01754     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01755               "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5);
01756 
01757     // Check for backpressure...
01758     if (bp_flag == 1) {
01759       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01760                 "Since backpressure flag is true, return "
01761                 "OUTCOME_BACKPRESSURE.\n"), 5);
01762       // Ok.  Not really an error - just backpressure.
01763       return OUTCOME_BACKPRESSURE;
01764     }
01765 
01766     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01767               "Since backpressure flag is false, return "
01768               "OUTCOME_SEND_ERROR.\n"), 5);
01769 
01770     // Not backpressure - it's a real error.
01771     // Note: moved this to send_bytes so the errno msg could be written.
01772     //ACE_ERROR((LM_ERROR,
01773     //           "(%P|%t) ERROR: Call to peer().send() failed with negative "
01774     //           "return code.\n"));
01775 
01776     return OUTCOME_SEND_ERROR;
01777   }
01778 
01779   if (this->send_buffer_ != 0) {
01780     // If a secondary send buffer is bound, sent samples must
01781     // be inserted in order to properly maintain the buffer:
01782     this->send_buffer_->insert(this->header_.sequence_,
01783       &this->elems_, this->pkt_chain_);
01784   }
01785 
01786   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01787             "Since num_bytes_sent > 0, adjust the packet to account for "
01788             "the bytes that did get sent.\n"),5);
01789 
01790   // We sent some bytes - adjust the current packet (elems_ and pkt_chain_)
01791   // to account for the bytes that have been sent.
01792   const int result =
01793     this->adjust_packet_after_send(num_bytes_sent);
01794 
01795   if (result == 0) {
01796     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01797           "The adjustment logic says that the complete packet was "
01798           "sent.  Return OUTCOME_COMPLETE_SEND.\n"));
01799     return OUTCOME_COMPLETE_SEND;
01800   }
01801 
01802   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01803         "The adjustment logic says that only a part of the packet was "
01804         "sent. Return OUTCOME_PARTIAL_SEND.\n"));
01805 
01806   return OUTCOME_PARTIAL_SEND;
01807 }

Here is the call graph for this function:

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::send_start (  ) 

Invoked prior to one or more send() invocations from a particular TransportClient.

Definition at line 31 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL, link_released_, lock_, and start_counter_.

00032 {
00033   DBG_ENTRY_LVL("TransportSendStrategy","send_start",6);
00034 
00035   GuardType guard(this->lock_);
00036 
00037   if (!this->link_released_)
00038     ++this->start_counter_;
00039 }

void OpenDDS::DCPS::TransportSendStrategy::send_stop ( RepoId  repoId  ) 

Invoked after one or more send() invocations from a particular TransportClient.

Definition at line 1173 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, direct_send(), elems_, graceful_disconnecting_, header_, OpenDDS::DCPS::TransportHeader::length_, link_released_, LM_DEBUG, LM_ERROR, lock_, mode_, mode_as_str(), MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, send_delayed_notifications(), OpenDDS::DCPS::BasicQueue< T >::size(), start_counter_, synch_, VDBG, and VDBG_LVL.

01174 {
01175   DBG_ENTRY_LVL("TransportSendStrategy","send_stop",6);
01176   {
01177     GuardType guard(this->lock_);
01178 
01179     if (this->link_released_)
01180       return;
01181 
01182     if (this->start_counter_ == 0) {
01183       // This is an indication of a logic error.  This is more of an assert.
01184       VDBG_LVL((LM_ERROR,
01185                 "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5);
01186       return;
01187     }
01188 
01189     --this->start_counter_;
01190 
01191     if (this->start_counter_ != 0) {
01192       // This wasn't the last send_stop() that we are expecting.  We only
01193       // really honor the first send_start() and the last send_stop().
01194       // We can return without doing anything else in this case.
01195       return;
01196     }
01197 
01198     if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) {
01199       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01200             "TransportSendStrategy::send_stop: dont try to send current packet "
01201             "since mode is MODE_TERMINATED and not in graceful disconnecting.\n"));
01202       return;
01203     }
01204 
01205     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01206           "This is an 'important' send_stop() event since our "
01207           "start_counter_ is 0.\n"));
01208 
01209     // We just caused the start_counter_ to become zero.  This
01210     // means that we aren't expecting another send() or send_stop() at any
01211     // time in the near future (ie, it isn't imminent).
01212 
01213     // If our mode_ is currently MODE_QUEUE or MODE_SUSPEND, then we don't have
01214     // anything to do here because samples have already been going to the
01215     // queue.
01216 
01217     // We only need to do something if the mode_ is
01218     // MODE_DIRECT.  It means that we may have some sample(s) in the
01219     // current packet that have never been sent.  This is our
01220     // opportunity to send the current packet directly if this is the case.
01221     if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) {
01222       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01223             "But since we are in %C, we don't have to do "
01224             "anything more in this important send_stop().\n",
01225             mode_as_str(this->mode_)));
01226       // We don't do anything if we are in MODE_QUEUE.  Just leave.
01227       return;
01228     }
01229 
01230     size_t header_length = this->header_.length_;
01231     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01232           "We are in MODE_DIRECT in an important send_stop() - "
01233           "header_.length_ == [%d].\n", header_length));
01234 
01235     // Only attempt to send the current packet (directly) if the current
01236     // packet actually contains something (it could be empty).
01237     if ((header_length > 0) &&
01238         //(this->elems_.size ()+this->not_yet_pac_q_->size() > 0))
01239         (this->elems_.size() > 0)) {
01240       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01241             "There is something in the current packet - attempt to send "
01242             "it (directly) now.\n"));
01243       // If a relink needs to be done for this packet to be sent, do it.
01244       this->direct_send(true);
01245       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01246             "Back from the attempt to send leftover packet directly.\n"));
01247 
01248       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01249             "But we %C as a result.\n",
01250             ((this->mode_ == MODE_QUEUE)? "flipped into MODE_QUEUE":
01251                                           "stayed in MODE_DIRECT" )));
01252       if (this->mode_ == MODE_QUEUE  && this->mode_ != MODE_SUSPEND) {
01253         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01254               "Notify Synch thread of work availability\n"));
01255         this->synch_->work_available();
01256       }
01257     }
01258   }
01259 
01260   send_delayed_notifications();
01261 }

Here is the call graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::set_graceful_disconnecting ( bool  flag  )  [protected]

Set graceful disconnecting flag.

Definition at line 1678 of file TransportSendStrategy.cpp.

References graceful_disconnecting_.

Referenced by OpenDDS::DCPS::TcpSendStrategy::reset().

01679 {
01680   this->graceful_disconnecting_ = flag;
01681 }

Here is the caller graph for this function:

size_t OpenDDS::DCPS::TransportSendStrategy::space_available (  )  const [private]

How much space is available in the current packet before we reach one of the limits: max_message_size() [transport's inherent limitation] or max_size_ [user's configured limit]

Definition at line 1879 of file TransportSendStrategy.cpp.

References header_, OpenDDS::DCPS::TransportHeader::length_, max_header_size_, max_message_size(), and max_size_.

Referenced by get_packet_elems_from_queue(), and send().

01880 {
01881   const size_t used = this->max_header_size_ + this->header_.length_,
01882     max_msg = this->max_message_size();
01883   if (max_msg) {
01884     return std::min(this->max_size_ - used, max_msg - used);
01885   }
01886   return this->max_size_ - used;
01887 }

Here is the call graph for this function:

Here is the caller graph for this function:

int OpenDDS::DCPS::TransportSendStrategy::start ( void   ) 

Start the TransportSendStrategy. This happens once, when the DataLink that "owns" this strategy object has established a connection.

Definition at line 808 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::TransportSendBuffer::capacity(), DBG_ENTRY_LVL, header_db_allocator_, header_mb_allocator_, LM_ERROR, lock_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), send_buffer_, start_i(), and synch_.

Referenced by OpenDDS::DCPS::ShmemSendStrategy::send_bytes_i().

00809 {
00810   DBG_ENTRY_LVL("TransportSendStrategy","start",6);
00811 
00812   {
00813     GuardType guard(this->lock_);
00814 
00815     if (!this->start_i()) {
00816       return -1;
00817     }
00818   }
00819 
00820   size_t header_chunks(1);
00821 
00822   // If a secondary send buffer is bound, sent headers should
00823   // be cached to properly maintain the buffer:
00824   if (this->send_buffer_ != 0) {
00825     header_chunks += this->send_buffer_->capacity();
00826 
00827   } else {
00828     header_chunks += 1;
00829   }
00830 
00831   this->header_db_allocator_.reset( new TransportDataBlockAllocator(header_chunks));
00832   this->header_mb_allocator_.reset( new TransportMessageBlockAllocator(header_chunks));
00833 
00834   // Since we (the TransportSendStrategy object) are a reference-counted
00835   // object, but the synch_ object doesn't necessarily know this, we need
00836   // to give a "copy" of a reference to ourselves to the synch_ object here.
00837   // We will do the reverse when we unregister ourselves (as a worker) from
00838   // the synch_ object.
00839 
00840   if (this->synch_->register_worker(*this) == -1) {
00841 
00842     ACE_ERROR_RETURN((LM_ERROR,
00843                       "(%P|%t) ERROR: TransportSendStrategy failed to register "
00844                       "as a worker with the ThreadSynch object.\n"),
00845                      -1);
00846   }
00847 
00848   return 0;
00849 }

Here is the call graph for this function:

Here is the caller graph for this function:

virtual bool OpenDDS::DCPS::TransportSendStrategy::start_i (  )  [inline, virtual]

Let the subclass start.

Reimplemented in OpenDDS::DCPS::ShmemSendStrategy.

Definition at line 123 of file TransportSendStrategy.h.

Referenced by start().

00123 { return true; }

Here is the caller graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::stop ( void   ) 

Stop the TransportSendStrategy. This happens once, when the DataLink that "owns" this strategy object is going away.

Definition at line 852 of file TransportSendStrategy.cpp.

References ACE_TEXT(), DBG_ENTRY_LVL, header_block_, LM_WARNING, lock_, pkt_chain_, ACE_Message_Block::release(), size, stop_i(), synch_, and ACE_Message_Block::total_length().

00853 {
00854   DBG_ENTRY_LVL("TransportSendStrategy","stop",6);
00855 
00856   if (this->header_block_ != 0) {
00857     this->header_block_->release ();
00858     this->header_block_ = 0;
00859   }
00860 
00861   this->synch_->unregister_worker();
00862 
00863   {
00864     GuardType guard(this->lock_);
00865 
00866     if (this->pkt_chain_ != 0) {
00867       size_t size = this->pkt_chain_->total_length();
00868 
00869       if (size > 0) {
00870         this->pkt_chain_->release();
00871         ACE_DEBUG((LM_WARNING,
00872                    ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
00873                    ACE_TEXT("terminating with %d unsent bytes.\n"),
00874                    size));
00875       }
00876     }
00877   }
00878 
00879   {
00880     GuardType guard(this->lock_);
00881 
00882     this->stop_i();
00883   }
00884 
00885   // TBD SOON - What about all of the samples that may still be stuck in
00886   //            our queue_ and/or elems_?
00887 }

Here is the call graph for this function:

virtual void OpenDDS::DCPS::TransportSendStrategy::stop_i (  )  [pure virtual]

Let the subclass stop.

Implemented in OpenDDS::DCPS::MulticastSendStrategy, OpenDDS::DCPS::RtpsUdpSendStrategy, OpenDDS::DCPS::ShmemSendStrategy, OpenDDS::DCPS::TcpSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.

Referenced by stop().

Here is the caller graph for this function:

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::suspend_send (  ) 

This is called when first time reconnect is attempted. The send mode is set to MODE_SUSPEND. Messages are queued at this state.

Definition at line 59 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL, lock_, mode_, mode_before_suspend_, MODE_SUSPEND, and MODE_TERMINATED.

00060 {
00061   DBG_ENTRY_LVL("TransportSendStrategy","suspend_send",6);
00062   GuardType guard(this->lock_);
00063 
00064   if (this->mode_ != MODE_TERMINATED && this->mode_ != MODE_SUSPEND) {
00065     this->mode_before_suspend_ = this->mode_;
00066     this->mode_ = MODE_SUSPEND;
00067   }
00068 }

ACE_INLINE OpenDDS::DCPS::ThreadSynch * OpenDDS::DCPS::TransportSendStrategy::synch (  )  const [protected]

Definition at line 23 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), and synch_.

Referenced by OpenDDS::DCPS::TcpSendStrategy::schedule_output().

00024 {
00025   DBG_ENTRY_LVL("TransportSendStrategy","synch",6);
00026 
00027   return synch_.get();
00028 }

Here is the call graph for this function:

Here is the caller graph for this function:

void OpenDDS::DCPS::TransportSendStrategy::terminate_send ( bool  graceful_disconnecting = false  ) 

Remove all samples in the backpressure queue and packet queue.

This is called whenver the connection is lost and reconnect fails. It removes all samples in the backpressure queue and packet queue.

Definition at line 728 of file TransportSendStrategy.cpp.

References clear(), DBG_ENTRY_LVL, graceful_disconnecting_, LM_DEBUG, lock_, mode_, MODE_SUSPEND, MODE_TERMINATED, and VDBG.

00729 {
00730   DBG_ENTRY_LVL("TransportSendStrategy","terminate_send",6);
00731 
00732   bool reset_flag = true;
00733 
00734   {
00735     GuardType guard(this->lock_);
00736 
00737     // If the terminate_send call due to a non-graceful disconnection before
00738     // a datalink shutdown then we will not try to send the graceful disconnect
00739     // message.
00740     if ((this->mode_ == MODE_TERMINATED || this->mode_ == MODE_SUSPEND)
00741         && !this->graceful_disconnecting_) {
00742       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00743             "It was already terminated non gracefully, will not set to graceful disconnecting \n"));
00744       reset_flag = false;
00745     }
00746   }
00747 
00748   VDBG((LM_DEBUG, "(%P|%t) DBG:  Now flip to MODE_TERMINATED \n"));
00749 
00750   this->clear(MODE_TERMINATED);
00751 
00752   if (reset_flag) {
00753     GuardType guard(this->lock_);
00754     this->graceful_disconnecting_ = graceful_disconnecting;
00755   }
00756 }

Here is the call graph for this function:


Friends And Related Function Documentation

friend class TransportSendBuffer [friend]

Definition at line 398 of file TransportSendStrategy.h.


Member Data Documentation

Current elements that have contributed blocks to the current transport packet.

Definition at line 337 of file TransportSendStrategy.h.

Referenced by adjust_packet_after_send(), clear(), current_packet_first_element(), do_remove_sample(), get_packet_elems_from_queue(), prepare_packet(), send(), and send_stop().

Current transport packet header, marshalled.

Definition at line 330 of file TransportSendStrategy.h.

Referenced by do_remove_sample(), prepare_packet(), and stop().

Set to false when the packet header hasn't been fully sent. Set to true once the packet header has been fully sent.

Definition at line 345 of file TransportSendStrategy.h.

Referenced by adjust_packet_after_send(), clear(), prepare_packet(), and resume_send().

Allocator for header message block.

Definition at line 374 of file TransportSendStrategy.h.

Referenced by prepare_packet(), and start().

Allocator for header data block.

Definition at line 371 of file TransportSendStrategy.h.

Referenced by prepare_packet(), and start().

Current transport header sequence number.

Definition at line 333 of file TransportSendStrategy.h.

Referenced by prepare_header().

Definition at line 391 of file TransportSendStrategy.h.

Referenced by link_released(), send(), send_start(), and send_stop().

This lock will protect critical sections of code that play a role in the sending of data.

Definition at line 381 of file TransportSendStrategy.h.

Referenced by clear(), deliver_ack_request(), link_released(), perform_work(), remove_all_msgs(), remove_sample(), resume_send(), send(), send_delayed_notifications(), send_start(), send_stop(), start(), stop(), suspend_send(), and terminate_send().

Maximum marshalled size of the transport packet header.

Definition at line 327 of file TransportSendStrategy.h.

Referenced by prepare_packet(), send(), space_available(), and TransportSendStrategy().

Configuration - max number of samples per transport packet.

Definition at line 311 of file TransportSendStrategy.h.

Referenced by add_delayed_notification(), get_packet_elems_from_queue(), send(), and TransportSendStrategy().

Configuration - max transport packet size (bytes).

Definition at line 317 of file TransportSendStrategy.h.

Referenced by send(), and space_available().

This mode remembers the mode before send is suspended and is used after the send is resumed because the connection is re-established.

Definition at line 364 of file TransportSendStrategy.h.

Referenced by clear(), direct_send(), resume_send(), and suspend_send().

Configuration - optimum transport packet size (bytes).

Definition at line 314 of file TransportSendStrategy.h.

Referenced by get_packet_elems_from_queue(), and send().

Current (head of chain) block containing unsent bytes for the current transport packet.

Definition at line 341 of file TransportSendStrategy.h.

Referenced by adjust_packet_after_send(), clear(), do_remove_sample(), prepare_packet(), resume_send(), send_packet(), and stop().

Used during backpressure situations to hold samples that have not yet been made to be part of a transport packet, and are completely unsent. Also used as a bucket for packets which still have to become part of a packet.

Definition at line 324 of file TransportSendStrategy.h.

Referenced by clear(), do_remove_sample(), get_packet_elems_from_queue(), perform_work(), resume_send(), and send().

Definition at line 385 of file TransportSendStrategy.h.

Referenced by do_remove_sample().

Cached allocator for TransportReplaceElement.

Definition at line 384 of file TransportSendStrategy.h.

Referenced by do_remove_sample().

Definition at line 393 of file TransportSendStrategy.h.

Referenced by remove_all_msgs(), send_buffer(), send_packet(), and start().

Counter that, when greater than zero, indicates that we still expect to receive a send_stop() event. Incremented once for each call to our send_start() method, and decremented once for each call to our send_stop() method. We only care about the transitions of the start_counter_ value from 0 to 1, and from 1 to 0. This accomodates the case where more than one TransportClient is sending to us at the same time. We use this counter to enable a "composite" send_start() and send_stop().

Definition at line 356 of file TransportSendStrategy.h.

Referenced by clear(), resume_send(), send_start(), and send_stop().

The thread synch object.

Definition at line 377 of file TransportSendStrategy.h.

Referenced by perform_work(), resume_send(), send(), send_stop(), start(), stop(), synch(), and TransportSendStrategy().

Definition at line 387 of file TransportSendStrategy.h.

Referenced by direct_send(), and send_delayed_notifications().

Put the maximum UDP payload size here so that it can be shared by all UDP-based transports. This is the worst-case (conservative) value for UDP/IPv4. If there are no IP options, or if IPv6 is used, it could actually be a little larger.

Definition at line 181 of file TransportSendStrategy.h.

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i().


The documentation for this class was generated from the following files:
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1