OpenDDS::DCPS::TransportSendStrategy Class Reference

#include <TransportSendStrategy.h>


Public Types

typedef BasicQueue< TransportQueueElement > QueueType
enum  SendMode {
  MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND,
  MODE_TERMINATED
}

Public Member Functions

virtual ~TransportSendStrategy ()
void send_buffer (TransportSendBuffer *send_buffer)
 Assigns an optional send buffer.
int start ()
void stop ()
void send_start ()
void send (TransportQueueElement *element, bool relink=true)
void send_stop (RepoId repoId)
RemoveResult remove_sample (const DataSampleElement *sample)
void remove_all_msgs (RepoId pub_id)
virtual WorkOutcome perform_work ()
virtual void relink (bool do_suspend=true)
void suspend_send ()
void resume_send ()
void terminate_send (bool graceful_disconnecting=false)
 Remove all samples in the backpressure queue and packet queue.
virtual void stop_i ()=0
 Let the subclass stop.
virtual bool start_i ()
 Let the subclass start.
void link_released (bool flag)
bool isDirectMode ()
void transport_shutdown ()
virtual ACE_HANDLE get_handle ()
void clear (SendMode mode=MODE_DIRECT)
 Clear queued messages and messages in current packet.
SendMode mode () const
 Access the current sending mode.

Static Public Member Functions

static int mb_to_iov (const ACE_Message_Block &msg, iovec *iov)

Protected Member Functions

 TransportSendStrategy (std::size_t id, const TransportInst_rch &transport_inst, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy)
virtual ssize_t send_bytes (const iovec iov[], int n, int &bp)
virtual ssize_t non_blocking_send (const iovec iov[], int n, int &bp)
virtual ssize_t send_bytes_i (const iovec iov[], int n)=0
virtual void prepare_header_i ()
 Specific implementation processing of prepared packet header.
virtual void prepare_packet_i ()
 Specific implementation processing of prepared packet.
TransportQueueElement * current_packet_first_element () const
virtual size_t max_message_size () const
void set_graceful_disconnecting (bool flag)
 Set graceful disconnecting flag.
virtual void add_delayed_notification (TransportQueueElement *element)
virtual RemoveResult do_remove_sample (const RepoId &pub_id, const TransportQueueElement::MatchCriteria &criteria)
 Implement framework chain visitations to remove a sample.
ThreadSynch * synch () const

Protected Attributes

TransportHeader header_
 Current transport packet header.

Static Protected Attributes

static const size_t UDP_MAX_MESSAGE_SIZE = 65466

Private Types

typedef ACE_SYNCH_MUTEX LockType
typedef ACE_Guard< LockType > GuardType
typedef std::pair< TransportQueueElement *, SendMode > TQESendModePair
 Used for delayed notifications when performing work.
enum  SendPacketOutcome {
  OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_BACKPRESSURE, OUTCOME_PEER_LOST,
  OUTCOME_SEND_ERROR
}

Private Member Functions

void direct_send (bool relink)
void get_packet_elems_from_queue ()
void prepare_header ()
void prepare_packet ()
SendPacketOutcome send_packet ()
ssize_t do_send_packet (const ACE_Message_Block *packet, int &bp)
 Form an IOV and call the send_bytes() template method.
int adjust_packet_after_send (ssize_t num_bytes_sent)
bool send_delayed_notifications (const TransportQueueElement::MatchCriteria *match=0)
size_t space_available () const
virtual void marshal_transport_header (ACE_Message_Block *mb)
 OPENDDS_VECTOR (TQESendModePair) delayed_delivered_notification_queue_

Static Private Member Functions

static const char * mode_as_str (SendMode mode)
 Helper function for debugging.

Private Attributes

size_t max_samples_
 Configuration - max number of samples per transport packet.
ACE_UINT32 optimum_size_
 Configuration - optimum transport packet size (bytes).
ACE_UINT32 max_size_
 Configuration - max transport packet size (bytes).
QueueType * queue_
size_t max_header_size_
 Maximum marshalled size of the transport packet header.
ACE_Message_Block * header_block_
 Current transport packet header, marshalled.
SequenceNumber header_sequence_
 Current transport header sequence number.
QueueType * elems_
ACE_Message_Block * pkt_chain_
bool header_complete_
unsigned start_counter_
SendMode mode_
 This mode determines how send() calls will be handled.
SendMode mode_before_suspend_
TransportMessageBlockAllocator * header_mb_allocator_
 Allocator for header message block.
TransportDataBlockAllocator * header_db_allocator_
 Allocator for header data block.
ThreadSynch * synch_
 The thread synch object.
LockType lock_
TransportReplacedElementAllocator replaced_element_allocator_
 Cached allocator for TransportReplacedElement.
MessageBlockAllocator replaced_element_mb_allocator_
DataBlockAllocator replaced_element_db_allocator_
TransportRetainedElementAllocator * retained_element_allocator_
TransportInst_rch transport_inst_
bool graceful_disconnecting_
bool link_released_
TransportSendBuffer * send_buffer_
bool transport_shutdown_

Friends

class TransportSendBuffer

Detailed Description

This class fills transport packets with samples for sending and handles backpressure. It maintains the list of samples in the current packet and the list of samples queued during backpressure. One thread per connection is created to handle the queued samples.

Object ownership: this class owns its ThreadSynch object, the list of samples in the current packet, and the list of samples in the queue.

Definition at line 48 of file TransportSendStrategy.h.


Member Typedef Documentation

typedef ACE_Guard<LockType> OpenDDS::DCPS::TransportSendStrategy::GuardType [private]

Definition at line 257 of file TransportSendStrategy.h.

typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TransportSendStrategy::LockType [private]

Definition at line 256 of file TransportSendStrategy.h.

typedef BasicQueue<TransportQueueElement> OpenDDS::DCPS::TransportSendStrategy::QueueType

Definition at line 133 of file TransportSendStrategy.h.

typedef std::pair<TransportQueueElement*, SendMode> OpenDDS::DCPS::TransportSendStrategy::TQESendModePair [private]

Used for delayed notifications when performing work.

Definition at line 356 of file TransportSendStrategy.h.


Member Enumeration Documentation

enum OpenDDS::DCPS::TransportSendStrategy::SendMode

Enumerator:
MODE_NOT_SET 
MODE_DIRECT 
MODE_QUEUE 
MODE_SUSPEND 
MODE_TERMINATED 

Definition at line 260 of file TransportSendStrategy.h.

00260                 {
00261     // MODE_NOT_SET is used as the initial value of mode_before_suspend_ so
00262     // we can check if the resume_send is paired with suspend_send.
00263     MODE_NOT_SET,
00264     // Send out the sample with current packet.
00265     MODE_DIRECT,
00266     // The samples need to be queued because of backpressure or a partial send.
00267     MODE_QUEUE,
00268     // The samples need to be queued because the connection is lost and we are
00269     // trying to reconnect.
00270     MODE_SUSPEND,
00271     // The samples need to be dropped since we lost the connection and could not
00272     // reconnect.
00273     MODE_TERMINATED
00274   };

enum OpenDDS::DCPS::TransportSendStrategy::SendPacketOutcome [private]

Enumerator:
OUTCOME_COMPLETE_SEND 
OUTCOME_PARTIAL_SEND 
OUTCOME_BACKPRESSURE 
OUTCOME_PEER_LOST 
OUTCOME_SEND_ERROR 

Definition at line 191 of file TransportSendStrategy.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::TransportSendStrategy::~TransportSendStrategy (  )  [virtual]

Definition at line 115 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, elems_, queue_, and synch_.

00116 {
00117   DBG_ENTRY_LVL("TransportSendStrategy","~TransportSendStrategy",6);
00118 
00119   delete this->synch_;
00120 
00121   this->delayed_delivered_notification_queue_.clear();
00122 
00123   delete this->elems_;
00124   delete this->queue_;
00125 }

OpenDDS::DCPS::TransportSendStrategy::TransportSendStrategy ( std::size_t  id,
const TransportInst_rch & transport_inst,
ThreadSynchResource * synch_resource,
Priority  priority,
const ThreadSynchStrategy_rch & thread_sync_strategy
) [protected]

Definition at line 56 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, max_header_size_, max_marshaled_size(), NUM_REPLACED_ELEMENT_CHUNKS, replaced_element_allocator_, synch_, TheServiceParticipant, OpenDDS::DCPS::DirectPriorityMapper::thread_priority(), and OpenDDS::DCPS::Transport_debug_level.

00062   : ThreadSynchWorker(id),
00063     max_samples_(transport_inst->max_samples_per_packet_),
00064     optimum_size_(transport_inst->optimum_packet_size_),
00065     max_size_(transport_inst->max_packet_size_),
00066     queue_(new QueueType(transport_inst->queue_messages_per_pool_,
00067                          transport_inst->queue_initial_pools_)),
00068     max_header_size_(0),
00069     header_block_(0),
00070     elems_(new QueueType(1, transport_inst->max_samples_per_packet_)),
00071     pkt_chain_(0),
00072     header_complete_(false),
00073     start_counter_(0),
00074     mode_(MODE_DIRECT),
00075     mode_before_suspend_(MODE_NOT_SET),
00076     header_mb_allocator_(0),
00077     header_db_allocator_(0),
00078     synch_(0),
00079     lock_(),
00080     replaced_element_allocator_(NUM_REPLACED_ELEMENT_CHUNKS),
00081     replaced_element_mb_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
00082     replaced_element_db_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
00083     retained_element_allocator_(0),
00084     transport_inst_(transport_inst),
00085     graceful_disconnecting_(false),
00086     link_released_(true),
00087     send_buffer_(0),
00088     transport_shutdown_(false)
00089 {
00090   DBG_ENTRY_LVL("TransportSendStrategy","TransportSendStrategy",6);
00091 
00092   // Create a ThreadSynch object just for us.
00093   DirectPriorityMapper mapper(priority);
00094   this->synch_ = thread_sync_strategy->create_synch_object(
00095                    synch_resource,
00096 #ifdef ACE_WIN32
00097                    ACE_DEFAULT_THREAD_PRIORITY,
00098 #else
00099                    mapper.thread_priority(),
00100 #endif
00101                    TheServiceParticipant->scheduler());
00102 
00103   // We cache this value in data member since it doesn't change, and we
00104   // don't want to keep asking for it over and over.
00105   this->max_header_size_ = TransportHeader::max_marshaled_size();
00106 
00107   if (Transport_debug_level >= 2) {
00108     ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportSendStrategy replaced_element_allocator %x with %d chunks\n",
00109                &replaced_element_allocator_, NUM_REPLACED_ELEMENT_CHUNKS));
00110   }
00111 
00112   delayed_delivered_notification_queue_.reserve(this->max_samples_);
00113 }


Member Function Documentation

void OpenDDS::DCPS::TransportSendStrategy::add_delayed_notification ( TransportQueueElement * element  )  [protected, virtual]

Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.

Definition at line 1872 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::Transport_debug_level.

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification(), adjust_packet_after_send(), and send().

01873 {
01874   if (Transport_debug_level) {
01875     size_t size = this->delayed_delivered_notification_queue_.size();
01876     if ((size > 0) && (size % this->max_samples_ == 0)) {
01877       ACE_DEBUG((LM_DEBUG,
01878                  "(%P|%t) Transport send strategy notification queue threshold, size=%d\n",
01879                  size));
01880     }
01881   }
01882 
01883   this->delayed_delivered_notification_queue_.push_back(std::make_pair(element, this->mode_));
01884 }

int OpenDDS::DCPS::TransportSendStrategy::adjust_packet_after_send ( ssize_t  num_bytes_sent  )  [private]

Called from send_packet() after it has sent at least one byte of the current packet. This method updates the current packet accordingly, releases every fully sent ACE_Message_Block, and queues a delayed data_delivered() notification for each fully sent element. Returns 0 if the entire packet was sent and 1 otherwise.

Definition at line 369 of file TransportSendStrategy.cpp.

References add_delayed_notification(), DBG_ENTRY_LVL, elems_, OpenDDS::DCPS::BasicQueue< T >::get(), header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::BasicQueue< T >::peek(), pkt_chain_, and VDBG.

Referenced by clear(), and send_packet().

00370 {
00371   DBG_ENTRY_LVL("TransportSendStrategy", "adjust_packet_after_send", 6);
00372 
00373   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00374         "Adjusting the current packet because %d bytes of the packet "
00375         "have been sent.\n", num_bytes_sent));
00376 
00377   ssize_t num_bytes_left = num_bytes_sent;
00378   ssize_t num_non_header_bytes_sent = 0;
00379 
00380   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00381         "Set num_bytes_left to %d.\n", num_bytes_left));
00382   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00383         "Set num_non_header_bytes_sent to %d.\n",
00384         num_non_header_bytes_sent));
00385 
00386   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00387         "Peek at the element at the front of the packet elems_.\n"));
00388 
00389   // This is the element currently at the front of elems_.
00390   TransportQueueElement* element = this->elems_->peek();
00391 
00392   if(!element){
00393     ACE_DEBUG((LM_INFO, "(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n"));
00394   } else {
00395     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00396           "Use the element's msg() to find the last block in "
00397           "the msg() chain.\n"));
00398 
00399     // Get a pointer to the last message block in the element.
00400     const ACE_Message_Block* elem_tail_block = element->msg();
00401 
00402     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00403           "Start with tail block == element->msg().\n"));
00404 
00405     while (elem_tail_block->cont() != 0) {
00406       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00407             "Set tail block to its cont() block (next in chain).\n"));
00408       elem_tail_block = elem_tail_block->cont();
00409     }
00410 
00411     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00412           "Tail block now set (because tail block's cont() is 0).\n"));
00413 
00414     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00415           "Start the 'while (num_bytes_left > 0)' loop.\n"));
00416 
00417     while (num_bytes_left > 0) {
00418       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00419             "At top of 'num bytes left' loop.  num_bytes_left == [%d].\n",
00420             num_bytes_left));
00421 
00422       const int block_length = static_cast<int>(this->pkt_chain_->length());
00423 
00424       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00425             "Length of block at front of pkt_chain_ is [%d].\n",
00426             block_length));
00427 
00428       if (block_length <= num_bytes_left) {
00429         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00430               "The whole block at the front of pkt_chain_ was sent.\n"));
00431 
00432         // The entire message block at the front of the chain has been sent.
00433         // Detach the head message block from the chain and adjust
00434         // the pkt_chain_ to point to the next block (if any) in
00435         // the chain.
00436         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00437               "Extract the fully sent block from the pkt_chain_.\n"));
00438 
00439         ACE_Message_Block* fully_sent_block = this->pkt_chain_;
00440 
00441         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00442               "Set pkt_chain_ to pkt_chain_->cont().\n"));
00443 
00444         this->pkt_chain_ = this->pkt_chain_->cont();
00445 
00446         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00447               "Set the fully sent block's cont() to 0.\n"));
00448 
00449         fully_sent_block->cont(0);
00450 
00451         // Update the num_bytes_left to indicate that we have
00452         // processed the entire length of the block.
00453         num_bytes_left -= block_length;
00454 
00455         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00456               "Updated num_bytes_left to account for fully sent "
00457               "block (block_length == [%d]).\n", block_length));
00458         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00459               "Now, num_bytes_left == [%d].\n", num_bytes_left));
00460 
00461         if (!this->header_complete_) {
00462           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00463                 "Since the header_complete_ flag is false, it means "
00464                 "that the packet header block was still in the "
00465                 "pkt_chain_.\n"));
00466 
00467           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00468                 "Not anymore...  Set the header_complete_ flag "
00469                 "to true.\n"));
00470 
00471           // That was the packet header block.  And now we know that it
00472           // has been completely sent.
00473           this->header_complete_ = true;
00474 
00475           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00476                 "Release the fully sent block.\n"));
00477 
00478           // Release the fully_sent_block
00479           fully_sent_block->release();
00480 
00481         } else {
00482           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00483                 "Since the header_complete_ flag is true, it means "
00484                 "that the packet header block was not in the "
00485                 "pkt_chain_.\n"));
00486           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00487                 "So, the fully sent block was part of an element.\n"));
00488 
00489           // That wasn't the packet header block.  It was from the
00490           // element currently at the front of the elems_
00491           // collection.  If it was the last block from the
00492           // element, then we need to extract the element from the
00493           // elems_ collection and invoke data_delivered() on it.
00494           num_non_header_bytes_sent += block_length;
00495 
00496           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00497                 "Updated num_non_header_bytes_sent to account for "
00498                 "fully sent block (block_length == [%d]).\n",
00499                 block_length));
00500 
00501           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00502                 "Now, num_non_header_bytes_sent == [%d].\n",
00503                 num_non_header_bytes_sent));
00504 
00505           if (fully_sent_block->base() == elem_tail_block->base()) {
00506             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00507                   "Ok.  The fully sent block was a duplicate of "
00508                   "the tail block of the element that is at the "
00509                   "front of the packet elems_.\n"));
00510 
00511             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00512                   "This means that we have completely sent the "
00513                   "element at the front of the packet elems_.\n"));
00514 
00515             // This means that we have completely sent the element
00516             // that is currently at the front of the elems_ collection.
00517 
00518             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00519                   "We can release the fully sent block now.\n"));
00520 
00521             // Release the fully_sent_block
00522             fully_sent_block->release();
00523 
00524             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00525                   "We can extract the element from the front of "
00526                   "the packet elems_ (we were just peeking).\n"));
00527 
00528             // Extract the element from the elems_ collection
00529             element = this->elems_->get();
00530 
00531             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00532                   "Tell the element that a decision has been made "
00533                   "regarding its fate - data_delivered().\n"));
00534 
00535             // Inform the element that the data has been delivered.
00536             this->add_delayed_notification(element);
00537 
00538             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00539                   "Peek at the next element in the packet "
00540                   "elems_.\n"));
00541 
00542             // Set up for the next element in elems_ by peek()'ing.
00543             element = this->elems_->peek();
00544 
00545             if (element != 0) {
00546               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00547                     "The is an element still in the packet "
00548                     "elems_ (we are peeking at it now).\n"));
00549 
00550               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00551                     "We are going to find the tail block for the "
00552                     "current element (we are peeking at).\n"));
00553 
00554               // There was a "next element".  Determine the
00555               // elem_tail_block for it.
00556               elem_tail_block = element->msg();
00557 
00558               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00559                     "Start w/tail block == element->msg().\n"));
00560 
00561               while (elem_tail_block->cont() != 0) {
00562                 VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00563                       "Set tail block to next in chain.\n"));
00564                 elem_tail_block = elem_tail_block->cont();
00565               }
00566 
00567               VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00568                     "Done finding tail block.\n"));
00569             }
00570 
00571           } else {
00572             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00573                   "Ok.  The fully sent block is *not* a "
00574                   "duplicate of the tail block of the element "
00575                   "at the front of the packet elems_.\n"));
00576 
00577             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00578                   "Thus, we have not completely sent the "
00579                   "element yet.\n"));
00580 
00581             // We didn't completely send the element - it has more
00582             // message blocks that haven't been sent (that we know of).
00583 
00584             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00585                   "We can release the fully_sent_block now.\n"));
00586 
00587             // Release the fully_sent_block
00588             fully_sent_block->release();
00589           }
00590         }
00591 
00592       } else {
00593         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00594               "Only part of the block at the front of pkt_chain_ "
00595               "was sent.\n"));
00596 
00597         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00598               "Advance the rd_ptr() of the front block (of pkt_chain_) "
00599               "by the num_bytes_left (%d).\n", num_bytes_left));
00600 
00601         // Only part of the current block was sent.
00602         this->pkt_chain_->rd_ptr(num_bytes_left);
00603 
00604         if (this->header_complete_) {
00605           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00606                 "And since the packet header block has already been "
00607                 "completely sent, add num_bytes_left to the "
00608                 "num_non_header_bytes_sent.\n"));
00609 
00610           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00611                 "Before, num_non_header_bytes_sent == %d.\n",
00612                 num_non_header_bytes_sent));
00613 
00614           // We know that the current block isn't the packet header
00615           // block because the packet header block has already been
00616           // completely sent.  We need to count these bytes in the
00617           // num_non_header_bytes_sent.
00618           num_non_header_bytes_sent += num_bytes_left;
00619 
00620           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00621                 "After, num_non_header_bytes_sent == %d.\n",
00622                 num_non_header_bytes_sent));
00623         }
00624 
00625         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00626               "Set the num_bytes_left to 0 now.\n"));
00627 
00628         num_bytes_left = 0;
00629       }
00630     }
00631   }
00632 
00633   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00634         "The 'num_bytes_left' loop has completed.\n"));
00635 
00636   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00637         "Adjust the header_.length_ to account for the "
00638         "num_non_header_bytes_sent.\n"));
00639   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00640         "Before, header_.length_ == %d.\n",
00641         this->header_.length_));
00642 
00643   // Adjust the packet header_.length_ to indicate how many non header
00644   // bytes are left to send.
00645   this->header_.length_ -= static_cast<ACE_UINT32>(num_non_header_bytes_sent);
00646 
00647   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00648         "After, header_.length_ == %d.\n",
00649         this->header_.length_));
00650 
00651   // Returns 0 if the entire packet was sent, and returns 1 otherwise.
00652   int rc = (this->header_.length_ == 0) ? 0 : 1;
00653 
00654   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00655         "Adjustments all done.  Returning [%d].  0 means entire packet "
00656         "has been sent.  1 means otherwise.\n",
00657         rc));
00658 
00659   return rc;
00660 }
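
The bookkeeping in the listing above can be reduced to a small model. The following is a simplified sketch, not the OpenDDS implementation: PacketModel and adjust_after_send are hypothetical names, each block is represented only by its count of unsent bytes, and the first block is assumed to be the packet header. Unlike the original, the sketch also stops if the block list runs out.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical stand-in for pkt_chain_: one entry per message block, holding
// the number of unsent bytes in that block. The front block is sent first.
struct PacketModel {
  std::deque<std::size_t> blocks;
  bool header_complete = false;   // models header_complete_
  std::size_t payload_left = 0;   // models header_.length_ (non-header bytes)
};

// Returns 0 if the entire packet has been sent and 1 otherwise.
int adjust_after_send(PacketModel& pkt, std::size_t bytes_sent) {
  std::size_t left = bytes_sent;
  std::size_t payload_sent = 0;

  while (left > 0 && !pkt.blocks.empty()) {
    const std::size_t block_length = pkt.blocks.front();
    if (block_length <= left) {
      // The whole front block was sent: detach it from the chain.
      pkt.blocks.pop_front();
      left -= block_length;
      if (!pkt.header_complete) {
        pkt.header_complete = true;      // that block was the header
      } else {
        payload_sent += block_length;    // that block belonged to an element
      }
    } else {
      // Only part of the front block was sent: advance its read position.
      pkt.blocks.front() -= left;
      if (pkt.header_complete) {
        payload_sent += left;
      }
      left = 0;
    }
  }

  pkt.payload_left -= payload_sent;
  return pkt.payload_left == 0 ? 0 : 1;
}
```

With an 8-byte header followed by payload blocks of 10 and 6 bytes, a first send of 12 bytes completes the header and leaves 12 payload bytes outstanding; a second send of 12 bytes finishes the packet.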

void OpenDDS::DCPS::TransportSendStrategy::clear ( SendMode  mode = MODE_DIRECT  ) 

Clear queued messages and messages in current packet.

Definition at line 773 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), adjust_packet_after_send(), DBG_ENTRY_LVL, elems_, header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, mode_, mode_before_suspend_, MODE_NOT_SET, pkt_chain_, queue_, send_delayed_notifications(), start_counter_, and VDBG.

Referenced by OpenDDS::DCPS::TcpSendStrategy::reset(), and terminate_send().

00774 {
00775   DBG_ENTRY_LVL("TransportSendStrategy","clear",6);
00776 
00777   send_delayed_notifications();
00778   QueueType* elems = 0;
00779   QueueType* queue = 0;
00780   {
00781     GuardType guard(this->lock_);
00782 
00783     if (this->header_.length_ > 0) {
00784       // Clear the messages in the pkt_chain_ that is partially sent.
00785       // We just reuse these functions for normal partial send except actual sending.
00786       int num_bytes_left = static_cast<int>(this->pkt_chain_->total_length());
00787       int result = this->adjust_packet_after_send(num_bytes_left);
00788 
00789       if (result == 0) {
00790         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00791               "The adjustment logic says that the packet is cleared.\n"));
00792 
00793       } else {
00794         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00795               "The adjustment returned partial sent.\n"));
00796       }
00797     }
00798 
00799     elems = this->elems_;
00800     this->elems_ = new QueueType(1, this->transport_inst_->max_samples_per_packet_);
00801     queue = this->queue_;
00802     this->queue_ = new QueueType(this->transport_inst_->queue_messages_per_pool_,
00803                                  this->transport_inst_->queue_initial_pools_);
00804 
00805     this->header_.length_ = 0;
00806     this->pkt_chain_ = 0;
00807     this->header_complete_ = false;
00808     this->start_counter_ = 0;
00809     this->mode_ = mode;
00810     this->mode_before_suspend_ = MODE_NOT_SET;
00811   }
00812 
00813   // We need to remove the queued elements outside the lock,
00814   // otherwise we have a deadlock situation when the remove visitor
00815   // calls data_dropped on each dropped element.
00816 
00817   // Clear all samples in queue.
00818   RemoveAllVisitor remove_all_visitor;
00819 
00820   elems->accept_remove_visitor(remove_all_visitor);
00821   queue->accept_remove_visitor(remove_all_visitor);
00822 
00823   delete elems;
00824   delete queue;
00825 }
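
The listing above detaches both queues while holding the lock and only visits the detached elements after the lock is released. A minimal sketch of that pattern follows, using std::mutex rather than the ACE types; PendingSamples and its members are hypothetical names chosen for illustration, and samples are plain integers.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <mutex>
#include <vector>

// Hypothetical queue holder; not the OpenDDS class.
class PendingSamples {
public:
  void push(int sample) {
    std::lock_guard<std::mutex> guard(lock_);
    queue_.push_back(sample);
  }

  std::size_t size() {
    std::lock_guard<std::mutex> guard(lock_);
    return queue_.size();
  }

  // Detach the queue under the lock, then notify without holding the lock so
  // that the callback may call back into this object without deadlocking.
  std::size_t clear(const std::function<void(int)>& on_dropped) {
    std::vector<int> detached;
    {
      std::lock_guard<std::mutex> guard(lock_);
      detached.swap(queue_);
    }
    for (int sample : detached) {
      on_dropped(sample);
    }
    return detached.size();
  }

private:
  std::mutex lock_;          // non-recursive, like a plain mutex guard
  std::vector<int> queue_;
};
```

If the callbacks ran inside the guarded scope, a callback that called push() would block forever on the non-recursive mutex, which is the kind of deadlock the comment in the listing warns about.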

ACE_INLINE OpenDDS::DCPS::TransportQueueElement * OpenDDS::DCPS::TransportSendStrategy::current_packet_first_element (  )  const [protected]

Definition at line 148 of file TransportSendStrategy.inl.

References elems_, and OpenDDS::DCPS::BasicQueue< T >::peek().

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i().

00149 {
00150   return this->elems_->peek();
00151 }

void OpenDDS::DCPS::TransportSendStrategy::direct_send ( bool  relink  )  [private]

Called from send() when it is time to attempt to send the current packet to the socket while mode_ is MODE_DIRECT. If backpressure occurs, the current packet is adjusted to account for the bytes that were sent, and the mode changes to MODE_QUEUE. If no backpressure occurs (i.e., the entire packet is sent), the current packet is "reset" to an empty packet following the send.

Definition at line 1449 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, mode_, mode_before_suspend_, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OUTCOME_BACKPRESSURE, OUTCOME_PARTIAL_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, prepare_packet(), relink(), send_packet(), OpenDDS::DCPS::Transport_debug_level, transport_inst_, VDBG, and VDBG_LVL.

Referenced by send(), and send_stop().

01450 {
01451   DBG_ENTRY_LVL("TransportSendStrategy", "direct_send", 6);
01452 
01453   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01454         "Prepare the current packet for a direct send attempt.\n"));
01455 
01456   // Prepare the packet for sending.
01457   this->prepare_packet();
01458 
01459   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01460         "Now attempt to send the packet.\n"));
01461 
01462   // We will try to resend the packet if the send() fails and the connection
01463   // is then re-established.  Only loops if the "continue" line is hit.
01464   while (true) {
01465     // Attempt to send the packet
01466     const SendPacketOutcome outcome = this->send_packet();
01467 
01468     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01469           "The outcome of the send_packet() was %d.\n", outcome));
01470 
01471     if ((outcome == OUTCOME_BACKPRESSURE) ||
01472         (outcome == OUTCOME_PARTIAL_SEND)) {
01473       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01474                 "The outcome of the send_packet() was either "
01475                 "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5);
01476 
01477       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01478                 "Flip into the MODE_QUEUE mode_.\n"), 5);
01479 
01480       // We encountered backpressure, or only sent part of the packet.
01481       this->mode_ = MODE_QUEUE;
01482 
01483     } else if ((outcome == OUTCOME_PEER_LOST) ||
01484                (outcome == OUTCOME_SEND_ERROR)) {
01485       if (outcome == OUTCOME_SEND_ERROR) {
01486         ACE_ERROR((LM_WARNING,
01487                    ACE_TEXT("(%P|%t) WARNING: Problem detected in ")
01488                    ACE_TEXT("send buffer management: %p.\n"),
01489                    ACE_TEXT("send_bytes")));
01490 
01491         if (Transport_debug_level > 0) {
01492           this->transport_inst_->dump();
01493         }
01494       } else {
01495         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01496               "The outcome of the send_packet() was "
01497               "OUTCOME_PEER_LOST.\n"));
01498       }
01499 
01500       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01501                 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
01502 
01503       if (this->mode_ != MODE_SUSPEND) {
01504         this->mode_before_suspend_ = this->mode_;
01505         this->mode_ = MODE_SUSPEND;
01506       }
01507 
01508       if (relink) {
01509         bool do_suspend = false;
01510         this->relink(do_suspend);
01511 
01512         if (this->mode_ == MODE_SUSPEND) {
01513           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01514                     "The reconnect has not done yet and we are "
01515                     "still in MODE_SUSPEND.\n"), 5);
01516 
01517         } else if (this->mode_ == MODE_TERMINATED) {
01518           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01519                     "Reconnect failed, we are in MODE_TERMINATED\n"), 5);
01520 
01521         } else {
01522           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01523                     "Try send the packet again since the connection "
01524                     "is re-established.\n"), 5);
01525 
01526           // Try send the packet again since the connection is re-established.
01527           continue;
01528         }
01529       }
01530 
01531     } else {
01532       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01533             "The outcome of the send_packet() must have been "
01534             "OUTCOME_COMPLETE_SEND.\n"));
01535       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01536             "So, we will just stay in MODE_DIRECT.\n"));
01537     }
01538 
01539     break;
01540   }
01541 
01542   // We stay in MODE_DIRECT mode if we didn't encounter any backpressure.
01543 }
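
The mode changes made at the end of the listing above can be summarized as a small transition function. This is only a sketch: the enums below are stand-ins declared for illustration (the order of the SendPacketOutcome values is an assumption), and mode_after_send() is a hypothetical helper, not part of OpenDDS. It covers the state immediately after send_packet() returns, before any relink() attempt.

```cpp
// Stand-in enums for illustration; the real declarations live in OpenDDS.
enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };
enum SendPacketOutcome {
  OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_BACKPRESSURE,
  OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR
};

// Hypothetical helper: the mode right after send_packet() reports `outcome`.
inline SendMode mode_after_send(SendMode current, SendPacketOutcome outcome)
{
  switch (outcome) {
  case OUTCOME_BACKPRESSURE:
  case OUTCOME_PARTIAL_SEND:
    return MODE_QUEUE;    // unsent bytes remain, so later sends are queued
  case OUTCOME_PEER_LOST:
  case OUTCOME_SEND_ERROR:
    return MODE_SUSPEND;  // hold traffic until the link is restored
  case OUTCOME_COMPLETE_SEND:
    return current;       // nothing to change
  }
  return current;
}
```

For example, mode_after_send(MODE_DIRECT, OUTCOME_PARTIAL_SEND) yields MODE_QUEUE, while a complete send leaves the mode untouched.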

RemoveResult OpenDDS::DCPS::TransportSendStrategy::do_remove_sample ( const RepoId pub_id,
const TransportQueueElement::MatchCriteria criteria 
) [protected, virtual]

Implement framework chain visitations to remove a sample.

Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.

Definition at line 1346 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), OpenDDS::DCPS::BasicQueue< T >::accept_replace_visitor(), DBG_ENTRY_LVL, elems_, header_, OpenDDS::DCPS::TransportHeader::length_, MODE_DIRECT, queue_, OpenDDS::DCPS::REMOVE_ERROR, OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, OpenDDS::DCPS::QueueRemoveVisitor::removed_bytes(), OpenDDS::DCPS::PacketRemoveVisitor::status(), OpenDDS::DCPS::QueueRemoveVisitor::status(), and VDBG.

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::do_remove_sample(), remove_all_msgs(), and remove_sample().

01348 {
01349   DBG_ENTRY_LVL("TransportSendStrategy", "do_remove_sample", 6);
01350 
01351   //ciju: Tim had the idea that we could do the following check
01352   // if ((this->mode_ == MODE_DIRECT) ||
01353   //     ((this->pkt_chain_ == 0) && (queue_ == empty)))
01354   // then we can assume that the sample can be safely removed (no need for
01355   // replacement) from the elems_ queue.
01356   if ((this->mode_ == MODE_DIRECT)
01357       || ((this->pkt_chain_ == 0) && (this->queue_->size() == 0))) {
01358     //ciju: I believe this is the only mode where a safe
01359     // assumption can be made that the samples
01360     // in the elems_ queue aren't part of a packet.
01361     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01362           "The mode is MODE_DIRECT, or the queue is empty and no "
01363           "transport packet is in progress.\n"));
01364 
01365     QueueRemoveVisitor simple_rem_vis(criteria);
01366     this->elems_->accept_remove_visitor(simple_rem_vis);
01367 
01368     const RemoveResult status = simple_rem_vis.status();
01369 
01370     if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
01371       this->header_.length_ -= simple_rem_vis.removed_bytes();
01372 
01373     } else if (status == REMOVE_NOT_FOUND) {
01374       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01375             "Failed to find the sample to remove.\n"));
01376     }
01377 
01378     return status;
01379   }
01380 
01381   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01382         "Visit the queue_ with the RemoveElementVisitor.\n"));
01383 
01384   QueueRemoveVisitor simple_rem_vis(criteria);
01385   this->queue_->accept_remove_visitor(simple_rem_vis);
01386 
01387   RemoveResult status = simple_rem_vis.status();
01388 
01389   if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
01390     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01391           "The sample was removed from the queue_.\n"));
01392     // This means that the visitor did not encounter any fatal error
01393     // along the way, *AND* the sample was found in the queue_,
01394     // and has now been removed.  We are done.
01395     return status;
01396   }
01397 
01398   if (status == REMOVE_ERROR) {
01399     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01400           "The RemoveElementVisitor encountered a fatal error in queue_.\n"));
01401     // This means that the visitor encountered some fatal error along
01402     // the way (and it already reported something to the log).
01403     // Return our failure code.
01404     return status;
01405   }
01406 
01407   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01408         "The RemoveElementVisitor did not find the sample in queue_.\n"));
01409 
01410   // We get here if the visitor did not encounter any fatal error, but it
01411   // also didn't find the sample - and hence it didn't perform any
01412   // "remove sample" logic.
01413 
01414   // Now we need to turn our attention to the current transport packet,
01415   // since the packet is likely in a "partially sent" state, and the
01416   // sample may still be contributing unsent bytes in the pkt_chain_.
01417 
01418   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01419         "Visit our elems_ with the PacketRemoveVisitor.\n"));
01420 
01421   PacketRemoveVisitor pac_rem_vis(criteria,
01422                                   this->pkt_chain_,
01423                                   this->header_block_,
01424                                   this->replaced_element_allocator_,
01425                                   this->replaced_element_mb_allocator_,
01426                                   this->replaced_element_db_allocator_);
01427 
01428   this->elems_->accept_replace_visitor(pac_rem_vis);
01429 
01430   status = pac_rem_vis.status();
01431 
01432   if (status == REMOVE_ERROR) {
01433     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01434           "The PacketRemoveVisitor encountered a fatal error.\n"));
01435 
01436   } else if (status == REMOVE_NOT_FOUND) {
01437     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01438           "The PacketRemoveVisitor didn't find the sample.\n"));
01439 
01440   } else {
01441     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01442           "The PacketRemoveVisitor found the sample and removed it.\n"));
01443   }
01444 
01445   return status;
01446 }
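
The order of the lookups in do_remove_sample() may be easier to follow in a reduced model. The sketch below is an assumption-laden simplification: samples are plain integers, SendState and remove_sample_sketch() are hypothetical names, and replacing an in-flight element is modeled by overwriting it with a REPLACED marker rather than by the allocator-backed replacement that PacketRemoveVisitor performs.

```cpp
#include <algorithm>
#include <deque>
#include <vector>

enum RemoveResult { REMOVE_NOT_FOUND, REMOVE_FOUND };

// Hypothetical, simplified view of the strategy's state.
struct SendState {
  bool direct_mode = true;
  bool packet_in_progress = false;  // stands in for pkt_chain_ != 0
  std::deque<int> queue;            // samples waiting for a later packet
  std::vector<int> elems;           // samples in the current packet
};

const int REPLACED = -1;  // marker for a sample replaced inside a packet

inline RemoveResult remove_sample_sketch(SendState& s, int id)
{
  // Case 1: nothing is on the wire, so the sample can simply be erased.
  if (s.direct_mode || (!s.packet_in_progress && s.queue.empty())) {
    auto it = std::find(s.elems.begin(), s.elems.end(), id);
    if (it == s.elems.end()) return REMOVE_NOT_FOUND;
    s.elems.erase(it);
    return REMOVE_FOUND;
  }
  // Case 2: look in the backpressure queue first.
  auto qit = std::find(s.queue.begin(), s.queue.end(), id);
  if (qit != s.queue.end()) {
    s.queue.erase(qit);
    return REMOVE_FOUND;
  }
  // Case 3: the sample may be part of a partially sent packet.  Its slot
  // must stay in place, so it is replaced instead of erased.
  auto pit = std::find(s.elems.begin(), s.elems.end(), id);
  if (pit == s.elems.end()) return REMOVE_NOT_FOUND;
  *pit = REPLACED;
  return REMOVE_FOUND;
}
```

The point of the third case is that the packet keeps the same number of elements, which reflects why the real code uses accept_replace_visitor() there instead of accept_remove_visitor().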

ssize_t OpenDDS::DCPS::TransportSendStrategy::do_send_packet ( const ACE_Message_Block *  packet,
int &  bp 
) [private]

Form an IOV and call the send_bytes() template method.

Definition at line 1719 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::MAX_SEND_BLOCKS, mb_to_iov(), send_bytes(), OpenDDS::DCPS::Transport_debug_level, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TransportSendBuffer::resend_one(), and send_packet().

01720 {
01721   if (Transport_debug_level > 9) {
01722     ACE_DEBUG((LM_DEBUG,
01723                ACE_TEXT("(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ")
01724                ACE_TEXT("sending data at 0x%x.\n"),
01725                id(), packet));
01726   }
01727   DBG_ENTRY_LVL("TransportSendStrategy", "do_send_packet", 6);
01728 
01729   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01730             "Populate the iovec array using the packet.\n"), 5);
01731 
01732   iovec iov[MAX_SEND_BLOCKS];
01733 
01734   int num_blocks = mb_to_iov(*packet, iov);
01735 
01736   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01737             "There are [%d] number of entries in the iovec array.\n",
01738             num_blocks), 5);
01739 
01740   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01741             "Attempt to send_bytes() now.\n"), 5);
01742 
01743   const ssize_t num_bytes_sent = this->send_bytes(iov, num_blocks, bp);
01744 
01745   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01746             "The send_bytes() said that num_bytes_sent == [%d].\n",
01747             num_bytes_sent), 5);
01748 
01749   return num_bytes_sent;
01750 }

ACE_INLINE ACE_HANDLE OpenDDS::DCPS::TransportSendStrategy::get_handle (  )  [virtual]

Reimplemented in OpenDDS::DCPS::TcpSendStrategy.

Definition at line 129 of file TransportSendStrategy.inl.

Referenced by OpenDDS::DCPS::ScheduleOutputHandler::handle_exception(), and non_blocking_send().

00130 {
00131   return ACE_INVALID_HANDLE;
00132 }

void OpenDDS::DCPS::TransportSendStrategy::get_packet_elems_from_queue (  )  [private]

This method is used in MODE_QUEUE mode when a new packet needs to be formulated using elements from the queue_. It is the first step of formulating the new packet: it extracts elements from the queue_ and inserts them into the packet element collection (elems_ in the listing below).

After this step has been done, the prepare_packet() step can be performed, followed by the actual send_packet() call.

Definition at line 1546 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, elems_, header_, OpenDDS::DCPS::TransportHeader::last_fragment_, OpenDDS::DCPS::TransportHeader::length_, max_samples_, optimum_size_, OpenDDS::DCPS::BasicQueue< T >::peek(), OpenDDS::DCPS::BasicQueue< T >::put(), queue_, OpenDDS::DCPS::BasicQueue< T >::replace_head(), space_available(), and VDBG_LVL.

Referenced by perform_work().

01547 {
01548   DBG_ENTRY_LVL("TransportSendStrategy", "get_packet_elems_from_queue", 6);
01549 
01550   for (TransportQueueElement* element = this->queue_->peek(); element != 0;
01551        element = this->queue_->peek()) {
01552 
01553     // Total number of bytes in the current element's message block chain.
01554     size_t element_length = element->msg()->total_length();
01555 
01556     // Flag used to determine if the element requires a packet all to itself.
01557     const bool exclusive_packet = element->requires_exclusive_packet();
01558 
01559     const size_t avail = this->space_available();
01560 
01561     bool frag = false;
01562     if (element_length > avail) {
01563       // The current element won't fit into the current packet
01564       if (this->max_message_size()) { // fragmentation enabled
01565         this->header_.first_fragment_ = !element->is_fragment();
01566         VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Fragmenting from queue\n"), 0);
01567         ElementPair ep = element->fragment(avail);
01568         element = ep.first;
01569         element_length = element->msg()->total_length();
01570         this->queue_->replace_head(ep.second);
01571         frag = true; // queue_ is already taken care of, don't get() later
01572       } else {
01573         break;
01574       }
01575     }
01576 
01577     // If exclusive and the current packet is empty, we won't violate the
01578     // exclusive_packet requirement by put()'ing the element
01579     // into the elems_ collection.
01580     if ((exclusive_packet && this->elems_->size() == 0)
01581         || !exclusive_packet) {
01582       // At this point, we have passed all of the pre-conditions and we can
01583       // now extract the current element from the queue_, put it into the
01584       // packet elems_, and adjust the packet header_.length_.
01585       this->elems_->put(frag ? element : this->queue_->get());
01586       if (this->header_.length_ == 0) {
01587         this->header_.last_fragment_ = !frag && element->is_fragment();
01588       }
01589       this->header_.length_ += static_cast<ACE_UINT32>(element_length);
01590       VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Packetizing from queue\n"), 0);
01591     }
01592 
01593     // With exclusive and (elems_.size() != 0), we don't use the current
01594     // element as part of the packet.  We know that there is already
01595     // at least one element in the packet, and the current element
01596     // is going to need its own (exclusive) packet.  We will just
01597     // use the packet elems_ as it is now.  Always break once
01598     // we've encountered and dealt with the exclusive_packet case.
01599     // Also break if fragmentation was required.
01600     if (exclusive_packet || frag
01601         // If the current number of packet elems_ has reached the maximum
01602         // number of samples per packet, then we are done.
01603         || this->elems_->size() == this->max_samples_
01604         // If the current value of the header_.length_ exceeds (or equals)
01605         // the optimum_size_ for a packet, then we are done.
01606         || this->header_.length_ >= this->optimum_size_) {
01607       break;
01608     }
01609   }
01610 }
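
The stopping rules in the loop above can be illustrated with a reduced model. In this sketch Elem and fill_packet() are hypothetical names, fragmentation is left out entirely (an element that does not fit simply ends the loop), and the packet is assumed to be empty on entry.

```cpp
#include <cstddef>
#include <deque>
#include <vector>

// Hypothetical stand-in for a queued element.
struct Elem {
  std::size_t length;  // bytes contributed to the packet
  bool exclusive;      // must travel in a packet of its own
};

// Moves elements from `queue` into `packet` (assumed empty on entry) and
// returns the number of payload bytes placed in the packet.
inline std::size_t fill_packet(std::deque<Elem>& queue, std::vector<Elem>& packet,
                               std::size_t max_size, std::size_t optimum_size,
                               std::size_t max_samples)
{
  std::size_t length = 0;
  while (!queue.empty()) {
    const Elem e = queue.front();
    if (e.length > max_size - length) {
      break;  // does not fit; this sketch does not fragment
    }
    // An exclusive element is only accepted into an empty packet.
    if (!e.exclusive || packet.empty()) {
      packet.push_back(e);
      queue.pop_front();
      length += e.length;
    }
    // Stop after any exclusive element, or when a limit is reached.
    if (e.exclusive || packet.size() == max_samples || length >= optimum_size) {
      break;
    }
  }
  return length;
}
```

With a queue of two ordinary elements followed by an exclusive one, the first call packs the two ordinary elements and leaves the exclusive element at the head of the queue; the next call, given an empty packet, packs that element alone.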

ACE_INLINE bool OpenDDS::DCPS::TransportSendStrategy::isDirectMode (  ) 

Definition at line 115 of file TransportSendStrategy.inl.

References mode_, and MODE_DIRECT.

00116 {
00117   return this->mode_ == MODE_DIRECT;
00118 }

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::link_released ( bool  flag  ) 

Definition at line 41 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL, and link_released_.

Referenced by OpenDDS::DCPS::MulticastSendStrategy::MulticastSendStrategy().

00042 {
00043   DBG_ENTRY_LVL("TransportSendStrategy","link_released",6);
00044 
00045   GuardType guard(this->lock_);
00046   this->link_released_ = flag;
00047 }

void OpenDDS::DCPS::TransportSendStrategy::marshal_transport_header ( ACE_Message_Block *  mb  )  [private, virtual]

Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.

Definition at line 1699 of file TransportSendStrategy.cpp.

References header_.

Referenced by prepare_packet().

01700 {
01701   *mb << this->header_;
01702 }

ACE_INLINE size_t OpenDDS::DCPS::TransportSendStrategy::max_message_size (  )  const [protected, virtual]

The maximum size of a message allowed by this TransportImpl, or 0 if there is no such limit. This is expected to be a constant; for example, UDP/IPv4 can send messages of up to 65466 bytes. The transport framework uses the returned value (if > 0) to fragment larger messages. This fragmentation and reassembly is transparent to the user.

Reimplemented in OpenDDS::DCPS::MulticastSendStrategy, OpenDDS::DCPS::RtpsUdpSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.

Definition at line 142 of file TransportSendStrategy.inl.

Referenced by send(), and space_available().

00143 {
00144   return 0;
00145 }
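
To make the role of the returned limit concrete, the sketch below splits a message length into fragment lengths. It is a deliberately naive illustration: fragment_sizes() is a hypothetical helper, and it ignores the per-packet header space that the real framework accounts for through space_available().

```cpp
#include <cstddef>
#include <vector>

// Hypothetical helper: lengths of the pieces a message would be cut into.
// A limit of 0 means "no limit", matching the default implementation above.
inline std::vector<std::size_t> fragment_sizes(std::size_t message_len,
                                               std::size_t max_message_size)
{
  std::vector<std::size_t> sizes;
  if (max_message_size == 0 || message_len <= max_message_size) {
    sizes.push_back(message_len);  // sent whole
    return sizes;
  }
  while (message_len > 0) {
    const std::size_t n =
      message_len < max_message_size ? message_len : max_message_size;
    sizes.push_back(n);
    message_len -= n;
  }
  return sizes;
}
```

Under these assumptions a 10-byte message with a 4-byte limit is cut into pieces of 4, 4 and 2 bytes, while a limit of 0 leaves it whole.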

int OpenDDS::DCPS::TransportSendStrategy::mb_to_iov ( const ACE_Message_Block &  msg,
iovec *  iov 
) [static]

Convert ACE_Message_Block chain into iovec[] entries for send(), returns number of iovec[] entries used (up to MAX_SEND_BLOCKS). Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater.

Definition at line 1898 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::MAX_SEND_BLOCKS.

Referenced by do_send_packet(), OpenDDS::DCPS::UdpDataLink::open(), and OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control().

01899 {
01900   int num_blocks = 0;
01901 #ifdef _MSC_VER
01902 #pragma warning(push)
01903 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
01904 // since on other platforms iov_len is 64-bit
01905 #pragma warning(disable : 4267)
01906 #endif
01907   for (const ACE_Message_Block* block = &msg;
01908        block && num_blocks < MAX_SEND_BLOCKS;
01909        block = block->cont()) {
01910     iov[num_blocks].iov_len = block->length();
01911     iov[num_blocks++].iov_base = block->rd_ptr();
01912   }
01913 #ifdef _MSC_VER
01914 #pragma warning(pop)
01915 #endif
01916   return num_blocks;
01917 }
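
The same chain walk can be shown without ACE. In the sketch below Block and IoVec are hypothetical stand-ins for ACE_Message_Block and iovec, and MAX_BLOCKS is an arbitrary value chosen for illustration, not the real MAX_SEND_BLOCKS.

```cpp
#include <cstddef>

// Hypothetical stand-ins for ACE_Message_Block and iovec.
struct Block {
  const char* data;
  std::size_t length;
  const Block* next;  // plays the role of cont()
};
struct IoVec {
  const void* base;
  std::size_t len;
};

const int MAX_BLOCKS = 4;  // illustrative limit only

// Fills iov[] from the chain starting at `head`; `iov` must have room for
// MAX_BLOCKS entries.  Returns the number of entries written.
inline int chain_to_iov(const Block& head, IoVec* iov)
{
  int n = 0;
  for (const Block* b = &head; b && n < MAX_BLOCKS; b = b->next) {
    iov[n].len = b->length;
    iov[n++].base = b->data;
  }
  return n;
}
```

As in the original, a chain longer than the limit is silently truncated, so a caller that needs every block has to compare the returned count against the chain length itself.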

ACE_INLINE OpenDDS::DCPS::TransportSendStrategy::SendMode OpenDDS::DCPS::TransportSendStrategy::mode (  )  const

Access the current sending mode.

Definition at line 13 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL, and mode_.

Referenced by OpenDDS::DCPS::ScheduleOutputHandler::handle_exception(), OpenDDS::DCPS::TcpSendStrategy::schedule_output(), OpenDDS::DCPS::ScheduleOutputHandler::schedule_output(), and send_delayed_notifications().

00014 {
00015   DBG_ENTRY_LVL("TransportSendStrategy","mode",6);
00016 
00017   return mode_;
00018 }

ACE_INLINE const char * OpenDDS::DCPS::TransportSendStrategy::mode_as_str ( SendMode  mode  )  [static, private]

Helper function for debugging.

Definition at line 101 of file TransportSendStrategy.inl.

Referenced by perform_work(), send(), and send_stop().

00102 {
00103   static const char* SendModeStr[] = { "MODE_NOT_SET",
00104                                        "MODE_DIRECT",
00105                                        "MODE_QUEUE",
00106                                        "MODE_SUSPEND",
00107                                        "MODE_TERMINATED",
00108                                        "UNKNOWN"
00109                                      };
00110 
00111   return SendModeStr [mode];
00112 }

ssize_t OpenDDS::DCPS::TransportSendStrategy::non_blocking_send ( const iovec  iov[],
int  n,
int &  bp 
) [protected, virtual]

Definition at line 1825 of file TransportSendStrategy.cpp.

References get_handle(), send_bytes_i(), VDBG, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TcpSendStrategy::send_bytes().

01826 {
01827   int val = 0;
01828   ACE_HANDLE handle = this->get_handle();
01829 
01830   if (handle == ACE_INVALID_HANDLE)
01831     return -1;
01832 
01833   ACE::record_and_set_non_blocking_mode(handle, val);
01834 
01835   // Set the back-pressure flag to false.
01836   bp = 0;
01837 
01838   // Clear errno
01839   errno = 0;
01840 
01841   ssize_t result = this->send_bytes_i(iov, n);
01842 
01843   if (result == -1) {
01844     if ((errno == EWOULDBLOCK) || (errno == ENOBUFS)) {
01845       VDBG((LM_DEBUG,"(%P|%t) DBG:   "
01846             "Backpressure encountered.\n"));
01847       // Set the back-pressure flag to true
01848       bp = 1;
01849 
01850     } else {
01851       VDBG_LVL((LM_ERROR, "(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n",
01852                 ACE_TEXT("sendv"), n),1);
01853 
01854       // try to get the application to core when "Bad Address" is returned
01855       // by looking at the iovec
01856       for (int ii = 0; ii < n; ii++) {
01857         ACE_DEBUG((LM_DEBUG, "(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n",
01858                    ii, iov[ii].iov_len, iov[ii].iov_base));
01859       }
01860     }
01861   }
01862 
01863   VDBG_LVL((LM_DEBUG,"(%P|%t) DBG:   "
01864             "The sendv() returned [%d].\n", result), 5);
01865 
01866   ACE::restore_non_blocking_mode(handle, val);
01867 
01868   return result;
01869 }
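
The backpressure test in the listing above reduces to a check on the send result and errno. The sketch below isolates that check; is_backpressure() is a hypothetical helper, and the result is passed as a long so the example does not depend on ssize_t being available.

```cpp
#include <cerrno>

// Hypothetical helper: true when a failed send should be treated as
// backpressure rather than as a hard error.
inline bool is_backpressure(long result, int err)
{
  return result == -1 && (err == EWOULDBLOCK || err == ENOBUFS);
}
```

A caller would capture errno immediately after the send call and pass it in, since later library calls may overwrite it.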

OpenDDS::DCPS::TransportSendStrategy::OPENDDS_VECTOR ( TQESendModePair   )  [private]

Referenced by send_delayed_notifications().

ThreadSynchWorker::WorkOutcome OpenDDS::DCPS::TransportSendStrategy::perform_work (  )  [virtual]

Called by our ThreadSynch object when we should be able to start sending any partial packet bytes and/or compose a new packet using elements from the queue_.

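Returns WORK_OUTCOME_NO_MORE_TO_DO to indicate that the ThreadSynch object doesn't need to call perform_work() again, since the queue (and any unsent packet bytes) has been drained and the mode_ has been switched to MODE_DIRECT.

Returns WORK_OUTCOME_MORE_TO_DO to indicate that there is more work to do and the ThreadSynch object should call perform_work() again. As the listing below shows, the method can also return WORK_OUTCOME_CLOGGED_RESOURCE when backpressure is encountered and WORK_OUTCOME_BROKEN_RESOURCE when the connection is lost and cannot be re-established.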

Implements OpenDDS::DCPS::ThreadSynchWorker.

Definition at line 138 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, get_packet_elems_from_queue(), header_, OpenDDS::DCPS::TransportHeader::length_, mode_, mode_as_str(), MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OUTCOME_BACKPRESSURE, OUTCOME_COMPLETE_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, prepare_packet(), relink(), send_delayed_notifications(), send_packet(), synch_, VDBG_LVL, OpenDDS::DCPS::ThreadSynch::work_available(), OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO, and OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO.

00139 {
00140   DBG_ENTRY_LVL("TransportSendStrategy","perform_work",6);
00141 
00142   SendPacketOutcome outcome;
00143   bool no_more_work = false;
00144 
00145   { // scope for the guard(this->lock_);
00146     GuardType guard(this->lock_);
00147 
00148     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: perform_work mode: %C\n", mode_as_str(this->mode_)), 5);
00149 
00150     if (this->mode_ == MODE_TERMINATED) {
00151       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00152                 "Entered perform_work() and mode_ is MODE_TERMINATED - "
00153                 "we lost connection and could not reconnect, just return "
00154                 "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
00155       return WORK_OUTCOME_BROKEN_RESOURCE;
00156     }
00157 
00158     // The perform_work() is called by our synch_ object using
00159     // a thread designated to call this method when it thinks
00160     // we need to be called in order to "service" the queue_ and/or
00161     // deal with a partially sent current packet.
00162     //
00163     // We will return a 0 if we don't see a need to have our perform_work()
00164     // called again, and we will return a 1 if we do see the need to have our
00165     // perform_work() method called again.
00166 
00167     // First, make sure that the mode_ indicates that we are, indeed, in
00168     // the MODE_QUEUE mode.  If we are not in MODE_QUEUE mode (meaning we are
00169     // in MODE_DIRECT), then it means we didn't need to have this perform_work()
00170     // method called - in this case, do nothing other than return
00171     // WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we really don't
00172     // see a need for it to call our perform_work() again (at least not
00173     // right now).
00174     if (this->mode_ != MODE_QUEUE && this->mode_ != MODE_SUSPEND) {
00175       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00176                 "Entered perform_work() and mode_ is %C - just return "
00177                 "WORK_OUTCOME_NO_MORE_TO_DO.\n", mode_as_str(this->mode_)), 5);
00178       return WORK_OUTCOME_NO_MORE_TO_DO;
00179     }
00180 
00181     // Check the "state" of the current packet.  We will either find that the
00182     // current packet is in a state of being "partially sent", or we will find
00183     // it in a state of being "empty".  When the current packet is "empty", it
00184     // means that it is time to build up the current packet using elements
00185     // extracted from the queue_, and then we will attempt to send the
00186     // packet.  When we find the current packet in the "partially sent" state,
00187     // we will not touch the queue_ - we will just try to send the unsent
00188     // bytes in the current (partially sent) packet.
00189     const size_t header_length = this->header_.length_;
00190 
00191     if (header_length == 0) {
00192       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00193                 "The current packet doesn't have any unsent bytes - we "
00194                 "need to 'populate' the current packet with elems from "
00195                 "the queue.\n"), 5);
00196 
00197       // The current packet is "empty".  Build up the current packet using
00198       // elements from the queue_, and prepare the current packet to be sent.
00199 
00200       // Before we build the packet from the queue_, let's make sure that
00201       // there is actually something on the queue_ to build from.
00202       if (this->queue_->size() == 0) {
00203         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00204                   "But the queue is empty.  We have cleared the "
00205                   "backpressure situation.\n"),5);
00206         // We are here because the queue_ is empty, and there isn't
00207         // any "partial packet" bytes left to send.  We have overcome
00208         // the backpressure situation and don't have anything to do
00209         // right now.
00210 
00211         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00212                   "Flip mode to MODE_DIRECT, and return "
00213                   "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
00214 
00215         // Flip the mode back to MODE_DIRECT.
00216         this->mode_ = MODE_DIRECT;
00217 
00218         // And return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that
00219         // perform_work() doesn't need to be called again (at this time).
00220         return WORK_OUTCOME_NO_MORE_TO_DO;
00221       }
00222 
00223       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00224                 "There is at least one elem in the queue - get the packet "
00225                 "elems from the queue.\n"), 5);
00226 
00227       // There is stuff in the queue_ if we get to this point in the logic.
00228       // Build-up the current packet using element(s) from the queue_.
00229       this->get_packet_elems_from_queue();
00230 
00231       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00232                 "Prepare the packet from the packet elems_.\n"), 5);
00233 
00234       // Now we can prepare the new packet to be sent.
00235       this->prepare_packet();
00236 
00237       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00238                 "Packet has been prepared from packet elems_.\n"), 5);
00239 
00240     } else {
00241       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00242                 "We have a current packet that still has unsent bytes.\n"), 5);
00243     }
00244 
00245     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00246               "Attempt to send the current packet.\n"), 5);
00247 
00248     // Now we can attempt to send the current packet - whether it is
00249     // a "partially sent" packet or one that we just built-up using elements
00250     // from the queue_ (and subsequently prepared for sending) - it doesn't
00251     // matter.  Just attempt to send as many of the "unsent" bytes in the
00252     // packet as possible.
00253     outcome = this->send_packet();
00254 
00255     // If we sent the whole packet (eg, partial_send is false), and the queue_
00256     // is now empty, then we've cleared the backpressure situation.
00257     if ((outcome == OUTCOME_COMPLETE_SEND) && (this->queue_->size() == 0)) {
00258       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00259                 "Flip the mode to MODE_DIRECT, and then return "
00260                 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
00261 
00262       // Revert back to MODE_DIRECT mode.
00263       this->mode_ = MODE_DIRECT;
00264       no_more_work = true;
00265     }
00266   } // End of scope for guard(this->lock_);
00267 
00268   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00269             "The outcome of the send_packet() was %d.\n", outcome), 5);
00270 
00271   send_delayed_notifications();
00272 
00273   // If we sent the whole packet (eg, partial_send is false), and the queue_
00274   // is now empty, then we've cleared the backpressure situation.
00275   if (no_more_work) {
00276     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00277               "We sent the whole packet, and there is nothing left on "
00278               "the queue now.\n"), 5);
00279 
00280     // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
00281     // don't desire another call to this perform_work() method.
00282     return WORK_OUTCOME_NO_MORE_TO_DO;
00283   }
00284 
00285   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00286             "We still have unsent bytes in the current packet AND/OR there "
00287             "are still elements in the queue.\n"), 5);
00288 
00289   if ((outcome == OUTCOME_PEER_LOST) || (outcome == OUTCOME_SEND_ERROR)) {
00290     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00291               "We lost our connection, or had some fatal connection "
00292               "error.  Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
00293 
00294     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00295               "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
00296 
00297     bool do_suspend = true;
00298     this->relink(do_suspend);
00299 
00300     if (this->mode_ == MODE_SUSPEND) {
00301       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00302                 "The reconnect has not done yet and we are still in MODE_SUSPEND. "
00303                 "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
00304       // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
00305       // don't desire another call to this perform_work() method.
00306       return WORK_OUTCOME_NO_MORE_TO_DO;
00307 
00308     } else if (this->mode_ == MODE_TERMINATED) {
00309       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00310                 "Reconnect failed, now we are in MODE_TERMINATED\n"), 5);
00311       return WORK_OUTCOME_BROKEN_RESOURCE;
00312 
00313     } else {
00314       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00315                 "Reconnect succeeded, Notify synch thread of work "
00316                 "availability.\n"), 5);
00317       // If the datalink is re-established then notify the synch
00318       // thread to perform work.  We do not hold the object lock at
00319       // this point.
00320       this->synch_->work_available();
00321     }
00322   }
00323 
00324   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00325             "We still have an 'unbroken' connection.\n"), 5);
00326 
00327   if (outcome == OUTCOME_BACKPRESSURE) {
00328     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00329               "We experienced backpressure on our attempt to send the "
00330               "packet.  Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
00331     // We have a "clogged resource".
00332     return WORK_OUTCOME_CLOGGED_RESOURCE;
00333   }
00334 
00335   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00336             "We may have sent the whole current packet, but still have "
00337             "elements on the queue.\n"), 5);
00338   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00339             "Or, we may have only partially sent the current packet.\n"), 5);
00340 
00341   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00342             "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5);
00343 
00344   // We may have had an OUTCOME_COMPLETE_SEND, but there is still stuff
00345   // in the queue_ to be sent.  *OR* we have had an OUTCOME_PARTIAL_SEND,
00346   // which equates to the same thing - we still have work to do.
00347 
00348   // We are still in MODE_QUEUE mode, thus there is still work to be
00349   // done to service the queue_ and/or a partially sent current packet.
00350   // Return WORK_OUTCOME_MORE_TO_DO so that our caller knows that we still
00351   // want it to call this perform_work() method.
00352   return WORK_OUTCOME_MORE_TO_DO;
00353 }
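
The return-value decisions made after send_packet() in the listing above can be condensed into one function. This sketch is an illustration under assumptions: the enums are stand-ins declared locally, work_outcome() is a hypothetical helper, and the result of relink() is modeled by passing in the mode observed afterwards (it is only consulted for the two failure outcomes).

```cpp
// Stand-in enums for illustration; the real declarations live in OpenDDS.
enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };
enum SendPacketOutcome {
  OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_BACKPRESSURE,
  OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR
};
enum WorkOutcome {
  WORK_OUTCOME_NO_MORE_TO_DO, WORK_OUTCOME_MORE_TO_DO,
  WORK_OUTCOME_CLOGGED_RESOURCE, WORK_OUTCOME_BROKEN_RESOURCE
};

// Hypothetical condensation of the tail of perform_work().
inline WorkOutcome work_outcome(SendPacketOutcome outcome, bool queue_empty,
                                SendMode mode_after_relink)
{
  if (outcome == OUTCOME_COMPLETE_SEND && queue_empty) {
    return WORK_OUTCOME_NO_MORE_TO_DO;      // backpressure has cleared
  }
  if (outcome == OUTCOME_PEER_LOST || outcome == OUTCOME_SEND_ERROR) {
    if (mode_after_relink == MODE_SUSPEND) {
      return WORK_OUTCOME_NO_MORE_TO_DO;    // reconnect still pending
    }
    if (mode_after_relink == MODE_TERMINATED) {
      return WORK_OUTCOME_BROKEN_RESOURCE;  // reconnect failed
    }
    // Reconnected: fall through, as the listing does.
  }
  if (outcome == OUTCOME_BACKPRESSURE) {
    return WORK_OUTCOME_CLOGGED_RESOURCE;
  }
  return WORK_OUTCOME_MORE_TO_DO;
}
```

Note that a pending reconnect maps to WORK_OUTCOME_NO_MORE_TO_DO here because that is what the listing returns, even though its log message mentions WORK_OUTCOME_CLOGGED_RESOURCE.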

void OpenDDS::DCPS::TransportSendStrategy::prepare_header (  )  [private]

This method is responsible for updating the packet header. Called exclusively by prepare_packet.

Definition at line 1613 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, header_, header_sequence_, prepare_header_i(), and OpenDDS::DCPS::TransportHeader::sequence_.

Referenced by prepare_packet().

01614 {
01615   DBG_ENTRY_LVL("TransportSendStrategy", "prepare_header", 6);
01616 
01617   // Increment header sequence for packet:
01618   this->header_.sequence_ = ++this->header_sequence_;
01619 
01620   // Allow the specific implementation the opportunity to set
01621   // values in the packet header.
01622   this->prepare_header_i();
01623 }

void OpenDDS::DCPS::TransportSendStrategy::prepare_header_i (  )  [protected, virtual]

Specific implementation processing of prepared packet header.

Reimplemented in OpenDDS::DCPS::MulticastSendStrategy.

Definition at line 1626 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL.

Referenced by prepare_header().

01627 {
01628   DBG_ENTRY_LVL("TransportSendStrategy","prepare_header_i",6);
01629 
01630   // Default implementation does nothing.
01631 }
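
prepare_header() and prepare_header_i() together form a small template-method pair: the base class owns the sequence counter and then hands control to a virtual hook. The sketch below shows that shape in isolation; both class names are hypothetical, and the subclass merely counts hook calls instead of writing real header fields.

```cpp
// Hypothetical base class: owns the sequence counter and calls the hook.
class SendStrategySketch {
public:
  virtual ~SendStrategySketch() {}

  // Mirrors prepare_header(): bump the sequence, then let a subclass
  // adjust the header.  Returns the sequence assigned to this packet.
  unsigned prepare_header()
  {
    sequence_ = ++header_sequence_;
    prepare_header_i();
    return sequence_;
  }

protected:
  virtual void prepare_header_i() {}  // default implementation does nothing

private:
  unsigned header_sequence_ = 0;
  unsigned sequence_ = 0;
};

// Hypothetical subclass standing in for a transport-specific strategy.
class CountingStrategy : public SendStrategySketch {
public:
  int hook_calls = 0;

protected:
  void prepare_header_i() override { ++hook_calls; }
};
```

Because the counter is incremented before the hook runs, a subclass sees the sequence number of the packet being prepared, not that of the previous one.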

void OpenDDS::DCPS::TransportSendStrategy::prepare_packet (  )  [private]

This method is responsible for actually "creating" the current send packet from the packet header and the collection of packet elements that make up the packet's contents.

Definition at line 1634 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), OpenDDS::DCPS::BuildChainVisitor::chain(), DBG_ENTRY_LVL, elems_, header_block_, header_complete_, marshal_transport_header(), pkt_chain_, prepare_header(), prepare_packet_i(), and VDBG.

Referenced by direct_send(), and perform_work().

01635 {
01636   DBG_ENTRY_LVL("TransportSendStrategy", "prepare_packet", 6);
01637 
01638   // Prepare the header for sending.
01639   this->prepare_header();
01640 
01641   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01642         "Marshal the packet header.\n"));
01643 
01644   if (this->header_block_ != 0) {
01645     this->header_block_->release();
01646   }
01647 
01648   ACE_NEW_MALLOC(this->header_block_,
01649     static_cast<ACE_Message_Block*>(this->header_mb_allocator_->malloc()),
01650     ACE_Message_Block(this->max_header_size_,
01651                       ACE_Message_Block::MB_DATA,
01652                       0,
01653                       0,
01654                       0,
01655                       0,
01656                       ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
01657                       ACE_Time_Value::zero,
01658                       ACE_Time_Value::max_time,
01659                       this->header_db_allocator_,
01660                       this->header_mb_allocator_));
01661 
01662   marshal_transport_header(this->header_block_);
01663 
01664   this->pkt_chain_ = this->header_block_->duplicate();
01665 
01666   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01667         "Use a BuildChainVisitor to visit the packet elems_.\n"));
01668 
01669   // Build up a chain of blocks by duplicating the message block chain
01670   // held by each element (in elems_), and then chaining the new duplicate
01671   // blocks together to form one long chain.
01672   BuildChainVisitor visitor;
01673   this->elems_->accept_visitor(visitor);
01674 
01675   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01676         "Attach the visitor's chain of blocks to the lone (packet "
01677         "header) block currently in the pkt_chain_.\n"));
01678 
01679   // Attach the visitor's chain of blocks to the packet header block.
01680   this->pkt_chain_->cont(visitor.chain());
01681 
01682   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01683         "Increment header sequence for next packet.\n"));
01684 
01685   // Allow the specific implementation the opportunity to process the
01686   // newly prepared packet.
01687   this->prepare_packet_i();
01688 
01689   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01690         "Set the header_complete_ flag to false.\n"));
01691 
01692   // Set the header_complete_ to false to indicate
01693   // that the first block in the pkt_chain_ is the packet header block
01694   // (actually a duplicate() of the packet header_block_).
01695   this->header_complete_ = false;
01696 }

void OpenDDS::DCPS::TransportSendStrategy::prepare_packet_i (  )  [protected, virtual]

Specific implementation processing of prepared packet.

Definition at line 1705 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL.

Referenced by prepare_packet().

01706 {
01707   DBG_ENTRY_LVL("TransportSendStrategy","prepare_packet_i",6);
01708 
01709   // Default implementation does nothing.
01710 }

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::relink ( bool  do_suspend = true  )  [virtual]

The subclass needs to provide the implementation for re-establishing the datalink. This is called when send returns an error.

Reimplemented in OpenDDS::DCPS::TcpSendStrategy.

Definition at line 50 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL.

Referenced by direct_send(), and perform_work().

00051 {
00052   DBG_ENTRY_LVL("TransportSendStrategy","relink",6);
00053   // The subclass needs to implement this function to re-establish
00054   // the link upon send failure.
00055 }

void OpenDDS::DCPS::TransportSendStrategy::remove_all_msgs ( RepoId  pub_id  ) 

Definition at line 1299 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, do_remove_sample(), OpenDDS::DCPS::TransportSendBuffer::retain_all(), send_buffer_, and send_delayed_notifications().

01300 {
01301   DBG_ENTRY_LVL("TransportSendStrategy","remove_all_msgs",6);
01302 
01303   const TransportQueueElement::MatchOnPubId match(pub_id);
01304   send_delayed_notifications(&match);
01305 
01306   GuardType guard(this->lock_);
01307 
01308   if (this->send_buffer_ != 0) {
01309     // If a secondary send buffer is bound, removed samples must
01310     // be retained in order to properly maintain the buffer:
01311     this->send_buffer_->retain_all(pub_id);
01312   }
01313 
01314   do_remove_sample(pub_id, match);
01315 }

RemoveResult OpenDDS::DCPS::TransportSendStrategy::remove_sample ( const DataSampleElement *  sample  ) 

Our DataLink has been requested by some particular TransportClient to remove the supplied sample (basically, an "unsend" attempt) from this strategy object.

Definition at line 1318 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, do_remove_sample(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::REMOVE_RELEASED, send_delayed_notifications(), and VDBG_LVL.

01319 {
01320   DBG_ENTRY_LVL("TransportSendStrategy", "remove_sample", 6);
01321 
01322   VDBG_LVL((LM_DEBUG, "(%P|%t)  Removing sample: %@\n", sample->get_sample()), 5);
01323 
01324   // The sample to remove is either in the temporary delayed notification list or
01325   // in an internal list (elems_ or queue_). If the transport thread is going to remove
01326   // it from the delayed notification list, that thread must acquire the WriterDataContainer
01327   // lock for the data_dropped/data_delivered callback, so it has to wait for this
01328   // remove_sample() call to complete, because this call already holds that lock. It is
01329   // therefore safe for this call to access the sample. If the thread calling remove_sample()
01330   // removes it, the sample comes from either the delayed notification list or an internal
01331   // list; in that case the element records whether the sample was released, so the
01332   // DataLinkSet can stop asking the remaining DataLinks to remove an already released sample.
01333 
01334   const char* const payload = sample->get_sample()->cont()->rd_ptr();
01335   RepoId pub_id = sample->get_pub_id();
01336   const TransportQueueElement::MatchOnDataPayload modp(payload);
01337   if (send_delayed_notifications(&modp)) {
01338     return REMOVE_RELEASED;
01339   }
01340 
01341   GuardType guard(this->lock_);
01342   return do_remove_sample(pub_id, modp);
01343 }

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::resume_send (  ) 

This is called when the connection was lost and the reconnect succeeds. The send mode is restored to the mode that was in effect before the suspend, which is either MODE_QUEUE or MODE_DIRECT.

Definition at line 70 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL, header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, mode_, mode_before_suspend_, MODE_DIRECT, MODE_NOT_SET, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, pkt_chain_, and start_counter_.

00071 {
00072   DBG_ENTRY_LVL("TransportSendStrategy","resume_send",6);
00073   GuardType guard(this->lock_);
00074 
00075   // If this send strategy is reused when the connection is reestablished, then
00076   // we need to re-initialize the mode_ and mode_before_suspend_.
00077   if (this->mode_ == MODE_TERMINATED) {
00078     this->header_.length_ = 0;
00079     this->pkt_chain_ = 0;
00080     this->header_complete_ = false;
00081     this->start_counter_ = 0;
00082     this->mode_ = MODE_DIRECT;
00083     this->mode_before_suspend_ = MODE_NOT_SET;
00084     this->delayed_delivered_notification_queue_.clear();
00085 
00086   } else if (this->mode_ == MODE_SUSPEND) {
00087     this->mode_ = this->mode_before_suspend_;
00088     this->mode_before_suspend_ = MODE_NOT_SET;
00089     if (this->queue_->size() > 0) {
00090       this->mode_ = MODE_QUEUE;
00091       this->synch_->work_available();
00092     }
00093 
00094   } else {
00095     ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::resume_send  The suspend or terminate"
00096                " is not called previously.\n"));
00097   }
00098 }
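
The mode transitions performed by resume_send() can be summarized as a small state function. This is a sketch under simplifying assumptions: Mode, ResumeState and resume() are hypothetical names, locking is left out, and the extra resets done in the terminated branch (header length, packet chain, start counter, delayed notification queue) are omitted.

```cpp
#include <cstddef>

enum class Mode { NotSet, Direct, Queue, Suspend, Terminated };

// Hypothetical holder for the two mode members consulted by resume_send().
struct ResumeState {
  Mode mode;
  Mode mode_before_suspend;
};

// Returns false when resume is called without a prior suspend or terminate.
bool resume(ResumeState& s, std::size_t queued) {
  if (s.mode == Mode::Terminated) {
    // Strategy is being reused after a reconnect: start over in direct mode.
    s.mode = Mode::Direct;
    s.mode_before_suspend = Mode::NotSet;
    return true;
  }
  if (s.mode == Mode::Suspend) {
    // Restore the earlier mode, but force queue mode if elements are waiting.
    s.mode = (queued > 0) ? Mode::Queue : s.mode_before_suspend;
    s.mode_before_suspend = Mode::NotSet;
    return true;
  }
  return false;
}
```

Note that a non-empty queue overrides the saved mode: elements queued during the suspension must drain through the queue path before direct sends can resume.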

void OpenDDS::DCPS::TransportSendStrategy::send ( TransportQueueElement *  element,
bool  relink = true 
)

Our DataLink has been requested by some particular TransportClient to send the element.

Definition at line 925 of file TransportSendStrategy.cpp.

References add_delayed_notification(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, direct_send(), elems_, OpenDDS::DCPS::TransportHeader::first_fragment_, OpenDDS::DCPS::TransportQueueElement::fragment(), header_, OpenDDS::DCPS::TransportHeader::last_fragment_, OpenDDS::DCPS::TransportHeader::length_, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), max_message_size(), max_samples_, OpenDDS::DCPS::MIN_FRAG, mode_, mode_as_str(), MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::BasicQueue< T >::put(), queue_, OpenDDS::DCPS::TransportQueueElement::requires_exclusive_packet(), send_delayed_notifications(), space_available(), synch_, OpenDDS::DCPS::Transport_debug_level, VDBG, VDBG_LVL, and OpenDDS::DCPS::ThreadSynch::work_available().

00926 {
00927   if (Transport_debug_level > 9) {
00928     ACE_DEBUG((LM_DEBUG,
00929                ACE_TEXT("(%P|%t) TransportSendStrategy::send() [%d] - ")
00930                ACE_TEXT("sending data at 0x%x.\n"),
00931                id(), element));
00932   }
00933 
00934   DBG_ENTRY_LVL("TransportSendStrategy", "send", 6);
00935 
00936   {
00937     GuardType guard(this->lock_);
00938 
00939     if (this->link_released_) {
00940       this->add_delayed_notification(element);
00941 
00942     } else {
00943       if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) {
00944         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00945               "TransportSendStrategy::send: mode is MODE_TERMINATED and not in "
00946               "graceful disconnecting, so discard message.\n"));
00947         element->data_dropped(true);
00948         return;
00949       }
00950 
00951       size_t element_length = element->msg()->total_length();
00952 
00953       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00954             "Send element msg() has total_length() == [%d].\n",
00955             element_length));
00956 
00957       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00958             "this->max_header_size_ == [%d].\n",
00959             this->max_header_size_));
00960 
00961       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00962             "this->max_size_ == [%d].\n",
00963             this->max_size_));
00964 
00965       const size_t max_message_size = this->max_message_size();
00966 
00967       // Really an assert.  We can't accept any element that wouldn't fit into
00968       // a transport packet by itself (ie, it would be the only element in the
00969       // packet).  This max_size_ is the user-configurable maximum, not based
00970       // on the transport's inherent maximum message size.  If max_message_size
00971       // is non-zero, we will fragment so max_size_ doesn't apply per-element.
00972       if (max_message_size == 0 &&
00973           this->max_header_size_ + element_length > this->max_size_) {
00974         ACE_ERROR((LM_ERROR,
00975                    "(%P|%t) ERROR: Element too large (%Q) "
00976                    "- won't fit into packet.\n", ACE_UINT64(element_length)));
00977         return;
00978       }
00979 
00980       // Check the mode_ to see if we simply put the element on the queue.
00981       if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) {
00982         VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
00983                   "this->mode_ == %C, so queue elem and leave.\n",
00984                   mode_as_str(this->mode_)), 5);
00985 
00986         this->queue_->put(element);
00987 
00988         if (this->mode_ != MODE_SUSPEND) {
00989           this->synch_->work_available();
00990         }
00991 
00992         return;
00993       }
00994 
00995       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00996             "this->mode_ == MODE_DIRECT.\n"));
00997 
00998       // We are in the MODE_DIRECT send mode.  When in this mode, the send()
00999       // calls will "build up" the transport packet to be sent directly when it
01000       // reaches the optimal size, contains the maximum number of samples, etc.
01001 
01002       // We need to check if the current element (the arg passed-in to this
01003       // send() method) should be appended to the transport packet, or if the
01004       // transport packet should be sent (directly) first, dealing with the
01005       // current element afterwards.
01006 
01007       // We will decide to send the packet as it is now, under two circumstances:
01008       //
01009       //    Either:
01010       //
01011       //    (1) The current element won't fit into the current packet since it
01012       //        would violate the max_packet_size_.
01013       //
01014       //    -OR-
01015       //
01016       //    (2) There is at least one element already in the current packet,
01017       //        and the current element says that it must be sent in an
01018       //        exclusive packet (ie, in a packet all by itself).
01019       //
01020       const bool exclusive = element->requires_exclusive_packet();
01021 
01022       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01023             "The element %C require an exclusive packet.\n",
01024             (exclusive ? "DOES" : "does NOT")
01025           ));
01026 
01027       const size_t space_needed =
01028         (max_message_size > 0)
01029         ? /* fragmenting */ DataSampleHeader::max_marshaled_size() + MIN_FRAG
01030         : /* not fragmenting */ element_length;
01031 
01032       if ((exclusive && (this->elems_->size() != 0))
01033           || (this->space_available() < space_needed)) {
01034 
01035         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01036               "Element won't fit in current packet or requires exclusive"
01037               " - send current packet (directly) now.\n"));
01038 
01039         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01040               "max_header_size_: %d, header_.length_: %d, element_length: %d\n"
01041               , this->max_header_size_, this->header_.length_, element_length));
01042 
01043         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01044               "Tot possible length: %d, max_len: %d\n"
01045               , this->max_header_size_ + this->header_.length_ + element_length
01046               , this->max_size_));
01047         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01048               "current elem size: %d\n"
01049               , this->elems_->size()));
01050 
01051         // Send the current packet, and deal with the current element
01052         // afterwards.
01053         // The invocation's relink status should dictate the direct_send's
01054         // relink. We don't want a (relink == false) invocation to end up
01055         // doing a relink. Think of (relink == false) as a non-blocking call.
01056         this->direct_send(relink);
01057 
01058         // Now check to see if we flipped into MODE_QUEUE, which would mean
01059         // that the direct_send() experienced backpressure, and the
01060         // packet was only partially sent.  If this has happened, we deal with
01061         // the current element by placing it on the queue (and then we are done).
01062         //
01063         // Otherwise, if the mode_ is still MODE_DIRECT, we can just
01064         // "drop" through to the next step in the logic where we append the
01065         // current element to the current packet.
01066         if (this->mode_ == MODE_QUEUE) {
01067           VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01068                     "We experienced backpressure on that direct send, as "
01069                     "the mode_ is now MODE_QUEUE or MODE_SUSPEND.  "
01070                     "Queue elem and leave.\n"), 5);
01071           this->queue_->put(element);
01072           this->synch_->work_available();
01073 
01074           return;
01075         }
01076       }
01077 
01078       // Loop for sending 'element', in fragments if needed
01079       bool first_pkt = true; // enter the loop 1st time through unconditionally
01080       for (TransportQueueElement* next_fragment = 0;
01081            (first_pkt || next_fragment)
01082            && (this->mode_ == MODE_DIRECT || this->mode_ == MODE_TERMINATED);) {
01083            // We do need to send in MODE_TERMINATED (GRACEFUL_DISCONNECT msg)
01084 
01085         if (next_fragment) {
01086           element = next_fragment;
01087           element_length = next_fragment->msg()->total_length();
01088           this->header_.first_fragment_ = false;
01089         }
01090 
01091         this->header_.last_fragment_ = false;
01092         if (max_message_size) { // fragmentation enabled
01093           const size_t avail = this->space_available();
01094           if (element_length > avail) {
01095             VDBG_LVL((LM_TRACE, "(%P|%t) DBG:   Fragmenting\n"), 0);
01096             ElementPair ep = element->fragment(avail);
01097             element = ep.first;
01098             element_length = element->msg()->total_length();
01099             next_fragment = ep.second;
01100             this->header_.first_fragment_ = first_pkt;
01101           } else if (next_fragment) {
01102             // We are sending the "tail" element of a previous fragment()
01103             // operation, and this element didn't itself require fragmentation
01104             this->header_.last_fragment_ = true;
01105             next_fragment = 0;
01106           }
01107         }
01108         first_pkt = false;
01109 
01110         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01111               "Start the 'append elem' to current packet logic.\n"));
01112 
01113         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01114               "Put element into current packet elems_.\n"));
01115 
01116         // Now that we know the current element should go into the current
01117         // packet, we can just go ahead and "append" the current element to
01118         // the current packet.
01119 
01120         // Add the current element to the collection of packet elements.
01121         this->elems_->put(element);
01122 
01123         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01124               "Before, the header_.length_ == [%d].\n",
01125               this->header_.length_));
01126 
01127         // Adjust the header_.length_ to account for the length of the element.
01128         this->header_.length_ += static_cast<ACE_UINT32>(element_length);
01129         const size_t message_length = this->header_.length_;
01130 
01131         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01132               "After adding element's length, the header_.length_ == [%d].\n",
01133               message_length));
01134 
01135         // The current packet now contains the current element.  We need to
01136         // check to see if the conditions are such that we should go ahead and
01137         // attempt to send the packet "directly" now, or if we can just leave
01138         // and send the current packet later (in another send() call or in a
01139         // send_stop() call).
01140 
01141         // There are a few conditions that will cause us to attempt to send the
01142         // packet (directly) right now:
01143         // - Fragmentation was needed
01144         // - The current packet has the maximum number of samples per packet.
01145         // - The current packet's total length exceeds the optimum packet size.
01146         // - The current element (currently part of the packet elems_)
01147         //   requires an exclusive packet.
01148         //
01149         if (next_fragment || (this->elems_->size() >= this->max_samples_)
01150             || (this->max_header_size_ + message_length > this->optimum_size_)
01151             || exclusive) {
01152           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01153                 "Now the current packet looks full - send it (directly).\n"));
01154 
01155           this->direct_send(relink);
01156 
01157           if (next_fragment && this->mode_ != MODE_DIRECT) {
01158             if (this->mode_ == MODE_QUEUE) {
01159               this->queue_->put(next_fragment);
01160               this->synch_->work_available();
01161 
01162             } else {
01163               next_fragment->data_dropped(true /* dropped by transport */);
01164             }
01165           } else if (mode_ == MODE_QUEUE) {
01166             // Background thread handles packets in progress
01167             this->synch_->work_available();
01168           }
01169 
01170           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01171                 "Back from the direct_send() attempt.\n"));
01172 
01173           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01174                 "And we %C as a result of the direct_send() call.\n",
01175                 ((this->mode_ == MODE_QUEUE) ? "flipped into MODE_QUEUE"
01176                                              : "stayed in MODE_DIRECT")));
01177 
01178         } else {
01179           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01180                 "Packet not sent. Send conditions weren't satisfied.\n"));
01181           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01182                 "elems_->size(): %d, max_samples_: %d\n",
01183                 int(this->elems_->size()), int(this->max_samples_)));
01184           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01185                 "header_size_: %d, optimum_size_: %d\n",
01186                 int(this->max_header_size_ + message_length),
01187                 int(this->optimum_size_)));
01188           VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01189                 "element_requires_exclusive_packet: %d\n", int(exclusive)));
01190 
01191           if (this->mode_ == MODE_QUEUE) {
01192             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01193                   "We flipped into MODE_QUEUE.\n"));
01194 
01195           } else {
01196             VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01197                   "We stayed in MODE_DIRECT.\n"));
01198           }
01199         }
01200       }
01201     }
01202   }
01203 
01204   send_delayed_notifications();
01205 }
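
In MODE_DIRECT, send() makes two separate decisions: whether to send the current packet before appending the new element, and whether to send it right after appending. The sketch below restates the two conditions from the listing as pure functions. PacketState and both function names are hypothetical; space_needed is assumed to be computed by the caller (the element length when not fragmenting, or the minimum fragment size plus the sample header size when fragmenting).

```cpp
#include <cstddef>

// Hypothetical snapshot of the values send() consults; not an OpenDDS type.
struct PacketState {
  std::size_t elems;            // elements already in the current packet
  std::size_t space_available;  // bytes left in the current packet
  std::size_t payload_length;   // header_.length_ after appending
  std::size_t max_header_size;
  std::size_t optimum_size;
  std::size_t max_samples;
};

// Should the current packet be sent before the new element is appended?
bool flush_before_append(const PacketState& p, bool exclusive,
                         std::size_t space_needed) {
  return (exclusive && p.elems != 0) || (p.space_available < space_needed);
}

// Should the packet be sent right after the element was appended?
bool flush_after_append(const PacketState& p, bool exclusive,
                        bool more_fragments) {
  return more_fragments
      || p.elems >= p.max_samples
      || p.max_header_size + p.payload_length > p.optimum_size
      || exclusive;
}
```

An exclusive element can therefore trigger both sends: the first empties the packet so the element travels alone, and the second sends the element itself immediately.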

void OpenDDS::DCPS::TransportSendStrategy::send_buffer ( TransportSendBuffer *  send_buffer  ) 

Assigns an optional send buffer.

Definition at line 128 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::TransportSendBuffer::bind(), and send_buffer_.

Referenced by OpenDDS::DCPS::MulticastDataLink::send_strategy().

00129 {
00130   this->send_buffer_ = send_buffer;
00131 
00132   if (this->send_buffer_ != 0) {
00133     this->send_buffer_->bind(this);
00134   }
00135 }

ACE_INLINE ssize_t OpenDDS::DCPS::TransportSendStrategy::send_bytes ( const iovec  iov[],
int  n,
int &  bp 
) [protected, virtual]

Reimplemented in OpenDDS::DCPS::TcpSendStrategy.

Definition at line 121 of file TransportSendStrategy.inl.

References send_bytes_i().

Referenced by do_send_packet().

00124 {
00125   return send_bytes_i(iov, n);
00126 }

virtual ssize_t OpenDDS::DCPS::TransportSendStrategy::send_bytes_i ( const iovec  iov[],
int  n 
) [protected, pure virtual]

Implemented in OpenDDS::DCPS::MulticastSendStrategy, OpenDDS::DCPS::RtpsUdpSendStrategy, OpenDDS::DCPS::ShmemSendStrategy, OpenDDS::DCPS::TcpSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.

Referenced by non_blocking_send(), and send_bytes().

bool OpenDDS::DCPS::TransportSendStrategy::send_delayed_notifications ( const TransportQueueElement::MatchCriteria *  match = 0  )  [private]

If delayed notifications were queued up, issue those callbacks here. The default match is "match all"; otherwise match can be used to specify either a certain individual packet or a publication id. Returns true if anything in the delayed notification list matched.

Definition at line 663 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, lock_, OpenDDS::DCPS::TransportQueueElement::MatchCriteria::matches(), mode(), MODE_NOT_SET, MODE_TERMINATED, OPENDDS_VECTOR(), OpenDDS::DCPS::TransportQueueElement::owned_by_transport(), and transport_shutdown_.

Referenced by clear(), perform_work(), remove_all_msgs(), remove_sample(), send(), and send_stop().

00664 {
00665   DBG_ENTRY_LVL("TransportSendStrategy","send_delayed_notifications",6);
00666   TransportQueueElement* sample = 0;
00667   SendMode mode = MODE_NOT_SET;
00668 
00669   OPENDDS_VECTOR(TQESendModePair) samples;
00670 
00671   size_t num_delayed_notifications = 0;
00672   bool found_element = false;
00673 
00674   {
00675     GuardType guard(lock_);
00676 
00677     num_delayed_notifications = delayed_delivered_notification_queue_.size();
00678 
00679     if (num_delayed_notifications == 0) {
00680       return false;
00681 
00682     } else if (num_delayed_notifications == 1) {
00683       // Optimization for the most common case (doesn't need vectors)
00684 
00685       if (!match || match->matches(*delayed_delivered_notification_queue_[0].first)) {
00686         found_element = true;
00687         sample = delayed_delivered_notification_queue_[0].first;
00688         mode = delayed_delivered_notification_queue_[0].second;
00689 
00690         delayed_delivered_notification_queue_.clear();
00691       }
00692 
00693     } else {
00694       OPENDDS_VECTOR(TQESendModePair)::iterator iter;
00695       for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) {
00696         sample = iter->first;
00697         mode = iter->second;
00698         if (!match || match->matches(*sample)) {
00699           found_element = true;
00700           samples.push_back(*iter);
00701           iter = delayed_delivered_notification_queue_.erase(iter);
00702         } else {
00703           ++iter;
00704         }
00705       }
00706     }
00707   }
00708 
00709   if (!found_element)
00710     return false;
00711 
00712   if (num_delayed_notifications == 1) {
00713     // optimization for the common case
00714     if (mode == MODE_TERMINATED) {
00715       if (!transport_shutdown_ || sample->owned_by_transport()) {
00716         sample->data_dropped(true);
00717       }
00718     } else {
00719       if (!transport_shutdown_ || sample->owned_by_transport()) {
00720         sample->data_delivered();
00721       }
00722     }
00723 
00724   } else {
00725     for (size_t i = 0; i < samples.size(); ++i) {
00726       if (samples[i].second == MODE_TERMINATED) {
00727         if (!transport_shutdown_ || samples[i].first->owned_by_transport()) {
00728           samples[i].first->data_dropped(true);
00729         }
00730       } else {
00731         if (!transport_shutdown_ || samples[i].first->owned_by_transport()) {
00732           samples[i].first->data_delivered();
00733         }
00734       }
00735     }
00736   }
00737   return true;
00738 }
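
The listing above removes the matching entries while holding the lock and issues the callbacks only after the guard has gone out of scope. A minimal sketch of that "collect under the lock, notify outside it" idiom follows. DelayedNotifier, its integer ids and the std::function callbacks are hypothetical simplifications, not the OpenDDS types, and std::mutex is used here in place of the strategy's own lock type.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical queue of pending callbacks keyed by an id; not the OpenDDS types.
class DelayedNotifier {
public:
  using Callback = std::function<void()>;

  void add(int id, Callback cb) {
    std::lock_guard<std::mutex> guard(lock_);
    pending_.emplace_back(id, std::move(cb));
  }

  // Runs the callbacks accepted by 'match' (all of them if 'match' is empty).
  // Returns true if at least one entry matched.
  bool notify(const std::function<bool(int)>& match = nullptr) {
    std::vector<Callback> ready;
    {
      std::lock_guard<std::mutex> guard(lock_);
      for (auto it = pending_.begin(); it != pending_.end();) {
        if (!match || match(it->first)) {
          ready.push_back(std::move(it->second));
          it = pending_.erase(it);
        } else {
          ++it;
        }
      }
    }
    // The lock is released here, so a callback may call add() or notify() again.
    for (auto& cb : ready) cb();
    return !ready.empty();
  }

  std::size_t pending() const {
    std::lock_guard<std::mutex> guard(lock_);
    return pending_.size();
  }

private:
  mutable std::mutex lock_;
  std::vector<std::pair<int, Callback>> pending_;
};
```

Running the callbacks outside the critical section avoids deadlock when a callback needs a lock held by a thread that is itself waiting on this object.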

TransportSendStrategy::SendPacketOutcome OpenDDS::DCPS::TransportSendStrategy::send_packet (  )  [private]

This is called to send the current packet. The current packet will either be a "partially sent" packet, or a packet that has just been prepared via a call to prepare_packet().

Definition at line 1753 of file TransportSendStrategy.cpp.

References adjust_packet_after_send(), DBG_ENTRY_LVL, do_send_packet(), OpenDDS::DCPS::TransportSendBuffer::insert(), OUTCOME_BACKPRESSURE, OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, send_buffer_, VDBG, and VDBG_LVL.

Referenced by direct_send(), and perform_work().

01754 {
01755   DBG_ENTRY_LVL("TransportSendStrategy", "send_packet", 6);
01756 
01757   int bp_flag = 0;
01758   const ssize_t num_bytes_sent =
01759     this->do_send_packet(this->pkt_chain_, bp_flag);
01760 
01761   if (num_bytes_sent == 0) {
01762     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01763               "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5);
01764     // This means that the peer has disconnected.
01765     return OUTCOME_PEER_LOST;
01766   }
01767 
01768   if (num_bytes_sent < 0) {
01769     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01770               "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5);
01771 
01772     // Check for backpressure...
01773     if (bp_flag == 1) {
01774       VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01775                 "Since backpressure flag is true, return "
01776                 "OUTCOME_BACKPRESSURE.\n"), 5);
01777       // Ok.  Not really an error - just backpressure.
01778       return OUTCOME_BACKPRESSURE;
01779     }
01780 
01781     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01782               "Since backpressure flag is false, return "
01783               "OUTCOME_SEND_ERROR.\n"), 5);
01784 
01785     // Not backpressure - it's a real error.
01786     // Note: moved this to send_bytes so the errno msg could be written.
01787     //ACE_ERROR((LM_ERROR,
01788     //           "(%P|%t) ERROR: Call to peer().send() failed with negative "
01789     //           "return code.\n"));
01790 
01791     return OUTCOME_SEND_ERROR;
01792   }
01793 
01794   if (this->send_buffer_ != 0) {
01795     // If a secondary send buffer is bound, sent samples must
01796     // be inserted in order to properly maintain the buffer:
01797     this->send_buffer_->insert(this->header_.sequence_,
01798       this->elems_, this->pkt_chain_);
01799   }
01800 
01801   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   "
01802             "Since num_bytes_sent > 0, adjust the packet to account for "
01803             "the bytes that did get sent.\n"),5);
01804 
01805   // We sent some bytes - adjust the current packet (elems_ and pkt_chain_)
01806   // to account for the bytes that have been sent.
01807   const int result =
01808     this->adjust_packet_after_send(num_bytes_sent);
01809 
01810   if (result == 0) {
01811     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01812           "The adjustment logic says that the complete packet was "
01813           "sent.  Return OUTCOME_COMPLETE_SEND.\n"));
01814     return OUTCOME_COMPLETE_SEND;
01815   }
01816 
01817   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01818         "The adjustment logic says that only a part of the packet was "
01819         "sent. Return OUTCOME_PARTIAL_SEND.\n"));
01820 
01821   return OUTCOME_PARTIAL_SEND;
01822 }
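
The outcome selection in send_packet() depends only on the byte count returned by the low-level send, the backpressure flag, and whether anything is left of the packet afterwards. The sketch below captures that mapping. Outcome and classify() are hypothetical names, and comparing the byte count with the packet size is an assumed stand-in for adjust_packet_after_send(), whose implementation is not shown in this section.

```cpp
#include <cstddef>

enum class Outcome { CompleteSend, PartialSend, Backpressure, PeerLost, SendError };

// 'sent' is the byte count reported by the low-level send, 'backpressure' the
// flag it set, and 'packet_size' the bytes that were still waiting to go out.
Outcome classify(long sent, bool backpressure, std::size_t packet_size) {
  if (sent == 0) return Outcome::PeerLost;  // peer has disconnected
  if (sent < 0) return backpressure ? Outcome::Backpressure : Outcome::SendError;
  return static_cast<std::size_t>(sent) >= packet_size ? Outcome::CompleteSend
                                                       : Outcome::PartialSend;
}
```

A negative count is treated as a real error only when the backpressure flag is clear; with the flag set, the caller is expected to retry later rather than tear down the link.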

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::send_start (  ) 

Invoked prior to one or more send() invocations from a particular TransportClient.

Definition at line 30 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL, and start_counter_.

00031 {
00032   DBG_ENTRY_LVL("TransportSendStrategy","send_start",6);
00033 
00034   GuardType guard(this->lock_);
00035 
00036   if (!this->link_released_)
00037     ++this->start_counter_;
00038 }
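
send_start() and send_stop() bracket a burst of send() calls, and the brackets may nest: start_counter_ tracks the depth, and only the send_stop() that brings it back to zero triggers further work. A minimal sketch of that counting rule, with a hypothetical SendBracket class and without the locking and link_released_ check of the real methods:

```cpp
#include <cstddef>

// Hypothetical counter mirroring start_counter_; not the OpenDDS class.
class SendBracket {
public:
  void start() { ++depth_; }

  // Returns true only for the stop that closes the outermost start.
  // An unmatched stop is ignored and reported as false.
  bool stop() {
    if (depth_ == 0) return false;
    return --depth_ == 0;
  }

  std::size_t depth() const { return depth_; }

private:
  std::size_t depth_ = 0;
};
```

With this rule, several clients can share one strategy and the partially filled packet is considered for sending once, when the last of them finishes.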

void OpenDDS::DCPS::TransportSendStrategy::send_stop ( RepoId  repoId  ) 

Invoked after one or more send() invocations from a particular TransportClient.

Definition at line 1208 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, direct_send(), header_, OpenDDS::DCPS::TransportHeader::length_, mode_as_str(), MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, send_delayed_notifications(), start_counter_, synch_, VDBG, VDBG_LVL, and OpenDDS::DCPS::ThreadSynch::work_available().

01209 {
01210   DBG_ENTRY_LVL("TransportSendStrategy","send_stop",6);
01211   {
01212     GuardType guard(this->lock_);
01213 
01214     if (this->link_released_)
01215       return;
01216 
01217     if (this->start_counter_ == 0) {
01218       // This is an indication of a logic error.  This is more of an assert.
01219       VDBG_LVL((LM_ERROR,
01220                 "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5);
01221       return;
01222     }
01223 
01224     --this->start_counter_;
01225 
01226     if (this->start_counter_ != 0) {
01227       // This wasn't the last send_stop() that we are expecting.  We only
01228       // really honor the first send_start() and the last send_stop().
01229       // We can return without doing anything else in this case.
01230       return;
01231     }
01232 
01233     if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) {
01234       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01235             "TransportSendStrategy::send_stop: dont try to send current packet "
01236             "since mode is MODE_TERMINATED and not in graceful disconnecting.\n"));
01237       return;
01238     }
01239 
01240     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01241           "This is an 'important' send_stop() event since our "
01242           "start_counter_ is 0.\n"));
01243 
01244     // We just caused the start_counter_ to become zero.  This
01245     // means that we aren't expecting another send() or send_stop() at any
01246     // time in the near future (ie, it isn't imminent).
01247 
01248     // If our mode_ is currently MODE_QUEUE or MODE_SUSPEND, then we don't have
01249     // anything to do here because samples have already been going to the
01250     // queue.
01251 
01252     // We only need to do something if the mode_ is
01253     // MODE_DIRECT.  It means that we may have some sample(s) in the
01254     // current packet that have never been sent.  This is our
01255     // opportunity to send the current packet directly if this is the case.
01256     if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) {
01257       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01258             "But since we are in %C, we don't have to do "
01259             "anything more in this important send_stop().\n",
01260             mode_as_str(this->mode_)));
01261       // We don't do anything if we are in MODE_QUEUE.  Just leave.
01262       return;
01263     }
01264 
01265     size_t header_length = this->header_.length_;
01266     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01267           "We are in MODE_DIRECT in an important send_stop() - "
01268           "header_.length_ == [%d].\n", header_length));
01269 
01270     // Only attempt to send the current packet (directly) if the current
01271     // packet actually contains something (it could be empty).
01272     if ((header_length > 0) &&
01273         //(this->elems_->size ()+this->not_yet_pac_q_->size() > 0))
01274         (this->elems_->size() > 0)) {
01275       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01276             "There is something in the current packet - attempt to send "
01277             "it (directly) now.\n"));
01278       // If a relink needs to be done for this packet to be sent, do it.
01279       this->direct_send(true);
01280       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01281             "Back from the attempt to send leftover packet directly.\n"));
01282 
01283       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01284             "But we %C as a result.\n",
01285             ((this->mode_ == MODE_QUEUE)? "flipped into MODE_QUEUE":
01286                                           "stayed in MODE_DIRECT" )));
01287       if (this->mode_ == MODE_QUEUE  && this->mode_ != MODE_SUSPEND) {
01288         VDBG((LM_DEBUG, "(%P|%t) DBG:   "
01289               "Notify Synch thread of work availability\n"));
01290         this->synch_->work_available();
01291       }
01292     }
01293   }
01294 
01295   send_delayed_notifications();
01296 }
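
The pairing of send_start() and send_stop() through start_counter_ can be illustrated with a minimal sketch. The class name CompositeSendGate and its boolean return value are hypothetical and are not part of OpenDDS; the sketch also leaves out the link_released_ check, the mode handling and the actual packet send, and keeps only the counting rule.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for the start_counter_ bookkeeping shown above.
// It is not an OpenDDS class; only the counting rule is modelled.
class CompositeSendGate {
public:
  // Mirrors send_start(): one more client has opened a send sequence.
  void send_start() {
    std::lock_guard<std::mutex> guard(lock_);
    ++start_counter_;
  }

  // Mirrors the counting part of send_stop(). Returns true only for the
  // "important" call that brings the counter back to zero, which is the
  // point where the real code considers sending the current packet.
  bool send_stop() {
    std::lock_guard<std::mutex> guard(lock_);
    if (start_counter_ == 0) {
      return false;  // unmatched send_stop(): the listing logs an error
    }
    --start_counter_;
    return start_counter_ == 0;
  }

private:
  std::mutex lock_;
  unsigned start_counter_ = 0;
};
```

With two overlapping clients, the first send_stop() on such a gate returns false and the second returns true, so only the last client to finish would trigger the flush.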

void OpenDDS::DCPS::TransportSendStrategy::set_graceful_disconnecting ( bool  flag  )  [protected]

Set graceful disconnecting flag.

Definition at line 1713 of file TransportSendStrategy.cpp.

References graceful_disconnecting_.

Referenced by OpenDDS::DCPS::TcpSendStrategy::reset().

01714 {
01715   this->graceful_disconnecting_ = flag;
01716 }

size_t OpenDDS::DCPS::TransportSendStrategy::space_available (  )  const [private]

How much space is available in the current packet before we reach one of the limits: max_message_size() (the transport's inherent limit) or max_size_ (the user's configured limit).

Definition at line 1887 of file TransportSendStrategy.cpp.

References header_, OpenDDS::DCPS::TransportHeader::length_, max_header_size_, max_message_size(), and max_size_.

Referenced by get_packet_elems_from_queue(), and send().

01888 {
01889   const size_t used = this->max_header_size_ + this->header_.length_,
01890     max_msg = this->max_message_size();
01891   if (max_msg) {
01892     return std::min(this->max_size_ - used, max_msg - used);
01893   }
01894   return this->max_size_ - used;
01895 }
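
The arithmetic above can be tried out in isolation. The following is a sketch only: it is a hypothetical free function, not the member function, and it takes the four inputs as parameters. It also clamps the result at zero, which the listing does not do; with unsigned operands the original subtraction would wrap around if the used byte count ever exceeded a limit, and the listing alone does not show whether that can happen.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical free-function version of the space_available() arithmetic.
// max_msg == 0 is taken to mean "the transport imposes no limit", which is
// how the listing above treats a zero max_message_size().
std::size_t space_available(std::size_t max_size, std::size_t max_msg,
                            std::size_t max_header_size,
                            std::size_t header_length)
{
  // Bytes already committed: reserved header space plus current payload.
  const std::size_t used = max_header_size + header_length;

  // The effective limit is the smaller of the two configured limits.
  const std::size_t limit = max_msg ? std::min(max_size, max_msg) : max_size;

  // Clamp at zero (an addition in this sketch, not in the original).
  return used < limit ? limit - used : 0;
}
```

For example, with max_size = 1000, no transport limit, a 28-byte header reservation and 100 bytes already in the packet, the sketch reports 872 bytes of remaining space.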

int OpenDDS::DCPS::TransportSendStrategy::start (  ) 

Start the TransportSendStrategy. This happens once, when the DataLink that "owns" this strategy object has established a connection.

Definition at line 828 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_add_ref(), OpenDDS::DCPS::TransportSendBuffer::capacity(), DBG_ENTRY_LVL, and send_buffer_.

00829 {
00830   DBG_ENTRY_LVL("TransportSendStrategy","start",6);
00831 
00832   {
00833     GuardType guard(this->lock_);
00834 
00835     if (!this->start_i()) {
00836       return -1;
00837     }
00838   }
00839 
00840   size_t header_chunks(1);
00841 
00842   // If a secondary send buffer is bound, sent headers should
00843   // be cached to properly maintain the buffer:
00844   if (this->send_buffer_ != 0) {
00845     header_chunks += this->send_buffer_->capacity();
00846 
00847   } else {
00848     header_chunks += 1;
00849   }
00850 
00851   ACE_NEW_RETURN(this->header_db_allocator_,
00852                  TransportDataBlockAllocator(header_chunks),
00853                  -1);
00854 
00855   ACE_NEW_RETURN(this->header_mb_allocator_,
00856                  TransportMessageBlockAllocator(header_chunks),
00857                  -1);
00858 
00859   // Since we (the TransportSendStrategy object) are a reference-counted
00860   // object, but the synch_ object doesn't necessarily know this, we need
00861   // to give a "copy" of a reference to ourselves to the synch_ object here.
00862   // We will do the reverse when we unregister ourselves (as a worker) from
00863   // the synch_ object.
00864   //MJM: The synch thingie knows to not "delete" us, right?
00865   this->_add_ref();
00866 
00867   if (this->synch_->register_worker(this) == -1) {
00868     // Take back our "copy".
00869     this->_remove_ref();
00870     ACE_ERROR_RETURN((LM_ERROR,
00871                       "(%P|%t) ERROR: TransportSendStrategy failed to register "
00872                       "as a worker with the ThreadSynch object.\n"),
00873                      -1);
00874   }
00875 
00876   return 0;
00877 }

virtual bool OpenDDS::DCPS::TransportSendStrategy::start_i (  )  [inline, virtual]

Let the subclass start.

Reimplemented in OpenDDS::DCPS::ShmemSendStrategy.

Definition at line 123 of file TransportSendStrategy.h.

00123 { return true; }

void OpenDDS::DCPS::TransportSendStrategy::stop (  ) 

Stop the TransportSendStrategy. This happens once, when the DataLink that "owns" this strategy object is going away.

Definition at line 880 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), DBG_ENTRY_LVL, header_block_, header_db_allocator_, header_mb_allocator_, pkt_chain_, stop_i(), synch_, and OpenDDS::DCPS::ThreadSynch::unregister_worker().

00881 {
00882   DBG_ENTRY_LVL("TransportSendStrategy","stop",6);
00883 
00884   if (this->header_block_ != 0) {
00885     this->header_block_->release ();
00886     this->header_block_ = 0;
00887   }
00888 
00889   this->synch_->unregister_worker();
00890 
00891   // Since we gave the synch_ a "copy" of a reference to ourselves, we need
00892   // to take it back now.
00893   this->_remove_ref();
00894 
00895   {
00896     GuardType guard(this->lock_);
00897 
00898     if (this->pkt_chain_ != 0) {
00899       size_t size = this->pkt_chain_->total_length();
00900 
00901       if (size > 0) {
00902         this->pkt_chain_->release();
00903         ACE_DEBUG((LM_WARNING,
00904                    ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
00905                    ACE_TEXT("terminating with %d unsent bytes.\n"),
00906                    size));
00907       }
00908     }
00909   }
00910 
00911   delete this->header_mb_allocator_;
00912   delete this->header_db_allocator_;
00913 
00914   {
00915     GuardType guard(this->lock_);
00916 
00917     this->stop_i();
00918   }
00919 
00920   // TBD SOON - What about all of the samples that may still be stuck in
00921   //            our queue_ and/or elems_?
00922 }

virtual void OpenDDS::DCPS::TransportSendStrategy::stop_i (  )  [pure virtual]

Let the subclass stop.

Implemented in OpenDDS::DCPS::MulticastSendStrategy, OpenDDS::DCPS::RtpsUdpSendStrategy, OpenDDS::DCPS::ShmemSendStrategy, OpenDDS::DCPS::TcpSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.

Referenced by stop().

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::suspend_send (  ) 

This is called the first time a reconnect is attempted. The send mode is set to MODE_SUSPEND, and messages are queued while in this mode.

Definition at line 58 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL, mode_, mode_before_suspend_, MODE_SUSPEND, and MODE_TERMINATED.

00059 {
00060   DBG_ENTRY_LVL("TransportSendStrategy","suspend_send",6);
00061   GuardType guard(this->lock_);
00062 
00063   if (this->mode_ != MODE_TERMINATED && this->mode_ != MODE_SUSPEND) {
00064     this->mode_before_suspend_ = this->mode_;
00065     this->mode_ = MODE_SUSPEND;
00066   }
00067 }
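
The guard in suspend_send() matters when the function is called more than once. The sketch below uses a hypothetical ModeTracker struct with the same two fields and no locking; it is meant only to show that a second call does not overwrite the remembered mode and that a terminated strategy is left alone.

```cpp
#include <cassert>

enum SendMode {
  MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED
};

// Hypothetical stand-in for the mode_ / mode_before_suspend_ pair.
// Locking is omitted; the real method holds lock_ while doing this.
struct ModeTracker {
  SendMode mode = MODE_DIRECT;
  SendMode mode_before_suspend = MODE_NOT_SET;

  void suspend_send() {
    // Skip if already suspended (the saved mode would be overwritten with
    // MODE_SUSPEND) or terminated (termination is not left by suspending).
    if (mode != MODE_TERMINATED && mode != MODE_SUSPEND) {
      mode_before_suspend = mode;
      mode = MODE_SUSPEND;
    }
  }
};
```

Starting from MODE_QUEUE, two consecutive suspend_send() calls on this sketch leave mode_before_suspend at MODE_QUEUE, which is the value a later resume would need.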

ACE_INLINE OpenDDS::DCPS::ThreadSynch * OpenDDS::DCPS::TransportSendStrategy::synch (  )  const [protected]

Definition at line 22 of file TransportSendStrategy.inl.

References DBG_ENTRY_LVL, and synch_.

Referenced by OpenDDS::DCPS::TcpSendStrategy::schedule_output().

00023 {
00024   DBG_ENTRY_LVL("TransportSendStrategy","synch",6);
00025 
00026   return synch_;
00027 }

void OpenDDS::DCPS::TransportSendStrategy::terminate_send ( bool  graceful_disconnecting = false  ) 

Remove all samples in the backpressure queue and packet queue.

This is called whenever the connection is lost and the reconnect fails. It removes all samples in the backpressure queue and packet queue.

Definition at line 742 of file TransportSendStrategy.cpp.

References clear(), DBG_ENTRY_LVL, graceful_disconnecting_, MODE_SUSPEND, MODE_TERMINATED, and VDBG.

00743 {
00744   DBG_ENTRY_LVL("TransportSendStrategy","terminate_send",6);
00745 
00746   bool reset_flag = true;
00747 
00748   {
00749     GuardType guard(this->lock_);
00750 
00751     // If the terminate_send call due to a non-graceful disconnection before
00752     // a datalink shutdown then we will not try to send the graceful disconnect
00753     // message.
00754     if ((this->mode_ == MODE_TERMINATED || this->mode_ == MODE_SUSPEND)
00755         && !this->graceful_disconnecting_) {
00756       VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00757             "It was already terminated non gracefully, will not set to graceful disconnecting \n"));
00758       reset_flag = false;
00759     }
00760   }
00761 
00762   VDBG((LM_DEBUG, "(%P|%t) DBG:  Now flip to MODE_TERMINATED \n"));
00763 
00764   this->clear(MODE_TERMINATED);
00765 
00766   if (reset_flag) {
00767     GuardType guard(this->lock_);
00768     this->graceful_disconnecting_ = graceful_disconnecting;
00769   }
00770 }
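
The reset_flag logic decides whether the caller's graceful_disconnecting argument is recorded. A minimal sketch of that decision follows. TerminateSketch is a hypothetical struct, locking is omitted, and the assignment to mode stands in for clear(MODE_TERMINATED) under the assumption that clear() leaves the strategy in the mode it is given.

```cpp
#include <cassert>

enum SendMode {
  MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED
};

// Hypothetical model of the flag handling in terminate_send().
struct TerminateSketch {
  SendMode mode;
  bool graceful_disconnecting;

  void terminate_send(bool graceful) {
    // Already suspended or terminated without a graceful disconnect:
    // keep the existing (non-graceful) flag.
    const bool already_down =
      (mode == MODE_TERMINATED || mode == MODE_SUSPEND);
    const bool reset_flag = !(already_down && !graceful_disconnecting);

    // Assumption: clear(MODE_TERMINATED) ends with mode == MODE_TERMINATED.
    mode = MODE_TERMINATED;

    if (reset_flag) {
      graceful_disconnecting = graceful;
    }
  }
};
```

In this sketch a strategy in MODE_DIRECT records a graceful request, while one that was already in MODE_SUSPEND with the flag unset stays non-graceful, which is the state send_stop() checks before skipping the final packet.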

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::transport_shutdown (  ) 

Informs the strategy that the transport is shutting down, so that no further notifications are sent to the listener.

Definition at line 136 of file TransportSendStrategy.inl.

References transport_shutdown_.

00137 {
00138   this->transport_shutdown_ = true;
00139 }


Friends And Related Function Documentation

friend class TransportSendBuffer [friend]

Definition at line 393 of file TransportSendStrategy.h.


Member Data Documentation

QueueType* OpenDDS::DCPS::TransportSendStrategy::elems_ [private]

Current elements that have contributed blocks to the current transport packet.

Definition at line 326 of file TransportSendStrategy.h.

Referenced by adjust_packet_after_send(), clear(), current_packet_first_element(), do_remove_sample(), get_packet_elems_from_queue(), prepare_packet(), send(), and ~TransportSendStrategy().

bool OpenDDS::DCPS::TransportSendStrategy::graceful_disconnecting_ [private]

Definition at line 384 of file TransportSendStrategy.h.

Referenced by set_graceful_disconnecting(), and terminate_send().

TransportHeader OpenDDS::DCPS::TransportSendStrategy::header_ [protected]

Current transport packet header.

Definition at line 401 of file TransportSendStrategy.h.

Referenced by adjust_packet_after_send(), clear(), do_remove_sample(), get_packet_elems_from_queue(), marshal_transport_header(), perform_work(), prepare_header(), OpenDDS::DCPS::MulticastSendStrategy::prepare_header_i(), resume_send(), send(), send_stop(), and space_available().

ACE_Message_Block* OpenDDS::DCPS::TransportSendStrategy::header_block_ [private]

Current transport packet header, marshalled.

Definition at line 319 of file TransportSendStrategy.h.

Referenced by prepare_packet(), and stop().

bool OpenDDS::DCPS::TransportSendStrategy::header_complete_ [private]

Set to false when the packet header hasn't been fully sent. Set to true once the packet header has been fully sent.

Definition at line 334 of file TransportSendStrategy.h.

Referenced by adjust_packet_after_send(), clear(), prepare_packet(), and resume_send().

TransportDataBlockAllocator* OpenDDS::DCPS::TransportSendStrategy::header_db_allocator_ [private]

Allocator for header data block.

Definition at line 363 of file TransportSendStrategy.h.

Referenced by stop().

TransportMessageBlockAllocator* OpenDDS::DCPS::TransportSendStrategy::header_mb_allocator_ [private]

Allocator for header message block.

Definition at line 360 of file TransportSendStrategy.h.

Referenced by stop().

SequenceNumber OpenDDS::DCPS::TransportSendStrategy::header_sequence_ [private]

Current transport header sequence number.

Definition at line 322 of file TransportSendStrategy.h.

Referenced by prepare_header().

bool OpenDDS::DCPS::TransportSendStrategy::link_released_ [private]

Definition at line 386 of file TransportSendStrategy.h.

Referenced by link_released().

LockType OpenDDS::DCPS::TransportSendStrategy::lock_ [private]

This lock will protect critical sections of code that play a role in the sending of data.

Definition at line 370 of file TransportSendStrategy.h.

Referenced by send_delayed_notifications().

size_t OpenDDS::DCPS::TransportSendStrategy::max_header_size_ [private]

Maximum marshalled size of the transport packet header.

Definition at line 316 of file TransportSendStrategy.h.

Referenced by space_available(), and TransportSendStrategy().

size_t OpenDDS::DCPS::TransportSendStrategy::max_samples_ [private]

Configuration - max number of samples per transport packet.

Definition at line 300 of file TransportSendStrategy.h.

Referenced by get_packet_elems_from_queue(), and send().

ACE_UINT32 OpenDDS::DCPS::TransportSendStrategy::max_size_ [private]

Configuration - max transport packet size (bytes).

Definition at line 306 of file TransportSendStrategy.h.

Referenced by space_available().

SendMode OpenDDS::DCPS::TransportSendStrategy::mode_ [private]

This mode determines how send() calls will be handled.

Definition at line 348 of file TransportSendStrategy.h.

Referenced by clear(), direct_send(), isDirectMode(), mode(), perform_work(), resume_send(), send(), and suspend_send().

SendMode OpenDDS::DCPS::TransportSendStrategy::mode_before_suspend_ [private]

This mode remembers the mode before send is suspended and is used after the send is resumed because the connection is re-established.

Definition at line 353 of file TransportSendStrategy.h.

Referenced by clear(), direct_send(), resume_send(), and suspend_send().

ACE_UINT32 OpenDDS::DCPS::TransportSendStrategy::optimum_size_ [private]

Configuration - optimum transport packet size (bytes).

Definition at line 303 of file TransportSendStrategy.h.

Referenced by get_packet_elems_from_queue().

ACE_Message_Block* OpenDDS::DCPS::TransportSendStrategy::pkt_chain_ [private]

Current (head of chain) block containing unsent bytes for the current transport packet.

Definition at line 330 of file TransportSendStrategy.h.

Referenced by adjust_packet_after_send(), clear(), prepare_packet(), resume_send(), and stop().

QueueType* OpenDDS::DCPS::TransportSendStrategy::queue_ [private]

Used during backpressure situations to hold samples that have not yet been added to a transport packet and are completely unsent. Also used as a bucket for elements that still have to become part of a packet.

Definition at line 313 of file TransportSendStrategy.h.

Referenced by clear(), do_remove_sample(), get_packet_elems_from_queue(), send(), and ~TransportSendStrategy().

TransportReplacedElementAllocator OpenDDS::DCPS::TransportSendStrategy::replaced_element_allocator_ [private]

Cached allocator for TransportReplacedElement.

Definition at line 373 of file TransportSendStrategy.h.

Referenced by TransportSendStrategy().

DataBlockAllocator OpenDDS::DCPS::TransportSendStrategy::replaced_element_db_allocator_ [private]

Definition at line 375 of file TransportSendStrategy.h.

MessageBlockAllocator OpenDDS::DCPS::TransportSendStrategy::replaced_element_mb_allocator_ [private]

Definition at line 374 of file TransportSendStrategy.h.

TransportRetainedElementAllocator* OpenDDS::DCPS::TransportSendStrategy::retained_element_allocator_ [private]

Cached allocator for TransportRetainedElements used by reliable datagram transports to retain PDUs after they have been sent. This is created in start() if the transport needs it.

Definition at line 380 of file TransportSendStrategy.h.

TransportSendBuffer* OpenDDS::DCPS::TransportSendStrategy::send_buffer_ [private]

Definition at line 388 of file TransportSendStrategy.h.

Referenced by remove_all_msgs(), send_buffer(), send_packet(), and start().

unsigned OpenDDS::DCPS::TransportSendStrategy::start_counter_ [private]

Counter that, when greater than zero, indicates that we still expect to receive a send_stop() event. Incremented once for each call to our send_start() method, and decremented once for each call to our send_stop() method. We only care about the transitions of the start_counter_ value from 0 to 1, and from 1 to 0. This accommodates the case where more than one TransportClient is sending to us at the same time. We use this counter to enable a "composite" send_start() and send_stop().

Definition at line 345 of file TransportSendStrategy.h.

Referenced by clear(), resume_send(), send_start(), and send_stop().

ThreadSynch* OpenDDS::DCPS::TransportSendStrategy::synch_ [private]

The thread synch object.

Definition at line 366 of file TransportSendStrategy.h.

Referenced by perform_work(), send(), send_stop(), stop(), synch(), TransportSendStrategy(), and ~TransportSendStrategy().

TransportInst_rch OpenDDS::DCPS::TransportSendStrategy::transport_inst_ [private]

Definition at line 382 of file TransportSendStrategy.h.

Referenced by direct_send().

bool OpenDDS::DCPS::TransportSendStrategy::transport_shutdown_ [private]

Definition at line 395 of file TransportSendStrategy.h.

Referenced by send_delayed_notifications(), and transport_shutdown().

const size_t OpenDDS::DCPS::TransportSendStrategy::UDP_MAX_MESSAGE_SIZE = 65466 [static, protected]

Put the maximum UDP payload size here so that it can be shared by all UDP-based transports. This is the worst-case (conservative) value for UDP/IPv4. If there are no IP options, or if IPv6 is used, it could actually be a little larger.

Definition at line 182 of file TransportSendStrategy.h.

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i().


The documentation for this class was generated from the following files:
Generated on Fri Feb 12 20:06:33 2016 for OpenDDS by  doxygen 1.4.7