OpenDDS  Snapshot(2023/04/07-19:43)
OpenDDS::DCPS::TransportSendStrategy Class Reference (abstract)

#include <TransportSendStrategy.h>


Public Types

enum  SendMode {
  MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND,
  MODE_TERMINATED
}
 
typedef BasicQueue< TransportQueueElement > QueueType
 
- Public Types inherited from OpenDDS::DCPS::ThreadSynchWorker
enum  WorkOutcome { WORK_OUTCOME_MORE_TO_DO, WORK_OUTCOME_NO_MORE_TO_DO, WORK_OUTCOME_CLOGGED_RESOURCE, WORK_OUTCOME_BROKEN_RESOURCE }
 

Public Member Functions

virtual ~TransportSendStrategy ()
 
void send_buffer (TransportSendBuffer *send_buffer)
 Assigns an optional send buffer.
 
int start ()
 
void stop ()
 
void send_start ()
 
void send (TransportQueueElement *element, bool relink=true)
 
void send_stop (GUID_t repoId)
 
RemoveResult remove_sample (const DataSampleElement *sample)
 
void remove_all_msgs (const GUID_t &pub_id)
 
virtual WorkOutcome perform_work ()
 
virtual void relink (bool do_suspend=true)
 
void suspend_send ()
 
void resume_send ()
 
void terminate_send (bool graceful_disconnecting=false)
 Remove all samples in the backpressure queue and packet queue.
 
virtual void terminate_send_if_suspended ()
 
virtual void stop_i ()=0
 Let the subclass stop.
 
virtual bool start_i ()
 Let the subclass start.
 
void link_released (bool flag)
 
bool isDirectMode ()
 
virtual ACE_HANDLE get_handle ()
 
void deliver_ack_request (TransportQueueElement *element)
 
bool fragmentation_helper (TransportQueueElement *original_element, TqeVector &elements_to_send)
 
void clear (SendMode new_mode, SendMode old_mode=MODE_NOT_SET)
 
SendMode mode () const
 Access the current sending mode.
 
- Public Member Functions inherited from OpenDDS::DCPS::ThreadSynchWorker
virtual ~ThreadSynchWorker ()
 
virtual void schedule_output ()
 Indicate that queued data is available to be sent.
 
std::size_t id () const
 DataLink reference value for diagnostics.
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Static Public Member Functions

static int mb_to_iov (const ACE_Message_Block &msg, iovec *iov)
 

Static Public Attributes

static const size_t UDP_MAX_MESSAGE_SIZE = 65466
 

Protected Member Functions

 TransportSendStrategy (std::size_t id, const TransportImpl_rch &transport, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy)
 
virtual ssize_t send_bytes (const iovec iov[], int n, int &bp)
 
virtual ssize_t non_blocking_send (const iovec iov[], int n, int &bp)
 
virtual ssize_t send_bytes_i (const iovec iov[], int n)=0
 
virtual void prepare_header_i ()
 Specific implementation processing of prepared packet header.
 
virtual void prepare_packet_i ()
 Specific implementation processing of prepared packet.
 
TransportQueueElement * current_packet_first_element () const
 
virtual size_t max_message_size () const
 
void set_graceful_disconnecting (bool flag)
 Set graceful disconnecting flag.
 
virtual void add_delayed_notification (TransportQueueElement *element)
 
bool send_delayed_notifications (const TransportQueueElement::MatchCriteria *match=0)
 
virtual Security::SecurityConfig_rch security_config () const
 
virtual RemoveResult do_remove_sample (const GUID_t &pub_id, const TransportQueueElement::MatchCriteria &criteria, bool remove_all=false)
 Implement framework chain visitations to remove a sample.
 
ThreadSynch * synch () const
 
void set_header_source (ACE_INT64 source)
 
- Protected Member Functions inherited from OpenDDS::DCPS::ThreadSynchWorker
 ThreadSynchWorker (std::size_t id=0)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Private Types

enum  SendPacketOutcome {
  OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_BACKPRESSURE, OUTCOME_PEER_LOST,
  OUTCOME_SEND_ERROR
}
 
typedef ACE_SYNCH_MUTEX LockType
 
typedef ACE_Guard< LockType > GuardType
 
typedef std::pair< TransportQueueElement *, SendMode > TQESendModePair
 Used for delayed notifications when performing work.
 
typedef Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > DataAllocator
 Allocator for data buffers.
 

Private Member Functions

void direct_send (bool relink)
 
void get_packet_elems_from_queue ()
 
void prepare_header ()
 
void prepare_packet ()
 
SendPacketOutcome send_packet ()
 
ssize_t do_send_packet (const ACE_Message_Block *packet, int &bp)
 Form an IOV and call the send_bytes() template method.
 
virtual ACE_Message_Block * pre_send_packet (const ACE_Message_Block *m)
 
int adjust_packet_after_send (ssize_t num_bytes_sent)
 
size_t space_available (size_t already_used=0) const
 
size_t current_space_available () const
 
virtual bool marshal_transport_header (ACE_Message_Block *mb)
 
 OPENDDS_VECTOR (TQESendModePair) delayed_delivered_notification_queue_
 

Static Private Member Functions

static const char * mode_as_str (SendMode mode)
 Helper function for debugging.
 

Private Attributes

size_t max_samples_
 Configuration - max number of samples per transport packet.
 
ACE_UINT32 optimum_size_
 Configuration - optimum transport packet size (bytes).
 
ACE_UINT32 max_size_
 Configuration - max transport packet size (bytes).
 
QueueType queue_
 
size_t max_header_size_
 Maximum marshalled size of the transport packet header.
 
ACE_Message_Block * header_block_
 Current transport packet header, marshalled.
 
SequenceNumber header_sequence_
 Current transport header sequence number.
 
QueueType elems_
 
ACE_Message_Block * pkt_chain_
 
bool header_complete_
 
unsigned start_counter_
 
Atomic< SendMode > mode_
 This mode determines how send() calls will be handled.
 
SendMode mode_before_suspend_
 
unique_ptr< TransportMessageBlockAllocator > header_mb_allocator_
 Allocator for header data block.
 
unique_ptr< TransportDataBlockAllocator > header_db_allocator_
 Allocator for header message block.
 
unique_ptr< DataBlockLockPool > header_db_lock_pool_
 DataBlockLockPool.
 
unique_ptr< DataAllocator > header_data_allocator_
 
unique_ptr< ThreadSynch > synch_
 The thread synch object.
 
LockType lock_
 
MessageBlockAllocator replaced_element_mb_allocator_
 Cached allocator for TransportReplaceElement.
 
DataBlockAllocator replaced_element_db_allocator_
 
WeakRcHandle< TransportImpl > transport_
 
bool graceful_disconnecting_
 
bool link_released_
 
TransportSendBuffer * send_buffer_
 
TransportHeader header_
 Current transport packet header.
 

Friends

class TransportSendBuffer
 

Detailed Description

This class provides methods to fill packets with samples for sending and handles backpressure. It maintains the list of samples in current packets and also the list of samples queued during backpressure. A thread per connection is created to handle the queued samples.

Object ownership: this class owns the ThreadSynch object, the list of samples in the current packet, and the list of samples in the queue.

Definition at line 61 of file TransportSendStrategy.h.

Member Typedef Documentation

◆ DataAllocator

Allocator for data buffers.

Definition at line 411 of file TransportSendStrategy.h.

◆ GuardType

Definition at line 302 of file TransportSendStrategy.h.

◆ LockType

Definition at line 301 of file TransportSendStrategy.h.

◆ QueueType

Definition at line 142 of file TransportSendStrategy.h.

◆ TQESendModePair

Used for delayed notifications when performing work.

Definition at line 398 of file TransportSendStrategy.h.

Member Enumeration Documentation

◆ SendMode

Enumerator
MODE_NOT_SET 
MODE_DIRECT 
MODE_QUEUE 
MODE_SUSPEND 
MODE_TERMINATED 

Definition at line 305 of file TransportSendStrategy.h.

305  enum SendMode {
306  // MODE_NOT_SET is used as the initial value of mode_before_suspend_ so
307  // we can check if the resume_send is paired with suspend_send.
308  MODE_NOT_SET,
309  // Send out the sample with current packet.
310  MODE_DIRECT,
311  // The samples need be queued because of the backpressure or partial send.
312  MODE_QUEUE,
313  // The samples need be queued because the connection is lost and we are
314  // trying to reconnect.
315  MODE_SUSPEND,
316  // The samples need be dropped since we lost connection and could not
317  // reconnect.
318  MODE_TERMINATED
319  };
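
The comment on MODE_NOT_SET describes how resume_send() is checked against a prior suspend_send(). The following is a minimal sketch of that pairing only, not the OpenDDS implementation: ModeTracker and its members are hypothetical names, and locking is omitted.

```cpp
#include <cassert>

// Same enumerators as SendMode above, redeclared so the sketch stands alone.
enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };

// Hypothetical stand-in for the class; only the mode bookkeeping is modelled.
struct ModeTracker {
  SendMode mode = MODE_DIRECT;
  SendMode mode_before_suspend = MODE_NOT_SET;

  // Remember the active mode, then suspend. A repeated suspend is a no-op.
  void suspend() {
    if (mode == MODE_SUSPEND || mode == MODE_TERMINATED) return;
    mode_before_suspend = mode;
    mode = MODE_SUSPEND;
  }

  // Returns false when there was no matching suspend().
  bool resume() {
    if (mode_before_suspend == MODE_NOT_SET || mode != MODE_SUSPEND) return false;
    mode = mode_before_suspend;
    mode_before_suspend = MODE_NOT_SET;  // mark the pair as consumed
    assert(mode != MODE_SUSPEND);        // suspend() never records MODE_SUSPEND
    return true;
  }
};
```

Resetting mode_before_suspend to MODE_NOT_SET after a successful resume is what lets a second, unpaired resume be detected.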

◆ SendPacketOutcome

Constructor & Destructor Documentation

◆ ~TransportSendStrategy()

OpenDDS::DCPS::TransportSendStrategy::~TransportSendStrategy ( )
virtual

Definition at line 113 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL.

114 {
115  DBG_ENTRY_LVL("TransportSendStrategy","~TransportSendStrategy",6);
116 
117 
118  delayed_delivered_notification_queue_.clear();
119 }

◆ TransportSendStrategy()

OpenDDS::DCPS::TransportSendStrategy::TransportSendStrategy ( std::size_t  id,
const TransportImpl_rch & transport,
ThreadSynchResource * synch_resource,
Priority  priority,
const ThreadSynchStrategy_rch & thread_sync_strategy
)
protected

Definition at line 61 of file TransportSendStrategy.cpp.

References ACE_DEFAULT_THREAD_PRIORITY, OpenDDS::DCPS::TransportImpl::config(), DBG_ENTRY_LVL, OpenDDS::DCPS::TransportHeader::get_max_serialized_size(), max_header_size_, OpenDDS::DCPS::TransportInst::max_packet_size_, max_samples_, OpenDDS::DCPS::TransportInst::max_samples_per_packet_, max_size_, OpenDDS::DCPS::TransportInst::optimum_packet_size_, optimum_size_, synch_, TheServiceParticipant, and OpenDDS::DCPS::DirectPriorityMapper::thread_priority().

67  : ThreadSynchWorker(id),
72  header_block_(0),
73  pkt_chain_(0),
74  header_complete_(false),
75  start_counter_(0),
78  lock_(),
81  transport_(transport),
83  link_released_(true),
84  send_buffer_(0)
85 {
86  DBG_ENTRY_LVL("TransportSendStrategy","TransportSendStrategy",6);
87 
88  TransportInst_rch cfg = transport->config();
89  if (cfg) {
90  max_samples_ = cfg->max_samples_per_packet_;
91  optimum_size_ = cfg->optimum_packet_size_;
92  max_size_ = cfg->max_packet_size_;
93  }
94 
95  // Create a ThreadSynch object just for us.
96  DirectPriorityMapper mapper(priority);
97  synch_.reset(thread_sync_strategy->create_synch_object(
98  synch_resource,
99 #ifdef ACE_WIN32
100  ACE_DEFAULT_THREAD_PRIORITY,
101 #else
102  mapper.thread_priority(),
103 #endif
104  TheServiceParticipant->scheduler()));
105 
106  // We cache this value in data member since it doesn't change, and we
107  // don't want to keep asking for it over and over.
108  max_header_size_ = TransportHeader::get_max_serialized_size();
109 
110  delayed_delivered_notification_queue_.reserve(max_samples_);
111 }

Member Function Documentation

◆ add_delayed_notification()

void OpenDDS::DCPS::TransportSendStrategy::add_delayed_notification ( TransportQueueElement * element)
protected virtual

Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy, and OpenDDS::DCPS::TcpSendStrategy.

Definition at line 1926 of file TransportSendStrategy.cpp.

References ACE_DEBUG, LM_DEBUG, OpenDDS::DCPS::Atomic< T >::load(), max_samples_, mode_, and OpenDDS::DCPS::Transport_debug_level.

Referenced by OpenDDS::DCPS::TcpSendStrategy::add_delayed_notification(), OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification(), adjust_packet_after_send(), and send().

1927 {
1928  if (Transport_debug_level) {
1929  size_t size = delayed_delivered_notification_queue_.size();
1930  if ((size > 0) && (size % max_samples_ == 0)) {
1931  ACE_DEBUG((LM_DEBUG,
1932  "(%P|%t) Transport send strategy notification queue threshold, size=%d\n",
1933  size));
1934  }
1935  }
1936 
1937  delayed_delivered_notification_queue_.push_back(std::make_pair(element, mode_.load()));
1938 }
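
The listing above only records an (element, mode) pair; the notification itself happens later. The sketch below illustrates that record-now, notify-later idea under stated assumptions: Element, DelayedNotifications, and flush() are hypothetical, and treating entries recorded in MODE_TERMINATED as dropped is an assumption of this example rather than something this page confirms.

```cpp
#include <cassert>
#include <cstddef>
#include <utility>
#include <vector>

enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };

// Hypothetical element that just records which callback it received.
struct Element {
  bool delivered = false;
  bool dropped = false;
};

class DelayedNotifications {
public:
  // Record the element together with the mode in effect at that moment.
  void add(Element* e, SendMode mode_at_enqueue) {
    assert(e != nullptr);
    queue_.push_back(std::make_pair(e, mode_at_enqueue));
  }

  // Notify every recorded element and return how many were processed.
  // Assumption: MODE_TERMINATED entries count as dropped, all others as delivered.
  std::size_t flush() {
    std::vector<std::pair<Element*, SendMode> > pending;
    pending.swap(queue_);  // take the batch so add() can be called again safely
    for (auto& entry : pending) {
      if (entry.second == MODE_TERMINATED) {
        entry.first->dropped = true;
      } else {
        entry.first->delivered = true;
      }
    }
    return pending.size();
  }

private:
  std::vector<std::pair<Element*, SendMode> > queue_;
};
```

Storing the mode alongside the element means the later notification reflects the state at the time the element finished, even if the mode has changed since.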

◆ adjust_packet_after_send()

int OpenDDS::DCPS::TransportSendStrategy::adjust_packet_after_send ( ssize_t  num_bytes_sent)
private

This is called from the send_packet() method after it has sent at least one byte from the current packet. This method will update the current packet appropriately, as well as deal with all of the release()'ing of fully sent ACE_Message_Blocks, and the data_delivered() calls on the fully sent elements. Returns 0 if the entire packet was sent, and returns 1 if the entire packet was not sent.
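
The bookkeeping described above can be sketched without ACE types. This is a simplified model under stated assumptions, not the actual method: PacketChain and adjust_after_send are hypothetical names, each block is reduced to its count of unsent bytes, and the header and element notifications are left out.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>

// Hypothetical model: each entry is the number of unsent bytes in one block.
struct PacketChain {
  std::deque<std::size_t> blocks;
};

// Consumes `sent` bytes from the front of the chain.
// Returns 0 if the whole chain was sent, 1 if something is still pending.
int adjust_after_send(PacketChain& chain, std::size_t sent) {
  while (sent > 0 && !chain.blocks.empty()) {
    std::size_t& front = chain.blocks.front();
    if (front <= sent) {
      // Whole block went out: account for it and detach it from the chain.
      sent -= front;
      chain.blocks.pop_front();
    } else {
      // Partial block: advance the "read pointer" by shrinking what remains.
      front -= sent;
      sent = 0;
    }
  }
  assert(sent == 0);  // the caller must not report more bytes than were queued
  return chain.blocks.empty() ? 0 : 1;
}
```

For a chain of 8, 100, and 50 bytes, reporting 58 bytes sent removes the first block and leaves 50 bytes of the second, so the function returns 1.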

Definition at line 363 of file TransportSendStrategy.cpp.

References ACE_DEBUG, add_delayed_notification(), ACE_Message_Block::base(), ACE_Message_Block::cont(), DBG_ENTRY_LVL, elems_, OpenDDS::DCPS::BasicQueue< T >::get(), header_, header_complete_, ACE_Message_Block::length(), OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, LM_INFO, OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::BasicQueue< T >::peek(), pkt_chain_, ACE_Message_Block::rd_ptr(), ACE_Message_Block::release(), and VDBG.

Referenced by clear(), and send_packet().

364 {
365  DBG_ENTRY_LVL("TransportSendStrategy", "adjust_packet_after_send", 6);
366 
367  VDBG((LM_DEBUG, "(%P|%t) DBG: "
368  "Adjusting the current packet because %d bytes of the packet "
369  "have been sent.\n", num_bytes_sent));
370 
371  ssize_t num_bytes_left = num_bytes_sent;
372  ssize_t num_non_header_bytes_sent = 0;
373 
374  VDBG((LM_DEBUG, "(%P|%t) DBG: "
375  "Set num_bytes_left to %d.\n", num_bytes_left));
376  VDBG((LM_DEBUG, "(%P|%t) DBG: "
377  "Set num_non_header_bytes_sent to %d.\n",
378  num_non_header_bytes_sent));
379 
380  VDBG((LM_DEBUG, "(%P|%t) DBG: "
381  "Peek at the element at the front of the packet elems_.\n"));
382 
383  // This is the element currently at the front of elems_.
384  TransportQueueElement* element = elems_.peek();
385 
386  if(!element){
387  ACE_DEBUG((LM_INFO, "(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n"));
388  } else {
389  VDBG((LM_DEBUG, "(%P|%t) DBG: "
390  "Use the element's msg() to find the last block in "
391  "the msg() chain.\n"));
392 
393  // Get a pointer to the last message block in the element.
394  const ACE_Message_Block* elem_tail_block = element->msg();
395 
396  VDBG((LM_DEBUG, "(%P|%t) DBG: "
397  "Start with tail block == element->msg().\n"));
398 
399  while (elem_tail_block->cont() != 0) {
400  VDBG((LM_DEBUG, "(%P|%t) DBG: "
401  "Set tail block to its cont() block (next in chain).\n"));
402  elem_tail_block = elem_tail_block->cont();
403  }
404 
405  VDBG((LM_DEBUG, "(%P|%t) DBG: "
406  "Tail block now set (because tail block's cont() is 0).\n"));
407 
408  VDBG((LM_DEBUG, "(%P|%t) DBG: "
409  "Start the 'while (num_bytes_left > 0)' loop.\n"));
410 
411  while (num_bytes_left > 0) {
412  VDBG((LM_DEBUG, "(%P|%t) DBG: "
413  "At top of 'num bytes left' loop. num_bytes_left == [%d].\n",
414  num_bytes_left));
415 
416  const int block_length = static_cast<int>(pkt_chain_->length());
417 
418  VDBG((LM_DEBUG, "(%P|%t) DBG: "
419  "Length of block at front of pkt_chain_ is [%d].\n",
420  block_length));
421 
422  if (block_length <= num_bytes_left) {
423  VDBG((LM_DEBUG, "(%P|%t) DBG: "
424  "The whole block at the front of pkt_chain_ was sent.\n"));
425 
426  // The entire message block at the front of the chain has been sent.
427  // Detach the head message block from the chain and adjust
428  // the pkt_chain_ to point to the next block (if any) in
429  // the chain.
430  VDBG((LM_DEBUG, "(%P|%t) DBG: "
431  "Extract the fully sent block from the pkt_chain_.\n"));
432 
433  ACE_Message_Block* fully_sent_block = pkt_chain_;
434 
435  VDBG((LM_DEBUG, "(%P|%t) DBG: "
436  "Set pkt_chain_ to pkt_chain_->cont().\n"));
437 
438  pkt_chain_ = pkt_chain_->cont();
439 
440  VDBG((LM_DEBUG, "(%P|%t) DBG: "
441  "Set the fully sent block's cont() to 0.\n"));
442 
443  fully_sent_block->cont(0);
444 
445  // Update the num_bytes_left to indicate that we have
446  // processed the entire length of the block.
447  num_bytes_left -= block_length;
448 
449  VDBG((LM_DEBUG, "(%P|%t) DBG: "
450  "Updated num_bytes_left to account for fully sent "
451  "block (block_length == [%d]).\n", block_length));
452  VDBG((LM_DEBUG, "(%P|%t) DBG: "
453  "Now, num_bytes_left == [%d].\n", num_bytes_left));
454 
455  if (!header_complete_) {
456  VDBG((LM_DEBUG, "(%P|%t) DBG: "
457  "Since the header_complete_ flag is false, it means "
458  "that the packet header block was still in the "
459  "pkt_chain_.\n"));
460 
461  VDBG((LM_DEBUG, "(%P|%t) DBG: "
462  "Not anymore... Set the header_complete_ flag "
463  "to true.\n"));
464 
465  // That was the packet header block. And now we know that it
466  // has been completely sent.
467  header_complete_ = true;
468 
469  VDBG((LM_DEBUG, "(%P|%t) DBG: "
470  "Release the fully sent block.\n"));
471 
472  // Release the fully_sent_block
473  fully_sent_block->release();
474 
475  } else {
476  VDBG((LM_DEBUG, "(%P|%t) DBG: "
477  "Since the header_complete_ flag is true, it means "
478  "that the packet header block was not in the "
479  "pkt_chain_.\n"));
480  VDBG((LM_DEBUG, "(%P|%t) DBG: "
481  "So, the fully sent block was part of an element.\n"));
482 
483  // That wasn't the packet header block. It was from the
484  // element currently at the front of the elems_
485  // collection. If it was the last block from the
486  // element, then we need to extract the element from the
487  // elems_ collection and invoke data_delivered() on it.
488  num_non_header_bytes_sent += block_length;
489 
490  VDBG((LM_DEBUG, "(%P|%t) DBG: "
491  "Updated num_non_header_bytes_sent to account for "
492  "fully sent block (block_length == [%d]).\n",
493  block_length));
494 
495  VDBG((LM_DEBUG, "(%P|%t) DBG: "
496  "Now, num_non_header_bytes_sent == [%d].\n",
497  num_non_header_bytes_sent));
498 
499  if (fully_sent_block->base() == elem_tail_block->base()) {
500  VDBG((LM_DEBUG, "(%P|%t) DBG: "
501  "Ok. The fully sent block was a duplicate of "
502  "the tail block of the element that is at the "
503  "front of the packet elems_.\n"));
504 
505  VDBG((LM_DEBUG, "(%P|%t) DBG: "
506  "This means that we have completely sent the "
507  "element at the front of the packet elems_.\n"));
508 
509  // This means that we have completely sent the element
510  // that is currently at the front of the elems_ collection.
511 
512  VDBG((LM_DEBUG, "(%P|%t) DBG: "
513  "We can release the fully sent block now.\n"));
514 
515  // Release the fully_sent_block
516  fully_sent_block->release();
517 
518  VDBG((LM_DEBUG, "(%P|%t) DBG: "
519  "We can extract the element from the front of "
520  "the packet elems_ (we were just peeking).\n"));
521 
522  // Extract the element from the elems_ collection
523  element = elems_.get();
524 
525  VDBG((LM_DEBUG, "(%P|%t) DBG: "
526  "Tell the element that a decision has been made "
527  "regarding its fate - data_delivered().\n"));
528 
529  // Inform the element that the data has been delivered.
530  add_delayed_notification(element);
531 
532  VDBG((LM_DEBUG, "(%P|%t) DBG: "
533  "Peek at the next element in the packet "
534  "elems_.\n"));
535 
536  // Set up for the next element in elems_ by peek()'ing.
537  element = elems_.peek();
538 
539  if (element != 0) {
540  VDBG((LM_DEBUG, "(%P|%t) DBG: "
541  "The is an element still in the packet "
542  "elems_ (we are peeking at it now).\n"));
543 
544  VDBG((LM_DEBUG, "(%P|%t) DBG: "
545  "We are going to find the tail block for the "
546  "current element (we are peeking at).\n"));
547 
548  // There was a "next element". Determine the
549  // elem_tail_block for it.
550  elem_tail_block = element->msg();
551 
552  VDBG((LM_DEBUG, "(%P|%t) DBG: "
553  "Start w/tail block == element->msg().\n"));
554 
555  while (elem_tail_block->cont() != 0) {
556  VDBG((LM_DEBUG, "(%P|%t) DBG: "
557  "Set tail block to next in chain.\n"));
558  elem_tail_block = elem_tail_block->cont();
559  }
560 
561  VDBG((LM_DEBUG, "(%P|%t) DBG: "
562  "Done finding tail block.\n"));
563  }
564 
565  } else {
566  VDBG((LM_DEBUG, "(%P|%t) DBG: "
567  "Ok. The fully sent block is *not* a "
568  "duplicate of the tail block of the element "
569  "at the front of the packet elems_.\n"));
570 
571  VDBG((LM_DEBUG, "(%P|%t) DBG: "
572  "Thus, we have not completely sent the "
573  "element yet.\n"));
574 
575  // We didn't completely send the element - it has more
576  // message blocks that haven't been sent (that we know of).
577 
578  VDBG((LM_DEBUG, "(%P|%t) DBG: "
579  "We can release the fully_sent_block now.\n"));
580 
581  // Release the fully_sent_block
582  fully_sent_block->release();
583  }
584  }
585 
586  } else {
587  VDBG((LM_DEBUG, "(%P|%t) DBG: "
588  "Only part of the block at the front of pkt_chain_ "
589  "was sent.\n"));
590 
591  VDBG((LM_DEBUG, "(%P|%t) DBG: "
592  "Advance the rd_ptr() of the front block (of pkt_chain_) "
593  "by the num_bytes_left (%d).\n", num_bytes_left));
594 
595  // Only part of the current block was sent.
596  pkt_chain_->rd_ptr(num_bytes_left);
597 
598  if (header_complete_) {
599  VDBG((LM_DEBUG, "(%P|%t) DBG: "
600  "And since the packet header block has already been "
601  "completely sent, add num_bytes_left to the "
602  "num_non_header_bytes_sent.\n"));
603 
604  VDBG((LM_DEBUG, "(%P|%t) DBG: "
605  "Before, num_non_header_bytes_sent == %d.\n",
606  num_non_header_bytes_sent));
607 
608  // We know that the current block isn't the packet header
609  // block because the packet header block has already been
610  // completely sent. We need to count these bytes in the
611  // num_non_header_bytes_sent.
612  num_non_header_bytes_sent += num_bytes_left;
613 
614  VDBG((LM_DEBUG, "(%P|%t) DBG: "
615  "After, num_non_header_bytes_sent == %d.\n",
616  num_non_header_bytes_sent));
617  }
618 
619  VDBG((LM_DEBUG, "(%P|%t) DBG: "
620  "Set the num_bytes_left to 0 now.\n"));
621 
622  num_bytes_left = 0;
623  }
624  }
625  }
626 
627  VDBG((LM_DEBUG, "(%P|%t) DBG: "
628  "The 'num_bytes_left' loop has completed.\n"));
629 
630  VDBG((LM_DEBUG, "(%P|%t) DBG: "
631  "Adjust the header_.length_ to account for the "
632  "num_non_header_bytes_sent.\n"));
633  VDBG((LM_DEBUG, "(%P|%t) DBG: "
634  "Before, header_.length_ == %d.\n",
635  header_.length_));
636 
637  // Adjust the packet header_.length_ to indicate how many non header
638  // bytes are left to send.
639  header_.length_ -= static_cast<ACE_UINT32>(num_non_header_bytes_sent);
640 
641  VDBG((LM_DEBUG, "(%P|%t) DBG: "
642  "After, header_.length_ == %d.\n",
643  header_.length_));
644 
645  // Returns 0 if the entire packet was sent, and returns 1 otherwise.
646  int rc = (header_.length_ == 0) ? 0 : 1;
647 
648  VDBG((LM_DEBUG, "(%P|%t) DBG: "
649  "Adjustments all done. Returning [%d]. 0 means entire packet "
650  "has been sent. 1 means otherwise.\n",
651  rc));
652 
653  return rc;
654 }

◆ clear()

void OpenDDS::DCPS::TransportSendStrategy::clear ( SendMode  new_mode,
SendMode  old_mode = MODE_NOT_SET 
)

Clear queued messages and messages in the current packet, and set the current mode to new_mode if the current mode equals old_mode or old_mode is MODE_NOT_SET.

Definition at line 779 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), adjust_packet_after_send(), DBG_ENTRY_LVL, elems_, header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, lock_, mode_, mode_before_suspend_, MODE_NOT_SET, pkt_chain_, queue_, send_delayed_notifications(), start_counter_, OpenDDS::DCPS::BasicQueue< T >::swap(), ACE_Message_Block::total_length(), and VDBG.

Referenced by OpenDDS::DCPS::TcpSendStrategy::reset(), OpenDDS::DCPS::DataLink::schedule_delayed_release(), terminate_send(), and OpenDDS::DCPS::TcpSendStrategy::terminate_send_if_suspended().

780 {
781  DBG_ENTRY_LVL("TransportSendStrategy","clear",6);
782 
784  QueueType elems;
785  QueueType queue;
786  {
787  GuardType guard(lock_);
788 
789  if (old_mode != MODE_NOT_SET && mode_ != old_mode)
790  return;
791 
792  if (header_.length_ > 0 && pkt_chain_) {
793  // Clear the messages in the pkt_chain_ that is partially sent.
794  // We just reuse these functions for normal partial send except actual sending.
795  int num_bytes_left = static_cast<int>(pkt_chain_->total_length());
796  int result = adjust_packet_after_send(num_bytes_left);
797 
798  if (result == 0) {
799  VDBG((LM_DEBUG, "(%P|%t) DBG: "
800  "The adjustment logic says that the packet is cleared.\n"));
801 
802  } else {
803  VDBG((LM_DEBUG, "(%P|%t) DBG: "
804  "The adjustment returned partial sent.\n"));
805  }
806  }
807 
808  elems.swap(elems_);
809  queue.swap(queue_);
810 
811  header_.length_ = 0;
812  pkt_chain_ = 0;
813  header_complete_ = false;
814  start_counter_ = 0;
815  mode_ = new_mode;
817  }
818 
819  // We need remove the queued elements outside the lock,
820  // otherwise we have a deadlock situation when remove visitor
821  // calls the data_dropped on each dropped elements.
822 
823  // Clear all samples in queue.
824  RemoveAllVisitor remove_all_visitor;
825 
826  elems.accept_remove_visitor(remove_all_visitor);
827  queue.accept_remove_visitor(remove_all_visitor);
828 }
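
The comment in the listing explains why the queues are swapped into locals before the visitor runs: the drop callbacks must not execute while lock_ is held. Here is a minimal sketch of that swap-then-process pattern using standard library types; Sender, enqueue(), and the on_dropped callback are hypothetical and stand in for the class, its queues, and data_dropped().

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

// Hypothetical queue owner guarded by a non-recursive mutex.
class Sender {
public:
  void enqueue(int sample) {
    std::lock_guard<std::mutex> guard(lock_);
    queue_.push_back(sample);
  }

  std::size_t size() {
    std::lock_guard<std::mutex> guard(lock_);
    return queue_.size();
  }

  // Drops everything queued. The callback may call back into Sender.
  template <typename Callback>
  void clear(Callback on_dropped) {
    std::vector<int> local;
    {
      std::lock_guard<std::mutex> guard(lock_);
      local.swap(queue_);     // take ownership of the contents under the lock
      assert(queue_.empty());
    }
    // The lock is released here, so re-entrant calls cannot deadlock.
    for (int sample : local) {
      on_dropped(sample);
    }
  }

private:
  std::mutex lock_;
  std::vector<int> queue_;
};
```

If the loop ran inside the guarded scope instead, a callback that called size() or enqueue() would try to lock the same mutex again.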

◆ current_packet_first_element()

ACE_INLINE TransportQueueElement * OpenDDS::DCPS::TransportSendStrategy::current_packet_first_element ( ) const
protected

◆ current_space_available()

size_t OpenDDS::DCPS::TransportSendStrategy::current_space_available ( ) const
private

Like space_available(), but computed for the current packet.

Definition at line 1961 of file TransportSendStrategy.cpp.

References header_, OpenDDS::DCPS::TransportHeader::length_, and space_available().

Referenced by get_packet_elems_from_queue(), and send().

1962 {
1964 }

◆ deliver_ack_request()

void OpenDDS::DCPS::TransportSendStrategy::deliver_ack_request ( TransportQueueElement * element)

Definition at line 1940 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::TransportQueueElement::data_delivered(), do_remove_sample(), OpenDDS::DCPS::GUID_UNKNOWN, and lock_.

1941 {
1942  const TransportQueueElement::MatchOnElement moe(element);
1943  {
1944  GuardType guard(lock_);
1946  }
1947 
1948  element->data_delivered();
1949 }

◆ direct_send()

void OpenDDS::DCPS::TransportSendStrategy::direct_send ( bool  relink)
private

Called from send() when it is time to attempt to send the current packet to the socket while mode_ is MODE_DIRECT. If backpressure occurs, the current packet is adjusted to account for the bytes that were sent, and the mode is changed to MODE_QUEUE. If no backpressure occurs (i.e., the entire packet is sent), the current packet is "reset" to an empty packet following the send.
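
The mode change that follows each send attempt can be summarized as a small mapping. This sketch covers only the first transition made for each outcome; the reconnect attempt via relink() and the retry loop shown in the listing are not modelled, and mode_after_send is a hypothetical helper name.

```cpp
#include <cassert>

enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };

enum SendPacketOutcome {
  OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_BACKPRESSURE,
  OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR
};

// Returns the mode to switch to right after one send_packet() attempt.
SendMode mode_after_send(SendMode current, SendPacketOutcome outcome) {
  switch (outcome) {
    case OUTCOME_BACKPRESSURE:
    case OUTCOME_PARTIAL_SEND:
      return MODE_QUEUE;    // later samples are queued behind the unsent bytes
    case OUTCOME_PEER_LOST:
    case OUTCOME_SEND_ERROR:
      return MODE_SUSPEND;  // hold samples while a reconnect is attempted
    case OUTCOME_COMPLETE_SEND:
      return current;       // nothing went wrong, stay in the current mode
  }
  assert(false && "unknown SendPacketOutcome");
  return current;
}
```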

Definition at line 1465 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::TransportImpl::dump(), LM_DEBUG, LM_WARNING, mode_, mode_before_suspend_, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OUTCOME_BACKPRESSURE, OUTCOME_PARTIAL_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, prepare_packet(), relink(), send_packet(), transport_, OpenDDS::DCPS::Transport_debug_level, VDBG, and VDBG_LVL.

Referenced by send(), and send_stop().

1466 {
1467  DBG_ENTRY_LVL("TransportSendStrategy", "direct_send", 6);
1468 
1469  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1470  "Prepare the current packet for a direct send attempt.\n"));
1471 
1472  // Prepare the packet for sending.
1473  prepare_packet();
1474 
1475  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1476  "Now attempt to send the packet.\n"));
1477 
1478  // We will try resend the packet if the send() fails and then connection
1479  // is re-established. Only loops if the "continue" line is hit.
1480  while (true) {
1481  // Attempt to send the packet
1482  const SendPacketOutcome outcome = send_packet();
1483 
1484  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1485  "The outcome of the send_packet() was %d.\n", outcome));
1486 
1487  if ((outcome == OUTCOME_BACKPRESSURE) ||
1488  (outcome == OUTCOME_PARTIAL_SEND)) {
1489  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1490  "The outcome of the send_packet() was either "
1491  "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5);
1492 
1493  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1494  "Flip into the MODE_QUEUE mode_.\n"), 5);
1495 
1496  // We encountered backpressure, or only sent part of the packet.
1497  mode_ = MODE_QUEUE;
1498 
1499  } else if ((outcome == OUTCOME_PEER_LOST) ||
1500  (outcome == OUTCOME_SEND_ERROR)) {
1501  if (outcome == OUTCOME_SEND_ERROR) {
1502  VDBG_LVL((LM_WARNING,
1503  "(%P|%t) WARNING: Problem detected in "
1504  "send buffer management: %p.\n",
1505  "send_bytes"), 1);
1506 
1507  if (Transport_debug_level > 0) {
1508  TransportImpl_rch transport = transport_.lock();
1509  if (transport) {
1510  transport->dump();
1511  }
1512  }
1513  } else {
1514  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1515  "The outcome of the send_packet() was "
1516  "OUTCOME_PEER_LOST.\n"));
1517  }
1518 
1519  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1520  "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
1521 
1522  if (mode_ != MODE_SUSPEND) {
1524  mode_ = MODE_SUSPEND;
1525  }
1526 
1527  if (do_relink) {
1528  bool do_suspend = false;
1529  relink(do_suspend);
1530 
1531  if (mode_ == MODE_SUSPEND) {
1532  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1533  "The reconnect has not done yet and we are "
1534  "still in MODE_SUSPEND.\n"), 5);
1535 
1536  } else if (mode_ == MODE_TERMINATED) {
1537  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1538  "Reconnect failed, we are in MODE_TERMINATED\n"), 5);
1539 
1540  } else {
1541  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1542  "Try send the packet again since the connection "
1543  "is re-established.\n"), 5);
1544 
1545  // Try send the packet again since the connection is re-established.
1546  continue;
1547  }
1548  }
1549 
1550  } else {
1551  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1552  "The outcome of the send_packet() must have been "
1553  "OUTCOME_COMPLETE_SEND.\n"));
1554  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1555  "So, we will just stay in MODE_DIRECT.\n"));
1556  }
1557 
1558  break;
1559  }
1560 
1561  // We stay in MODE_DIRECT mode if we didn't encounter any backpressure.
1562 }
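The mode transitions in the listing above can be condensed into a small decision function. This is a minimal sketch, not OpenDDS code: the enumerator names mirror those in the listing, but the enums are redeclared locally (their real ordering is not shown here), and `mode_after_direct_send`, `DirectSendStep`, and the `relink_result` parameter are hypothetical names that model the mode relink() leaves behind. It also assumes the strategy starts in MODE_DIRECT.

```cpp
#include <cassert>

// Redeclared only so the sketch compiles on its own; names follow the listing.
enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };
enum SendPacketOutcome {
  OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_BACKPRESSURE,
  OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR
};

struct DirectSendStep {
  SendMode mode;  // mode after handling the outcome
  bool retry;     // true when the loop in the listing would hit "continue"
};

// Hypothetical helper: relink_result stands in for the value of mode_
// after relink(false) has returned.
inline DirectSendStep mode_after_direct_send(SendPacketOutcome outcome,
                                             bool do_relink,
                                             SendMode relink_result)
{
  switch (outcome) {
  case OUTCOME_BACKPRESSURE:
  case OUTCOME_PARTIAL_SEND:
    // Backpressure or a partial send: later sends must be queued.
    return {MODE_QUEUE, false};
  case OUTCOME_PEER_LOST:
  case OUTCOME_SEND_ERROR: {
    if (!do_relink) return {MODE_SUSPEND, false};
    // Retry only when the reconnect left us neither suspended nor terminated.
    const bool reconnected =
      relink_result != MODE_SUSPEND && relink_result != MODE_TERMINATED;
    return {relink_result, reconnected};
  }
  case OUTCOME_COMPLETE_SEND:
    break;
  }
  // Complete send: stay in direct mode.
  return {MODE_DIRECT, false};
}
```

Under these assumptions, only a successful reconnect produces `retry == true`, which matches the comment in the listing that the loop repeats only when the "continue" line is hit.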

◆ do_remove_sample()

RemoveResult OpenDDS::DCPS::TransportSendStrategy::do_remove_sample ( const GUID_t pub_id,
const TransportQueueElement::MatchCriteria criteria,
bool  remove_all = false 
)
protected virtual

Implement framework chain visitations to remove a sample.

Definition at line 1362 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), OpenDDS::DCPS::BasicQueue< T >::accept_replace_visitor(), DBG_ENTRY_LVL, elems_, header_, header_block_, OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, mode_, MODE_DIRECT, pkt_chain_, queue_, OpenDDS::DCPS::REMOVE_ERROR, OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, OpenDDS::DCPS::QueueRemoveVisitor::removed_bytes(), replaced_element_db_allocator_, replaced_element_mb_allocator_, OpenDDS::DCPS::BasicQueue< T >::size(), OpenDDS::DCPS::QueueRemoveVisitor::status(), OpenDDS::DCPS::PacketRemoveVisitor::status(), OpenDDS::DCPS::TransportQueueElement::MatchCriteria::unique(), and VDBG.

Referenced by deliver_ack_request(), remove_all_msgs(), and remove_sample().

1364 {
1365  DBG_ENTRY_LVL("TransportSendStrategy", "do_remove_sample", 6);
1366 
1367  //ciju: Tim had the idea that we could do the following check
1368  // if ((mode_ == MODE_DIRECT) ||
1369  // ((pkt_chain_ == 0) && (queue_ == empty)))
1370  // then we can assume that the sample can be safely removed (no need for
1371  // replacement) from the elems_ queue.
1372  if ((mode_ == MODE_DIRECT)
1373  || ((pkt_chain_ == 0) && (queue_.size() == 0))) {
1374  //ciju: I believe this is the only mode where a safe
1375  // assumption can be made that the samples
1376  // in the elems_ queue aren't part of a packet.
1377  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1378  "The mode is MODE_DIRECT, or the queue is empty and no "
1379  "transport packet is in progress.\n"));
1380 
1381  QueueRemoveVisitor simple_rem_vis(criteria, remove_all);
1382  elems_.accept_remove_visitor(simple_rem_vis);
1383 
1384  const RemoveResult status = simple_rem_vis.status();
1385 
1386  if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
1387  header_.length_ -= simple_rem_vis.removed_bytes();
1388 
1389  } else if (status == REMOVE_NOT_FOUND) {
1390  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1391  "Failed to find the sample to remove.\n"));
1392  }
1393 
1394  if (criteria.unique() || !remove_all) return status;
1395  }
1396 
1397  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1398  "Visit the queue_ with the RemoveElementVisitor.\n"));
1399 
1400  QueueRemoveVisitor simple_rem_vis(criteria, remove_all);
1401  queue_.accept_remove_visitor(simple_rem_vis);
1402 
1403  RemoveResult status = simple_rem_vis.status();
1404 
1405  if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
1406  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1407  "The sample was removed from the queue_.\n"));
1408  // This means that the visitor did not encounter any fatal error
1409  // along the way, *AND* the sample was found in the queue_,
1410  // and has now been removed. We are done.
1411  if (criteria.unique() || !remove_all) return status;
1412  }
1413 
1414  if (status == REMOVE_ERROR) {
1415  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1416  "The RemoveElementVisitor encountered a fatal error in queue_.\n"));
1417  // This means that the visitor encountered some fatal error along
1418  // the way (and it already reported something to the log).
1419  // Return our failure code.
1420  return status;
1421  }
1422 
1423  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1424  "The RemoveElementVisitor did not find the sample in queue_.\n"));
1425 
1426  // We get here if the visitor did not encounter any fatal error, but it
1427  // also didn't find the sample - and hence it didn't perform any
1428  // "remove sample" logic.
1429 
1430  // Now we need to turn our attention to the current transport packet,
1431  // since the packet is likely in a "partially sent" state, and the
1432  // sample may still be contributing unsent bytes in the pkt_chain_.
1433 
1434  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1435  "Visit our elems_ with the PacketRemoveVisitor.\n"));
1436 
1437  PacketRemoveVisitor pac_rem_vis(criteria,
1438  pkt_chain_,
1439  header_block_,
1440  replaced_element_mb_allocator_,
1441  replaced_element_db_allocator_,
1442  remove_all);
1443 
1444  elems_.accept_replace_visitor(pac_rem_vis);
1445 
1446  status = pac_rem_vis.status();
1447 
1448  if (status == REMOVE_ERROR) {
1449  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1450  "The PacketRemoveVisitor encountered a fatal error.\n"));
1451 
1452  } else if (status == REMOVE_NOT_FOUND) {
1453  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1454  "The PacketRemoveVisitor didn't find the sample.\n"));
1455 
1456  } else {
1457  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1458  "The PacketRemoveVisitor found the sample and removed it.\n"));
1459  }
1460 
1461  return status;
1462 }

◆ do_send_packet()

ssize_t OpenDDS::DCPS::TransportSendStrategy::do_send_packet ( const ACE_Message_Block * packet,
int &  bp 
)
private

Form an IOV and call the send_bytes() template method.

Definition at line 1743 of file TransportSendStrategy.cpp.

References ACE_DEBUG, ACE_TEXT(), ACE_Message_Block::data_block(), DBG_ENTRY_LVL, LM_DEBUG, OpenDDS::DCPS::MAX_SEND_BLOCKS, mb_to_iov(), pre_send_packet(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), security_config(), send_bytes(), ACE_Message_Block::total_length(), OpenDDS::DCPS::Transport_debug_level, VDBG, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TransportSendBuffer::resend_one(), and send_packet().

1744 {
1745  if (Transport_debug_level > 9) {
1746  ACE_DEBUG((LM_DEBUG,
1747  ACE_TEXT("(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ")
1748  ACE_TEXT("sending data at 0x%x.\n"),
1749  id(), packet));
1750  }
1751  DBG_ENTRY_LVL("TransportSendStrategy", "do_send_packet", 6);
1752 
1753 #ifdef OPENDDS_SECURITY
1754  Message_Block_Ptr substitute;
1755  if (security_config()) {
1756  const DDS::Security::CryptoTransform_var crypto = security_config()->get_crypto_transform();
1757  // pre_send_packet may provide different data that takes the place of the
1758  // original "packet" (used for security encryption/authentication)
1759  if (crypto) {
1760  substitute.reset(pre_send_packet(packet));
1761  if (!substitute) {
1762  VDBG((LM_DEBUG, "(%P|%t) DBG: pre_send_packet returned NULL, dropping.\n"));
1763  return packet->total_length();
1764  }
1765  }
1766  }
1767 #endif
1768 
1769  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1770  "Populate the iovec array using the packet.\n"), 5);
1771 
1772  iovec iov[MAX_SEND_BLOCKS];
1773 
1774 #ifdef OPENDDS_SECURITY
1775  const int num_blocks = mb_to_iov(substitute ? *substitute : *packet, iov);
1776 #else
1777  const int num_blocks = mb_to_iov(*packet, iov);
1778 #endif
1779 
1780  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1781  "There are [%d] number of entries in the iovec array.\n",
1782  num_blocks), 5);
1783 
1784  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1785  "Attempt to send_bytes() now.\n"), 5);
1786 
1787  const ssize_t num_bytes_sent = send_bytes(iov, num_blocks, bp);
1788 
1789  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1790  "The send_bytes() said that num_bytes_sent == [%d].\n",
1791  num_bytes_sent), 5);
1792 
1793 #ifdef OPENDDS_SECURITY
1794  if (num_bytes_sent > 0 && substitute && packet->data_block() != substitute->data_block()) {
1795  // Although the "substitute" data took the place of "packet", the rest
1796  // of the framework needs to account for the bytes in "packet" being taken
1797  // care of, as if they were actually sent.
1798  // Since this is done with datagram sockets, partial sends aren't possible.
1799  return packet->total_length();
1800  }
1801 #endif
1802 
1803  return num_bytes_sent;
1804 }

◆ fragmentation_helper()

bool OpenDDS::DCPS::TransportSendStrategy::fragmentation_helper ( TransportQueueElement * original_element,
TqeVector &  elements_to_send 
)

Alternative to TransportSendStrategy::send for fragmentation

Parameters
original_element    data sample to send, may be larger than max msg size
elements_to_send    populated by this method with either original_element or fragments created from it. Elements need to be cleaned up by the caller using data_delivered or data_dropped.
Returns
true if the operation succeeded, false if fragmentation failed

Definition at line 1988 of file TransportSendStrategy.cpp.

References ACE_ERROR, OpenDDS::DCPS::TransportQueueElement::increment_loan(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::null_tqe_pair, OPENDDS_END_VERSIONED_NAMESPACE_DECL, space_available(), and VDBG_LVL.

1990 {
1991  original_element->increment_loan();
1992  const size_t space = space_available();
1993  for (TransportQueueElement* e = original_element; e;) {
1994  const size_t esize = e->msg()->total_length();
1995  if (esize > space) {
1996  VDBG_LVL((LM_DEBUG, "(%P|%t) TransportSendStrategy::fragmentation_helper: "
1997  "message size %B > space %B: Fragmenting\n", esize, space), 0);
1998  const TqePair pair = e->fragment(space);
1999  if (pair == null_tqe_pair) {
2000  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::fragmentation_helper: "
2001  "Element Fragmentation Failed\n"));
2002  return false;
2003  }
2004  elements_to_send.push_back(pair.first);
2005  e = pair.second;
2006  } else {
2007  elements_to_send.push_back(e);
2008  e = 0;
2009  }
2010  }
2011  return true;
2012 }
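The loop in fragmentation_helper() keeps cutting a head fragment off the element until the remaining tail fits. A minimal sketch of that shape, working on byte counts only: `fragment_sizes` is a hypothetical helper, and it assumes each fragment consumes exactly `space` bytes, whereas a real transport would also account for per-fragment headers.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Split a message of total_size bytes into pieces of at most `space` bytes.
// Mirrors the listing: while the element is larger than the available
// space, emit a head fragment (pair.first) and continue with the tail
// (pair.second); otherwise emit the element itself and stop.
inline std::vector<std::size_t> fragment_sizes(std::size_t total_size,
                                               std::size_t space)
{
  assert(space > 0);  // a zero-sized fragment would never make progress
  std::vector<std::size_t> pieces;
  std::size_t remaining = total_size;
  while (remaining > space) {
    pieces.push_back(space);
    remaining -= space;
  }
  pieces.push_back(remaining);  // the tail, or the unfragmented original
  return pieces;
}
```

With these assumptions, a 10-byte message and 4 bytes of space yield pieces of 4, 4, and 2 bytes, while a message that already fits is returned as a single piece.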

◆ get_handle()

ACE_INLINE ACE_HANDLE OpenDDS::DCPS::TransportSendStrategy::get_handle ( void  )
virtual

Implements OpenDDS::DCPS::ThreadSynchWorker.

Reimplemented in OpenDDS::DCPS::TcpSendStrategy.

Definition at line 140 of file TransportSendStrategy.inl.

References ACE_INLINE.

Referenced by OpenDDS::DCPS::ScheduleOutputHandler::handle_exception(), and non_blocking_send().

141 {
142  return ACE_INVALID_HANDLE;
143 }

◆ get_packet_elems_from_queue()

void OpenDDS::DCPS::TransportSendStrategy::get_packet_elems_from_queue ( )
private

This method is used while in MODE_QUEUE mode, when a new packet needs to be formulated using elements from the queue_. This is the first step of formulating the new packet. It extracts elements from the queue_ and inserts them into the elems_ collection.

After this step has been done, the prepare_packet() step can be performed, followed by the actual send_packet() call.

Definition at line 1565 of file TransportSendStrategy.cpp.

References ACE_ERROR, current_space_available(), DBG_ENTRY_LVL, elems_, OpenDDS::DCPS::TransportHeader::first_fragment_, OpenDDS::DCPS::BasicQueue< T >::get(), header_, OpenDDS::DCPS::TransportHeader::last_fragment_, OpenDDS::DCPS::TransportHeader::length_, LM_ERROR, LM_TRACE, max_message_size(), max_samples_, OpenDDS::DCPS::null_tqe_pair, optimum_size_, OpenDDS::DCPS::BasicQueue< T >::peek(), OpenDDS::DCPS::BasicQueue< T >::put(), queue_, OpenDDS::DCPS::BasicQueue< T >::replace_head(), OpenDDS::DCPS::BasicQueue< T >::size(), and VDBG_LVL.

Referenced by perform_work().

1566 {
1567  DBG_ENTRY_LVL("TransportSendStrategy", "get_packet_elems_from_queue", 6);
1568 
1569  for (TransportQueueElement* element = queue_.peek(); element != 0;
1570  element = queue_.peek()) {
1571 
1572  // Total number of bytes in the current element's message block chain.
1573  size_t element_length = element->msg()->total_length();
1574 
1575  // Flag used to determine if the element requires a packet all to itself.
1576  const bool exclusive_packet = element->requires_exclusive_packet();
1577 
1578  const size_t avail = current_space_available();
1579 
1580  bool frag = false;
1581  if (element_length > avail) {
1582  // The current element won't fit into the current packet
1583  if (max_message_size()) { // fragmentation enabled
1584  header_.first_fragment_ = !element->is_fragment();
1585  VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting from queue\n"), 0);
1586  const TqePair ep = element->fragment(avail);
1587  if (ep == null_tqe_pair) {
1588  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::get_packet_elems_from_queue: "
1589  "Element Fragmentation Failed\n"));
1590  return;
1591  }
1592  element = ep.first;
1593  element_length = element->msg()->total_length();
1594  queue_.replace_head(ep.second);
1595  frag = true; // queue_ is already taken care of, don't get() later
1596  } else {
1597  break;
1598  }
1599  }
1600 
1601  // If exclusive and the current packet is empty, we won't violate the
1602  // exclusive_packet requirement by put()'ing the element
1603  // into the elems_ collection.
1604  if ((exclusive_packet && elems_.size() == 0)
1605  || !exclusive_packet) {
1606  // At this point, we have passed all of the pre-conditions and we can
1607  // now extract the current element from the queue_, put it into the
1608  // packet elems_, and adjust the packet header_.length_.
1609  elems_.put(frag ? element : queue_.get());
1610  if (header_.length_ == 0) {
1611  header_.last_fragment_ = !frag && element->is_fragment();
1612  }
1613  header_.length_ += static_cast<ACE_UINT32>(element_length);
1614  VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Packetizing from queue\n"), 0);
1615  }
1616 
1617  // With exclusive and (elems_.size() != 0), we don't use the current
1618  // element as part of the packet. We know that there is already
1619  // at least one element in the packet, and the current element
1620  // is going to need its own (exclusive) packet. We will just
1621  // use the packet elems_ as it is now. Always break once
1622  // we've encountered and dealt with the exclusive_packet case.
1623  // Also break if fragmentation was required.
1624  if (exclusive_packet || frag
1625  // If the current number of packet elems_ has reached the maximum
1626  // number of samples per packet, then we are done.
1627  || elems_.size() == max_samples_
1628  // If the current value of the header_.length_ exceeds (or equals)
1629  // the optimum_size_ for a packet, then we are done.
1630  || header_.length_ >= optimum_size_) {
1631  break;
1632  }
1633  }
1634 }
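The stopping conditions of the loop above are easier to see on a reduced model. The sketch below is an assumption-laden simplification, not the OpenDDS implementation: `Elem`, `PacketLimits`, and `fill_packet` are hypothetical names, elements are represented by their byte length and an exclusive flag, and fragmentation is left out (as if max_message_size() returned 0).

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

struct Elem {
  std::size_t length;  // total bytes in the element
  bool exclusive;      // element needs a packet to itself
};

struct PacketLimits {
  std::size_t space;         // bytes a packet can hold
  std::size_t max_samples;   // stand-in for max_samples_
  std::size_t optimum_size;  // stand-in for optimum_size_
};

// Move elements from the front of `queue` into a new packet until one of
// the limits in the listing is reached.
inline std::vector<Elem> fill_packet(std::deque<Elem>& queue,
                                     const PacketLimits& lim)
{
  std::vector<Elem> packet;
  std::size_t length = 0;  // plays the role of header_.length_
  while (!queue.empty()) {
    const Elem e = queue.front();
    if (e.length > lim.space - length) break;  // will not fit, no fragmentation
    // An exclusive element may only go into an empty packet.
    if (!e.exclusive || packet.empty()) {
      packet.push_back(e);
      queue.pop_front();
      length += e.length;
    }
    // Always stop after meeting an exclusive element, or once a limit is hit.
    if (e.exclusive || packet.size() == lim.max_samples ||
        length >= lim.optimum_size) {
      break;
    }
  }
  return packet;
}
```

In this model an exclusive element that arrives behind other elements stays at the head of the queue and becomes the sole content of the next packet.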

◆ isDirectMode()

ACE_INLINE bool OpenDDS::DCPS::TransportSendStrategy::isDirectMode ( )

Definition at line 128 of file TransportSendStrategy.inl.

References ACE_INLINE, mode_, and MODE_DIRECT.

Referenced by OpenDDS::DCPS::DataLink::resume_send().

129 {
130  return this->mode_ == MODE_DIRECT;
131 }

◆ link_released()

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::link_released ( bool  flag)

◆ marshal_transport_header()

bool OpenDDS::DCPS::TransportSendStrategy::marshal_transport_header ( ACE_Message_Block * mb)
private virtual

Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.

Definition at line 1723 of file TransportSendStrategy.cpp.

References header_.

Referenced by prepare_packet().

1724 {
1725  return *mb << header_;
1726 }

◆ max_message_size()

ACE_INLINE size_t OpenDDS::DCPS::TransportSendStrategy::max_message_size ( void  ) const
protected virtual

The maximum size of a message allowed by this TransportImpl, or 0 if there is no such limit. This is expected to be a constant; for example, UDP/IPv4 can send messages of up to 65466 bytes. The transport framework uses the returned value (if > 0) to fragment larger messages. This fragmentation and reassembly is transparent to the user.

Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy, OpenDDS::DCPS::MulticastSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.

Definition at line 147 of file TransportSendStrategy.inl.

References ACE_INLINE.

Referenced by get_packet_elems_from_queue(), send(), and space_available().

148 {
149  return 0;
150 }

◆ mb_to_iov()

int OpenDDS::DCPS::TransportSendStrategy::mb_to_iov ( const ACE_Message_Block & msg,
iovec *  iov 
)
static

Convert ACE_Message_Block chain into iovec[] entries for send(), returns number of iovec[] entries used (up to MAX_SEND_BLOCKS). Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater.

Definition at line 1967 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::MAX_SEND_BLOCKS.

Referenced by do_send_packet(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::RtpsUdpTransport::IceEndpoint::send(), and OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control().

1968 {
1969  int num_blocks = 0;
1970 #ifdef _MSC_VER
1971 #pragma warning(push)
1972 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here
1973 // since on other platforms iov_len is 64-bit
1974 #pragma warning(disable : 4267)
1975 #endif
1976  for (const ACE_Message_Block* block = &msg;
1977  block && num_blocks < MAX_SEND_BLOCKS;
1978  block = block->cont()) {
1979  iov[num_blocks].iov_len = block->length();
1980  iov[num_blocks++].iov_base = block->rd_ptr();
1981  }
1982 #ifdef _MSC_VER
1983 #pragma warning(pop)
1984 #endif
1985  return num_blocks;
1986 }
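The chain walk in mb_to_iov() can be tried out without ACE. In this sketch `Block` and `IoVec` are hypothetical stand-ins for ACE_Message_Block and iovec, `blocks_to_iov` is a hypothetical name, and `kMaxSendBlocks` is an arbitrary value chosen for illustration rather than the real MAX_SEND_BLOCKS.

```cpp
#include <cassert>
#include <cstddef>

// Stand-in for ACE_Message_Block: one buffer plus a link to the next block.
struct Block {
  const char* data;
  std::size_t len;
  const Block* next;
};

// Stand-in for iovec.
struct IoVec {
  const void* base;
  std::size_t len;
};

const int kMaxSendBlocks = 4;  // illustrative value, not MAX_SEND_BLOCKS

// Fill iov[] from the chain starting at msg and return the number of
// entries used. Precondition: iov has room for kMaxSendBlocks entries.
inline int blocks_to_iov(const Block& msg, IoVec* iov)
{
  int n = 0;
  for (const Block* b = &msg; b && n < kMaxSendBlocks; b = b->next) {
    iov[n].len = b->len;
    iov[n++].base = b->data;
  }
  return n;
}
```

As in the listing, a chain longer than the limit is cut off without an error, so a caller that may build long chains has to compare the returned count against the chain length itself.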

◆ mode()

ACE_INLINE TransportSendStrategy::SendMode OpenDDS::DCPS::TransportSendStrategy::mode ( void  ) const

Access the current sending mode.

Definition at line 16 of file TransportSendStrategy.inl.

References ACE_INLINE, DBG_ENTRY_LVL, and mode_.

Referenced by OpenDDS::DCPS::ScheduleOutputHandler::handle_exception(), mode_as_str(), OpenDDS::DCPS::TcpSendStrategy::schedule_output(), OpenDDS::DCPS::ScheduleOutputHandler::schedule_output(), and send_delayed_notifications().

17 {
18  DBG_ENTRY_LVL("TransportSendStrategy","mode",6);
19 
20  return mode_;
21 }

◆ mode_as_str()

ACE_INLINE const char * OpenDDS::DCPS::TransportSendStrategy::mode_as_str ( SendMode  mode)
static private

Helper function for debugging.

Definition at line 114 of file TransportSendStrategy.inl.

References ACE_INLINE, and mode().

Referenced by perform_work(), send(), and send_stop().

115 {
116  static const char* SendModeStr[] = { "MODE_NOT_SET",
117  "MODE_DIRECT",
118  "MODE_QUEUE",
119  "MODE_SUSPEND",
120  "MODE_TERMINATED",
121  "UNKNOWN"
122  };
123 
124  return SendModeStr[mode];
125 }
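The lookup table in the listing ends with an "UNKNOWN" entry, but the array is indexed directly, so that entry is only reached for the value 5. A bounds-checked variant could look like the sketch below; `mode_name` is a hypothetical name and the enum is redeclared locally so the block stands alone.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>

enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };

// Map a mode value to its name, returning "UNKNOWN" for anything that is
// not a valid SendMode instead of reading past the end of the table.
inline const char* mode_name(int mode)
{
  static const char* const names[] = {
    "MODE_NOT_SET", "MODE_DIRECT", "MODE_QUEUE", "MODE_SUSPEND", "MODE_TERMINATED"
  };
  const std::size_t count = sizeof(names) / sizeof(names[0]);
  if (mode < 0 || static_cast<std::size_t>(mode) >= count) {
    return "UNKNOWN";
  }
  return names[mode];
}
```

Taking an `int` rather than a `SendMode` is a deliberate choice in this sketch, so that a corrupted or out-of-range value can be passed in and reported.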

◆ non_blocking_send()

ssize_t OpenDDS::DCPS::TransportSendStrategy::non_blocking_send ( const iovec  iov[],
int  n,
int &  bp 
)
protected virtual

Definition at line 1879 of file TransportSendStrategy.cpp.

References ACE_DEBUG, ACE_TEXT(), ENOBUFS, EWOULDBLOCK, get_handle(), LM_DEBUG, LM_ERROR, ACE::record_and_set_non_blocking_mode(), ACE::restore_non_blocking_mode(), send_bytes_i(), VDBG, and VDBG_LVL.

Referenced by OpenDDS::DCPS::TcpSendStrategy::send_bytes().

1880 {
1881  int val = 0;
1882  ACE_HANDLE handle = get_handle();
1883 
1884  if (handle == ACE_INVALID_HANDLE)
1885  return -1;
1886 
1887  ACE::record_and_set_non_blocking_mode(handle, val);
1888 
1889  // Set the back-pressure flag to false.
1890  bp = 0;
1891 
1892  // Clear errno
1893  errno = 0;
1894 
1895  ssize_t result = send_bytes_i(iov, n);
1896 
1897  if (result == -1) {
1898  if ((errno == EWOULDBLOCK) || (errno == ENOBUFS)) {
1899  VDBG((LM_DEBUG,"(%P|%t) DBG: "
1900  "Backpressure encountered.\n"));
1901  // Set the back-pressure flag to true
1902  bp = 1;
1903 
1904  } else {
1905  VDBG_LVL((LM_ERROR, "(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n",
1906  ACE_TEXT("sendv"), n),1);
1907 
1908  // try to get the application to core when "Bad Address" is returned
1909  // by looking at the iovec
1910  for (int ii = 0; ii < n; ii++) {
1911  ACE_DEBUG((LM_DEBUG, "(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n",
1912  ii, iov[ii].iov_len, iov[ii].iov_base));
1913  }
1914  }
1915  }
1916 
1917  VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: "
1918  "The sendv() returned [%d].\n", result), 5);
1919 
1920  ACE::restore_non_blocking_mode(handle, val);
1921 
1922  return result;
1923 }
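The error handling in non_blocking_send() comes down to classifying the send result together with errno. A minimal sketch of that classification, with hypothetical names (`SendResultKind`, `classify_send`) and with the result and errno passed in as plain values so no socket is needed:

```cpp
#include <cassert>
#include <cerrno>

enum SendResultKind { SEND_OK, SEND_BACKPRESSURE, SEND_FAILED };

// result: return value of the low-level send call.
// err:    errno captured immediately after that call.
// bp:     set to 1 on backpressure and to 0 otherwise, like the bp
//         out-parameter in the listing.
inline SendResultKind classify_send(long result, int err, int& bp)
{
  bp = 0;
  if (result != -1) {
    return SEND_OK;  // some bytes (possibly not all) were accepted
  }
  if (err == EWOULDBLOCK || err == ENOBUFS) {
    bp = 1;          // the socket cannot take more data right now
    return SEND_BACKPRESSURE;
  }
  return SEND_FAILED;  // any other errno is treated as a real error
}
```

Only EWOULDBLOCK and ENOBUFS are treated as backpressure here because those are the two values the listing checks; whether other errno values deserve the same treatment on a given platform is outside what this page shows.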

◆ OPENDDS_VECTOR()

OpenDDS::DCPS::TransportSendStrategy::OPENDDS_VECTOR ( TQESendModePair  )
private

◆ perform_work()

ThreadSynchWorker::WorkOutcome OpenDDS::DCPS::TransportSendStrategy::perform_work ( void  )
virtual

Called by our ThreadSynch object when we should be able to start sending any partial packet bytes and/or compose a new packet using elements from the queue_.

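Returns WORK_OUTCOME_NO_MORE_TO_DO to indicate that the ThreadSynch object doesn't need to call perform_work() again since the queue (and any unsent packet bytes) has been drained, and the mode_ has been switched to MODE_DIRECT.

Returns WORK_OUTCOME_MORE_TO_DO to indicate that there is more work to do, and the ThreadSynch object should call perform_work() again. Returns WORK_OUTCOME_CLOGGED_RESOURCE when the send attempt hit backpressure or a reconnect is still pending, and WORK_OUTCOME_BROKEN_RESOURCE when the connection was lost and could not be re-established.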

Implements OpenDDS::DCPS::ThreadSynchWorker.

Definition at line 132 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, get_packet_elems_from_queue(), header_, OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, lock_, mode_, mode_as_str(), MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OUTCOME_BACKPRESSURE, OUTCOME_COMPLETE_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, prepare_packet(), queue_, relink(), send_delayed_notifications(), send_packet(), OpenDDS::DCPS::BasicQueue< T >::size(), synch_, VDBG_LVL, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO, and OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO.

133 {
134  DBG_ENTRY_LVL("TransportSendStrategy","perform_work",6);
135 
136  SendPacketOutcome outcome;
137  bool no_more_work = false;
138 
139  { // scope for the guard(lock_);
140  GuardType guard(lock_);
141 
142  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: perform_work mode: %C\n", mode_as_str(mode_)), 5);
143 
144  if (mode_ == MODE_TERMINATED) {
145  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
146  "Entered perform_work() and mode_ is MODE_TERMINATED - "
147  "we lost connection and could not reconnect, just return "
148  "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
149  return WORK_OUTCOME_BROKEN_RESOURCE;
150  }
151 
152  // The perform_work() is called by our synch_ object using
153  // a thread designated to call this method when it thinks
154  // we need to be called in order to "service" the queue_ and/or
155  // deal with a partially sent current packet.
156  //
157  // We will return a 0 if we don't see a need to have our perform_work()
158  // called again, and we will return a 1 if we do see the need to have our
159  // perform_work() method called again.
160 
161  // First, make sure that the mode_ indicates that we are, indeed, in
162  // the MODE_QUEUE mode. If we are not in MODE_QUEUE mode (meaning we are
163  // in MODE_DIRECT), then it means we didn't need to have this perform_work()
164  // method called - in this case, do nothing other than return
165  // WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we really don't
166  // see a need for it to call our perform_work() again (at least not
167  // right now).
168  if (mode_ != MODE_QUEUE && mode_ != MODE_SUSPEND) {
169  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
170  "Entered perform_work() and mode_ is %C - just return "
171  "WORK_OUTCOME_NO_MORE_TO_DO.\n", mode_as_str(mode_)), 5);
172  return WORK_OUTCOME_NO_MORE_TO_DO;
173  }
174 
175  // Check the "state" of the current packet. We will either find that the
176  // current packet is in a state of being "partially sent", or we will find
177  // it in a state of being "empty". When the current packet is "empty", it
178  // means that it is time to build up the current packet using elements
179  // extracted from the queue_, and then we will attempt to send the
180  // packet. When we find the current packet in the "partially sent" state,
181  // we will not touch the queue_ - we will just try to send the unsent
182  // bytes in the current (partially sent) packet.
183  const size_t header_length = header_.length_;
184 
185  if (header_length == 0) {
186  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
187  "The current packet doesn't have any unsent bytes - we "
188  "need to 'populate' the current packet with elems from "
189  "the queue.\n"), 5);
190 
191  // The current packet is "empty". Build up the current packet using
192  // elements from the queue_, and prepare the current packet to be sent.
193 
194  // Before we build the packet from the queue_, let's make sure that
195  // there is actually something on the queue_ to build from.
196  if (queue_.size() == 0) {
197  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
198  "But the queue is empty. We have cleared the "
199  "backpressure situation.\n"),5);
200  // We are here because the queue_ is empty, and there isn't
201  // any "partial packet" bytes left to send. We have overcome
202  // the backpressure situation and don't have anything to do
203  // right now.
204 
205  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
206  "Flip mode to MODE_DIRECT, and return "
207  "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
208 
209  // Flip the mode back to MODE_DIRECT.
210  mode_ = MODE_DIRECT;
211 
212  // And return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that
213  // perform_work() doesn't need to be called again (at this time).
214  return WORK_OUTCOME_NO_MORE_TO_DO;
215  }
216 
217  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
218  "There is at least one elem in the queue - get the packet "
219  "elems from the queue.\n"), 5);
220 
221  // There is stuff in the queue_ if we get to this point in the logic.
222  // Build-up the current packet using element(s) from the queue_.
223  get_packet_elems_from_queue();
224 
225  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
226  "Prepare the packet from the packet elems_.\n"), 5);
227 
228  // Now we can prepare the new packet to be sent.
229  prepare_packet();
230 
231  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
232  "Packet has been prepared from packet elems_.\n"), 5);
233 
234  } else {
235  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
236  "We have a current packet that still has unsent bytes.\n"), 5);
237  }
238 
239  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
240  "Attempt to send the current packet.\n"), 5);
241 
242  // Now we can attempt to send the current packet - whether it is
243  // a "partially sent" packet or one that we just built-up using elements
244  // from the queue_ (and subsequently prepared for sending) - it doesn't
245  // matter. Just attempt to send as many of the "unsent" bytes in the
246  // packet as possible.
247  outcome = send_packet();
248 
249  // If we sent the whole packet (eg, partial_send is false), and the queue_
250  // is now empty, then we've cleared the backpressure situation.
251  if ((outcome == OUTCOME_COMPLETE_SEND) && (queue_.size() == 0)) {
252  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
253  "Flip the mode to MODE_DIRECT, and then return "
254  "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
255 
256  // Revert back to MODE_DIRECT mode.
257  mode_ = MODE_DIRECT;
258  no_more_work = true;
259  }
260  } // End of scope for guard(lock_);
261 
262  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
263  "The outcome of the send_packet() was %d.\n", outcome), 5);
264 
265  send_delayed_notifications();
266 
267  // If we sent the whole packet (eg, partial_send is false), and the queue_
268  // is now empty, then we've cleared the backpressure situation.
269  if (no_more_work) {
270  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
271  "We sent the whole packet, and there is nothing left on "
272  "the queue now.\n"), 5);
273 
274  // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we
275  // don't desire another call to this perform_work() method.
276  return WORK_OUTCOME_NO_MORE_TO_DO;
277  }
278 
279  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
280  "We still have unsent bytes in the current packet AND/OR there "
281  "are still elements in the queue.\n"), 5);
282 
283  if ((outcome == OUTCOME_PEER_LOST) || (outcome == OUTCOME_SEND_ERROR)) {
284  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
285  "We lost our connection, or had some fatal connection "
286  "error. Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
287 
288  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
289  "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
290 
291  bool do_suspend = true;
292  relink(do_suspend);
293 
294  if (mode_ == MODE_SUSPEND) {
295  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
296  "The reconnect has not done yet and we are still in MODE_SUSPEND. "
297  "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
298  // Return WORK_OUTCOME_CLOGGED_RESOURCE because the reconnect has not
299  // completed and we are still in MODE_SUSPEND.
300  return WORK_OUTCOME_CLOGGED_RESOURCE;
301 
302  } else if (mode_ == MODE_TERMINATED) {
303  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
304  "Reconnect failed, now we are in MODE_TERMINATED\n"), 5);
306 
307  } else {
308  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
309  "Reconnect succeeded, Notify synch thread of work "
310  "availability.\n"), 5);
311  // If the datalink is re-established then notify the synch
312  // thread to perform work. We do not hold the object lock at
313  // this point.
314  synch_->work_available();
315  }
316  }
317 
318  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
319  "We still have an 'unbroken' connection.\n"), 5);
320 
321  if (outcome == OUTCOME_BACKPRESSURE) {
322  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
323  "We experienced backpressure on our attempt to send the "
324  "packet. Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
325  // We have a "clogged resource".
327  }
328 
329  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
330  "We may have sent the whole current packet, but still have "
331  "elements on the queue.\n"), 5);
332  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
333  "Or, we may have only partially sent the current packet.\n"), 5);
334 
335  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
336  "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5);
337 
338  // We may have had an OUTCOME_COMPLETE_SEND, but there is still stuff
339  // in the queue_ to be sent. *OR* we may have had an OUTCOME_PARTIAL_SEND,
340  // which equates to the same thing - we still have work to do.
341 
342  // We are still in MODE_QUEUE mode, thus there is still work to be
343  // done to service the queue_ and/or a partially sent current packet.
344  // Return WORK_OUTCOME_MORE_TO_DO so that our caller knows that we still
345  // want it to call this perform_work() method.
347 }
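
The tail of perform_work() turns the result of send_packet() into a WorkOutcome. The sketch below restates that mapping with stand-in enums rather than the OpenDDS types. Several return statements are not visible in the listing, and the log text and the comment disagree about the suspended case, so the reconnect branch is left out: treating a lost peer or send error as a broken resource is an assumption of this sketch, and classify() is a hypothetical helper.

```cpp
#include <cassert>
#include <cstddef>

// Stand-in enums; the names echo the listing but these are not the OpenDDS types.
enum class SendOutcome { CompleteSend, PartialSend, Backpressure, PeerLost, SendError };
enum class WorkOutcome { MoreToDo, NoMoreToDo, CloggedResource, BrokenResource };

// Hypothetical helper: classify one perform_work() pass.
WorkOutcome classify(SendOutcome outcome, std::size_t queued_elements)
{
  if (outcome == SendOutcome::CompleteSend && queued_elements == 0) {
    return WorkOutcome::NoMoreToDo;      // backlog cleared; the mode would flip to direct
  }
  if (outcome == SendOutcome::PeerLost || outcome == SendOutcome::SendError) {
    return WorkOutcome::BrokenResource;  // assumption: the relink attempt is not modelled
  }
  if (outcome == SendOutcome::Backpressure) {
    return WorkOutcome::CloggedResource;
  }
  // Only two situations remain: a partial send, or a complete send with a backlog.
  assert(outcome == SendOutcome::PartialSend || queued_elements > 0);
  return WorkOutcome::MoreToDo;
}
```

The order of the checks matters: the "nothing left" case is tested first so that a complete send with an empty queue never falls through to MoreToDo.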

◆ pre_send_packet()

virtual ACE_Message_Block* OpenDDS::DCPS::TransportSendStrategy::pre_send_packet ( const ACE_Message_Block * m )
inline private virtual

Derived classes can override to transform the data right before it's sent. If the returned value is non-NULL it will be sent instead of sending the parameter. If the returned value is NULL the original message will be dropped.

Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.

Definition at line 274 of file TransportSendStrategy.h.

References ACE_Message_Block::duplicate().

Referenced by do_send_packet().

275  {
276  return m->duplicate();
277  }
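
The override contract described above (a non-null result is sent in place of the original, a null result drops the message) can be illustrated with a self-contained sketch. Block, SendStrategySketch and UppercaseStrategy are hypothetical stand-ins, not OpenDDS or ACE types, and std::unique_ptr is used here only to keep the example leak-free.

```cpp
#include <cassert>
#include <cctype>
#include <memory>
#include <string>

// Stand-in for a message block; this is not ACE_Message_Block.
struct Block {
  std::string bytes;
};

class SendStrategySketch {
public:
  virtual ~SendStrategySketch() = default;

  // Returns true if a block reached the "wire", false if the hook dropped it.
  bool send_one(const Block* original, std::string& wire)
  {
    assert(original != nullptr);
    std::unique_ptr<Block> out = pre_send(original);
    if (!out) {
      return false;  // a null result means the message is dropped
    }
    wire = out->bytes;
    return true;
  }

protected:
  // Default hook: hand back an unmodified copy, mirroring the duplicate() call.
  virtual std::unique_ptr<Block> pre_send(const Block* m)
  {
    return std::unique_ptr<Block>(new Block(*m));
  }
};

// Hypothetical subclass: upper-cases the payload and drops empty messages.
class UppercaseStrategy : public SendStrategySketch {
protected:
  std::unique_ptr<Block> pre_send(const Block* m) override
  {
    if (m->bytes.empty()) {
      return nullptr;
    }
    std::unique_ptr<Block> out(new Block(*m));
    for (char& c : out->bytes) {
      c = static_cast<char>(std::toupper(static_cast<unsigned char>(c)));
    }
    return out;
  }
};
```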

◆ prepare_header()

void OpenDDS::DCPS::TransportSendStrategy::prepare_header ( )
private

This method is responsible for updating the packet header. Called exclusively by prepare_packet.

Definition at line 1637 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, header_, header_sequence_, prepare_header_i(), and OpenDDS::DCPS::TransportHeader::sequence_.

Referenced by prepare_packet().

1638 {
1639  DBG_ENTRY_LVL("TransportSendStrategy", "prepare_header", 6);
1640 
1641  // Increment header sequence for packet:
1643 
1644  // Allow the specific implementation the opportunity to set
1645  // values in the packet header.
1646  prepare_header_i();
1647 }
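
prepare_header() performs a fixed step and then calls the virtual prepare_header_i() hook. A minimal sketch of that shape follows, using hypothetical stand-in types. The statement that advances the sequence number is not visible in the listing, so the pre-increment form used here is an assumption.

```cpp
#include <cassert>
#include <cstdint>

// Stand-in for the transport header; not the OpenDDS TransportHeader.
struct HeaderSketch {
  HeaderSketch() : sequence(0), flagged(false) {}
  std::uint32_t sequence;
  bool flagged;
};

class StrategySketch {
public:
  StrategySketch() : sequence_(0) {}
  virtual ~StrategySketch() = default;

  // Non-virtual driver: the fixed step first, then the customization point.
  const HeaderSketch& prepare_header()
  {
    header_.sequence = ++sequence_;         // assumed form of the elided line
    prepare_header_i();
    assert(header_.sequence == sequence_);  // hooks in this sketch leave the sequence alone
    return header_;
  }

protected:
  virtual void prepare_header_i() {}  // default implementation does nothing
  HeaderSketch header_;

private:
  std::uint32_t sequence_;
};

// Hypothetical subclass that sets an extra header field in the hook.
class FlaggingStrategy : public StrategySketch {
protected:
  void prepare_header_i() override { header_.flagged = true; }
};
```

Keeping the driver non-virtual means a subclass can add header fields but cannot skip the sequence update.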

◆ prepare_header_i()

void OpenDDS::DCPS::TransportSendStrategy::prepare_header_i ( )
protected virtual

Specific implementation processing of prepared packet header.

Reimplemented in OpenDDS::DCPS::MulticastSendStrategy.

Definition at line 1650 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL.

Referenced by prepare_header().

1651 {
1652  DBG_ENTRY_LVL("TransportSendStrategy","prepare_header_i",6);
1653 
1654  // Default implementation does nothing.
1655 }

◆ prepare_packet()

void OpenDDS::DCPS::TransportSendStrategy::prepare_packet ( )
private

This method is responsible for actually "creating" the current send packet using the packet header and the collection of packet elements that are to make up the packet's contents.

Definition at line 1658 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_NEW_MALLOC, OpenDDS::DCPS::BuildChainVisitor::chain(), ACE_Message_Block::cont(), DBG_ENTRY_LVL, ACE_Message_Block::duplicate(), elems_, DataBlockLockPool::get_lock(), header_block_, header_complete_, header_data_allocator_, header_db_allocator_, header_db_lock_pool_, header_mb_allocator_, LM_DEBUG, marshal_transport_header(), max_header_size_, ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, pkt_chain_, prepare_header(), prepare_packet_i(), ACE_Message_Block::release(), VDBG, and ACE_Time_Value::zero.

Referenced by direct_send(), and perform_work().

1659 {
1660  DBG_ENTRY_LVL("TransportSendStrategy", "prepare_packet", 6);
1661 
1662  // Prepare the header for sending.
1663  prepare_header();
1664 
1665  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1666  "Marshal the packet header.\n"));
1667 
1668  if (header_block_ != 0) {
1670  }
1671 
1673  static_cast<ACE_Message_Block*>(header_mb_allocator_->malloc()),
1676  0, // cont
1677  0, // data
1678  header_data_allocator_.get(),
1683  header_db_allocator_.get(),
1684  header_mb_allocator_.get()));
1685 
1687 
1689 
1690  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1691  "Use a BuildChainVisitor to visit the packet elems_.\n"));
1692 
1693  // Build up a chain of blocks by duplicating the message block chain
1694  // held by each element (in elems_), and then chaining the new duplicate
1695  // blocks together to form one long chain.
1696  BuildChainVisitor visitor;
1697  elems_.accept_visitor(visitor);
1698 
1699  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1700  "Attach the visitor's chain of blocks to the lone (packet "
1701  "header) block currently in the pkt_chain_.\n"));
1702 
1703  // Attach the visitor's chain of blocks to the packet header block.
1704  pkt_chain_->cont(visitor.chain());
1705 
1706  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1707  "Increment header sequence for next packet.\n"));
1708 
1709  // Allow the specific implementation the opportunity to process the
1710  // newly prepared packet.
1711  prepare_packet_i();
1712 
1713  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1714  "Set the header_complete_ flag to false.\n"));
1715 
1716  // Set the header_complete_ to false to indicate
1717  // that the first block in the pkt_chain_ is the packet header block
1718  // (actually a duplicate() of the packet header_block_).
1719  header_complete_ = false;
1720 }
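
The chain-building step in prepare_packet() (visit every queued element, copy its blocks onto one chain, then hang that chain behind the header block) can be sketched with stand-in types. Node, ChainBuilder, build_packet and total_length are hypothetical names. The real code works on ACE_Message_Block chains and custom allocators, which this sketch does not model.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <utility>
#include <vector>

// Stand-in for a chained message block; cont plays the role of the continuation pointer.
struct Node {
  std::string data;
  std::unique_ptr<Node> cont;
};

// Visitor that copies each element's payload onto the end of a growing chain.
class ChainBuilder {
public:
  ChainBuilder() : tail_(nullptr) {}

  void visit(const std::string& payload)
  {
    std::unique_ptr<Node> n(new Node());
    n->data = payload;
    Node* raw = n.get();
    if (tail_) {
      tail_->cont = std::move(n);
    } else {
      head_ = std::move(n);
    }
    tail_ = raw;
  }

  // Hands the finished chain to the caller and resets the builder.
  std::unique_ptr<Node> chain()
  {
    tail_ = nullptr;
    return std::move(head_);
  }

private:
  std::unique_ptr<Node> head_;
  Node* tail_;
};

std::unique_ptr<Node> build_packet(const std::string& header,
                                   const std::vector<std::string>& elems)
{
  std::unique_ptr<Node> pkt(new Node());
  pkt->data = header;
  ChainBuilder visitor;
  for (const std::string& e : elems) {
    visitor.visit(e);
  }
  assert(!pkt->cont);           // the header block starts out alone
  pkt->cont = visitor.chain();  // attach the payload chain behind the header
  return pkt;
}

// Sums the payload sizes along the chain, header included.
std::size_t total_length(const Node* n)
{
  std::size_t len = 0;
  for (; n != nullptr; n = n->cont.get()) {
    len += n->data.size();
  }
  return len;
}
```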

◆ prepare_packet_i()

void OpenDDS::DCPS::TransportSendStrategy::prepare_packet_i ( )
protected virtual

Specific implementation processing of prepared packet.

Definition at line 1729 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL.

Referenced by prepare_packet().

1730 {
1731  DBG_ENTRY_LVL("TransportSendStrategy","prepare_packet_i",6);
1732 
1733  // Default implementation does nothing.
1734 }

◆ relink()

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::relink ( bool  do_suspend = true)
virtual

The subclass needs to provide the implementation for re-establishing the datalink. This is called when send returns an error.

Reimplemented in OpenDDS::DCPS::TcpSendStrategy.

Definition at line 58 of file TransportSendStrategy.inl.

References ACE_INLINE, and DBG_ENTRY_LVL.

Referenced by direct_send(), and perform_work().

59 {
60  DBG_ENTRY_LVL("TransportSendStrategy","relink",6);
61  // The subclass needs to implement this function for re-establishing
62  // the link upon send failure.
63 }

◆ remove_all_msgs()

void OpenDDS::DCPS::TransportSendStrategy::remove_all_msgs ( const GUID_t & pub_id )

Definition at line 1315 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, do_remove_sample(), lock_, OpenDDS::DCPS::TransportSendBuffer::retain_all(), send_buffer_, and send_delayed_notifications().

Referenced by OpenDDS::DCPS::RtpsUdpDataLink::client_stop(), OpenDDS::DCPS::DataLink::remove_all_msgs(), and OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_all_msgs().

1316 {
1317  DBG_ENTRY_LVL("TransportSendStrategy","remove_all_msgs",6);
1318 
1319  const TransportQueueElement::MatchOnPubId match(pub_id);
1321 
1322  GuardType guard(lock_);
1323 
1324  if (send_buffer_ != 0) {
1325  // If a secondary send buffer is bound, removed samples must
1326  // be retained in order to properly maintain the buffer:
1327  send_buffer_->retain_all(pub_id);
1328  }
1329 
1330  do_remove_sample(pub_id, match, true);
1331 }

◆ remove_sample()

RemoveResult OpenDDS::DCPS::TransportSendStrategy::remove_sample ( const DataSampleElement * sample )

Our DataLink has been requested by some particular TransportClient to remove the supplied sample (basically, an "unsend" attempt) from this strategy object.

Definition at line 1334 of file TransportSendStrategy.cpp.

References ACE_Message_Block::cont(), DBG_ENTRY_LVL, do_remove_sample(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_sample(), LM_DEBUG, lock_, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::REMOVE_RELEASED, send_delayed_notifications(), and VDBG_LVL.

Referenced by OpenDDS::DCPS::DataLink::remove_sample(), and OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_sample().

1335 {
1336  DBG_ENTRY_LVL("TransportSendStrategy", "remove_sample", 6);
1337 
1338  VDBG_LVL((LM_DEBUG, "(%P|%t) Removing sample: %@\n", sample->get_sample()), 5);
1339 
1340  // The sample to remove is either in the temporary delayed notification list or in an
1341  // internal list (elems_ or queue_). If the transport thread is about to remove it from
1342  // the delayed notification list, that thread must acquire the WriterDataContainer lock
1343  // for the data_dropped/data_delivered callback, so it has to wait for this
1344  // remove_sample() call to complete, because this call already holds that lock. It is
1345  // therefore safe for this call to access the sample. If the thread calling
1346  // remove_sample() removes it instead, it is removed either from the delayed notification
1347  // list or from an internal list, in which case the element records whether the sample was
1348  // released, so the DataLinkSet can stop asking the remaining DataLinks to remove it.
1349 
1350  const char* const payload = sample->get_sample()->cont()->rd_ptr();
1351  GUID_t pub_id = sample->get_pub_id();
1352  const TransportQueueElement::MatchOnDataPayload modp(payload);
1353  if (send_delayed_notifications(&modp)) {
1354  return REMOVE_RELEASED;
1355  }
1356 
1357  GuardType guard(lock_);
1358  return do_remove_sample(pub_id, modp);
1359 }
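
remove_all_msgs() and remove_sample() differ mainly in the match criteria they pass to do_remove_sample(): one matches on publication id and removes everything, the other matches a single payload. A hedged sketch of that idea with stand-in types follows. Elem, MatchOnPub, MatchOnPayload and remove_matching are hypothetical, and comparing payloads by address is an assumption suggested by the rd_ptr() argument in the listing.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct Elem {
  int pub_id;           // stand-in for a publication GUID
  const char* payload;  // stand-in for the sample's data pointer
};

struct MatchCriteria {
  virtual ~MatchCriteria() = default;
  virtual bool matches(const Elem& e) const = 0;
};

struct MatchOnPub : MatchCriteria {
  explicit MatchOnPub(int id) : id_(id) {}
  bool matches(const Elem& e) const override { return e.pub_id == id_; }
  int id_;
};

struct MatchOnPayload : MatchCriteria {
  explicit MatchOnPayload(const char* p) : p_(p) { assert(p != nullptr); }
  // Assumption: payloads are compared by address, not by content.
  bool matches(const Elem& e) const override { return e.payload == p_; }
  const char* p_;
};

// Removes the first match, or every match when remove_all is true.
std::size_t remove_matching(std::vector<Elem>& queue,
                            const MatchCriteria& criteria,
                            bool remove_all)
{
  std::size_t removed = 0;
  for (std::vector<Elem>::iterator it = queue.begin(); it != queue.end();) {
    if (criteria.matches(*it) && (remove_all || removed == 0)) {
      it = queue.erase(it);
      ++removed;
    } else {
      ++it;
    }
  }
  return removed;
}
```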

◆ resume_send()

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::resume_send ( )

This is called when the connection has been lost and a reconnect succeeds. The send mode is restored to the mode that was in effect before the suspend, which is either MODE_QUEUE or MODE_DIRECT.

Definition at line 78 of file TransportSendStrategy.inl.

References ACE_ERROR, ACE_INLINE, DBG_ENTRY_LVL, elems_, header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, LM_ERROR, lock_, mode_, mode_before_suspend_, MODE_DIRECT, MODE_NOT_SET, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, pkt_chain_, queue_, OpenDDS::DCPS::BasicQueue< T >::size(), start_counter_, OpenDDS::DCPS::BasicQueue< T >::swap(), and synch_.

Referenced by OpenDDS::DCPS::DataLink::resume_send().

79 {
80  DBG_ENTRY_LVL("TransportSendStrategy","resume_send",6);
81  GuardType guard(this->lock_);
82 
83  // If this send strategy is reused when the connection is reestablished, then
84  // we need to re-initialize the mode_ and mode_before_suspend_.
85  if (this->mode_ == MODE_TERMINATED) {
86  this->header_.length_ = 0;
87  this->pkt_chain_ = 0;
88  this->header_complete_ = false;
89  this->start_counter_ = 0;
90  this->mode_ = MODE_DIRECT;
92  this->delayed_delivered_notification_queue_.clear();
93 
94  } else if (this->mode_ == MODE_SUSPEND) {
95  this->header_.length_ = 0;
96  this->pkt_chain_ = 0;
97  QueueType elems;
98  elems.swap(this->elems_);
99  this->mode_ = this->mode_before_suspend_;
100  this->header_complete_ = false;
102  if (this->queue_.size() > 0) {
103  this->mode_ = MODE_QUEUE;
104  this->synch_->work_available();
105  }
106 
107  } else {
108  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::resume_send The suspend or terminate"
109  " is not called previously.\n"));
110  }
111 }
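
The mode transitions performed by resume_send() can be summarised as a small pure function. This sketch uses a stand-in Mode enum and a hypothetical ResumeResult struct. It covers only the transitions visible in the listing and ignores the packet and queue resets that accompany them.

```cpp
#include <cassert>
#include <cstddef>

enum class Mode { NotSet, Direct, Queue, Suspend, Terminated };

struct ResumeResult {
  Mode mode;         // mode after the call
  bool wake_worker;  // whether the synch thread would be notified
  bool ok;           // false when neither suspend nor terminate preceded the call
};

ResumeResult resume(Mode current, Mode before_suspend, std::size_t queued)
{
  if (current == Mode::Terminated) {
    return ResumeResult{Mode::Direct, false, true};  // strategy is being reused
  }
  if (current == Mode::Suspend) {
    // The documentation states the saved mode is either queue or direct.
    assert(before_suspend == Mode::Direct || before_suspend == Mode::Queue);
    if (queued > 0) {
      return ResumeResult{Mode::Queue, true, true};  // backlog present: drain it
    }
    return ResumeResult{before_suspend, false, true};
  }
  return ResumeResult{current, false, false};        // error path: mode is unchanged
}
```

Note that a non-empty queue forces queue mode even if the strategy was in direct mode before the suspend, because elements queued while suspended must be sent first.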

◆ security_config()

virtual Security::SecurityConfig_rch OpenDDS::DCPS::TransportSendStrategy::security_config ( ) const
inline protected virtual

Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.

Definition at line 219 of file TransportSendStrategy.h.

Referenced by do_send_packet().

219 { return Security::SecurityConfig_rch(); }

◆ send()

void OpenDDS::DCPS::TransportSendStrategy::send ( TransportQueueElement * element,
bool  relink = true 
)

Our DataLink has been requested by some particular TransportClient to send the element.

Definition at line 935 of file TransportSendStrategy.cpp.

References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), add_delayed_notification(), current_space_available(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, direct_send(), elems_, OpenDDS::DCPS::TransportHeader::first_fragment_, OpenDDS::DCPS::TransportQueueElement::fragment(), OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), graceful_disconnecting_, header_, OpenDDS::DCPS::TransportHeader::last_fragment_, OpenDDS::DCPS::TransportHeader::length_, link_released_, LM_DEBUG, LM_ERROR, LM_TRACE, lock_, max_header_size_, max_message_size(), max_samples_, max_size_, mode_, mode_as_str(), MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::null_tqe_pair, optimum_size_, OpenDDS::DCPS::BasicQueue< T >::put(), queue_, ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::TransportQueueElement::requires_exclusive_packet(), send_delayed_notifications(), OpenDDS::DCPS::BasicQueue< T >::size(), synch_, ACE_Message_Block::total_length(), OpenDDS::DCPS::Transport_debug_level, VDBG, and VDBG_LVL.

Referenced by OpenDDS::DCPS::DataLink::send_i().

936 {
937  if (Transport_debug_level > 9) {
938  ACE_DEBUG((LM_DEBUG,
939  ACE_TEXT("(%P|%t) TransportSendStrategy::send() [%d] - ")
940  ACE_TEXT("sending data at 0x%x.\n"),
941  id(), element));
942  }
943 
944  DBG_ENTRY_LVL("TransportSendStrategy", "send", 6);
945 
946  {
947  GuardType guard(lock_);
948 
949  if (link_released_) {
950  add_delayed_notification(element);
951 
952  } else {
954  VDBG((LM_DEBUG, "(%P|%t) DBG: "
955  "TransportSendStrategy::send: mode is MODE_TERMINATED and not in "
956  "graceful disconnecting, so discard message.\n"));
957  guard.release();
958  element->data_dropped(true);
959  return;
960  }
961 
962  size_t element_length = element->msg()->total_length();
963 
964  VDBG((LM_DEBUG, "(%P|%t) DBG: "
965  "Send element msg() has total_length() == [%d].\n",
966  element_length));
967 
968  VDBG((LM_DEBUG, "(%P|%t) DBG: "
969  "max_header_size_ == [%d].\n",
971 
972  VDBG((LM_DEBUG, "(%P|%t) DBG: "
973  "max_size_ == [%d].\n",
974  max_size_));
975 
976  const size_t max_msg_size = max_message_size();
977 
978  // Really an assert. We can't accept any element that wouldn't fit into
979  // a transport packet by itself (ie, it would be the only element in the
980  // packet). This max_size_ is the user-configurable maximum, not based
981  // on the transport's inherent maximum message size. If max_msg_size
982  // is non-zero, we will fragment so max_size_ doesn't apply per-element.
983  if (max_msg_size == 0 &&
984  max_header_size_ + element_length > max_size_) {
985  ACE_ERROR((LM_ERROR,
986  "(%P|%t) ERROR: Element too large (%Q) "
987  "- won't fit into packet.\n", ACE_UINT64(element_length)));
988  return;
989  }
990 
991  // Check the mode_ to see if we simply put the element on the queue.
992  if (mode_ == MODE_QUEUE || mode_ == MODE_SUSPEND) {
993  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
994  "mode_ == %C, so queue elem and leave.\n",
995  mode_as_str(mode_)), 5);
996 
997  queue_.put(element);
998 
999  if (mode_ != MODE_SUSPEND) {
1000  synch_->work_available();
1001  }
1002 
1003  return;
1004  }
1005 
1006  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1007  "mode_ == MODE_DIRECT.\n"));
1008 
1009  // We are in the MODE_DIRECT send mode. When in this mode, the send()
1010  // calls will "build up" the transport packet to be sent directly when it
1011  // reaches the optimal size, contains the maximum number of samples, etc.
1012 
1013  // We need to check if the current element (the arg passed-in to this
1014  // send() method) should be appended to the transport packet, or if the
1015  // transport packet should be sent (directly) first, dealing with the
1016  // current element afterwards.
1017 
1018  // We will decide to send the packet as it is now, under two circumstances:
1019  //
1020  // Either:
1021  //
1022  // (1) The current element won't fit into the current packet since it
1023  // would violate the max_packet_size_.
1024  //
1025  // -OR-
1026  //
1027  // (2) There is at least one element already in the current packet,
1028  // and the current element says that it must be sent in an
1029  // exclusive packet (ie, in a packet all by itself).
1030  //
1031  const bool exclusive = element->requires_exclusive_packet();
1032 
1033  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1034  "The element %C require an exclusive packet.\n",
1035  (exclusive ? "DOES" : "does NOT")
1036  ));
1037 
1038  const size_t space_needed =
1039  (max_msg_size > 0)
1040  ? /* fragmenting */ DataSampleHeader::get_max_serialized_size() + MIN_FRAG
1041  : /* not fragmenting */ element_length;
1042 
1043  if ((exclusive && (elems_.size() != 0))
1044  || (current_space_available() < space_needed)) {
1045 
1046  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1047  "Element won't fit in current packet or requires exclusive"
1048  " - send current packet (directly) now.\n"));
1049 
1050  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1051  "max_header_size_: %d, header_.length_: %d, element_length: %d\n"
1052  , max_header_size_, header_.length_, element_length));
1053 
1054  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1055  "Tot possible length: %d, max_len: %d\n"
1056  , max_header_size_ + header_.length_ + element_length
1057  , max_size_));
1058  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1059  "current elem size: %d\n"
1060  , elems_.size()));
1061 
1062  // Send the current packet, and deal with the current element
1063  // afterwards.
1064  // The invocation's relink status should dictate the direct_send's
1065  // do_relink. We don't want a (relink == false) invocation to end up
1066  // doing a relink. Think of (relink == false) as a non-blocking call.
1068 
1069  // Now check to see if we flipped into MODE_QUEUE, which would mean
1070  // that the direct_send() experienced backpressure, and the
1071  // packet was only partially sent. If this has happened, we deal with
1072  // the current element by placing it on the queue (and then we are done).
1073  //
1074  // Otherwise, if the mode_ is still MODE_DIRECT, we can just
1075  // "drop" through to the next step in the logic where we append the
1076  // current element to the current packet.
1077  if (mode_ == MODE_QUEUE) {
1078  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1079  "We experienced backpressure on that direct send, as "
1080  "the mode_ is now MODE_QUEUE or MODE_SUSPEND. "
1081  "Queue elem and leave.\n"), 5);
1082  queue_.put(element);
1083  synch_->work_available();
1084 
1085  return;
1086  }
1087  }
1088 
1089  // Loop for sending 'element', in fragments if needed
1090  bool first_pkt = true; // enter the loop 1st time through unconditionally
1091  for (TransportQueueElement* next_fragment = 0;
1092  (first_pkt || next_fragment)
1093  && (mode_ == MODE_DIRECT || mode_ == MODE_TERMINATED);) {
1094  // We do need to send in MODE_TERMINATED (GRACEFUL_DISCONNECT msg)
1095 
1096  if (next_fragment) {
1097  element = next_fragment;
1098  element_length = next_fragment->msg()->total_length();
1099  header_.first_fragment_ = false;
1100  }
1101 
1102  header_.last_fragment_ = false;
1103  if (max_msg_size) { // fragmentation enabled
1104  const size_t avail = current_space_available();
1105  if (element_length > avail) {
1106  VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting %B > %B\n", element_length, avail), 0);
1107  const TqePair ep = element->fragment(avail);
1108  if (ep == null_tqe_pair) {
1109  ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::send: "
1110  "Element Fragmentation Failed\n"));
1111  return;
1112  }
1113  element = ep.first;
1114  element_length = element->msg()->total_length();
1115  next_fragment = ep.second;
1116  header_.first_fragment_ = first_pkt;
1117  } else if (next_fragment) {
1118  // We are sending the "tail" element of a previous fragment()
1119  // operation, and this element didn't itself require fragmentation
1120  header_.last_fragment_ = true;
1121  next_fragment = 0;
1122  }
1123  }
1124  first_pkt = false;
1125 
1126  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1127  "Start the 'append elem' to current packet logic.\n"));
1128 
1129  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1130  "Put element into current packet elems_.\n"));
1131 
1132  // Now that we know the current element should go into the current
1133  // packet, we can just go ahead and "append" the current element to
1134  // the current packet.
1135 
1136  // Add the current element to the collection of packet elements.
1137  elems_.put(element);
1138 
1139  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1140  "Before, the header_.length_ == [%d].\n",
1141  header_.length_));
1142 
1143  // Adjust the header_.length_ to account for the length of the element.
1144  header_.length_ += static_cast<ACE_UINT32>(element_length);
1145  const size_t message_length = header_.length_;
1146 
1147  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1148  "After adding element's length, the header_.length_ == [%d].\n",
1149  message_length));
1150 
1151  // The current packet now contains the current element. We need to
1152  // check to see if the conditions are such that we should go ahead and
1153  // attempt to send the packet "directly" now, or if we can just leave
1154  // and send the current packet later (in another send() call or in a
1155  // send_stop() call).
1156 
1157  // There are a few conditions that will cause us to attempt to send the
1158  // packet (directly) right now:
1159  // - Fragmentation was needed
1160  // - The current packet has the maximum number of samples per packet.
1161  // - The current packet's total length exceeds the optimum packet size.
1162  // - The current element (currently part of the packet elems_)
1163  // requires an exclusive packet.
1164  //
1165  if (next_fragment || (elems_.size() >= max_samples_)
1166  || (max_header_size_ + message_length > optimum_size_)
1167  || exclusive) {
1168  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1169  "Now the current packet looks full - send it (directly).\n"));
1170 
1172 
1173  if (next_fragment && mode_ != MODE_DIRECT) {
1174  if (mode_ == MODE_QUEUE) {
1175  queue_.put(next_fragment);
1176  synch_->work_available();
1177 
1178  } else {
1179  next_fragment->data_dropped(true /* dropped by transport */);
1180  }
1181  } else if (mode_ == MODE_QUEUE) {
1182  // Background thread handles packets in progress
1183  synch_->work_available();
1184  }
1185 
1186  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1187  "Back from the direct_send() attempt.\n"));
1188 
1189  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1190  "And we %C as a result of the direct_send() call.\n",
1191  ((mode_ == MODE_QUEUE) ? "flipped into MODE_QUEUE"
1192  : "stayed in MODE_DIRECT")));
1193 
1194  } else {
1195  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1196  "Packet not sent. Send conditions weren't satisfied.\n"));
1197  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1198  "elems_.size(): %d, max_samples_: %d\n",
1199  int(elems_.size()), int(max_samples_)));
1200  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1201  "header_size_: %d, optimum_size_: %d\n",
1202  int(max_header_size_ + message_length),
1203  int(optimum_size_)));
1204  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1205  "element_requires_exclusive_packet: %d\n", int(exclusive)));
1206 
1207  if (mode_ == MODE_QUEUE) {
1208  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1209  "We flipped into MODE_QUEUE.\n"));
1210 
1211  } else {
1212  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1213  "We stayed in MODE_DIRECT.\n"));
1214  }
1215  }
1216  }
1217  }
1218  }
1219 
1221 }
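
In MODE_DIRECT, send() makes two decisions per element: whether to flush the current packet before appending, and whether to flush right after appending. The sketch below restates both predicates with plain structs. PacketState, Limits and the three functions are hypothetical, and the definition of available space used here (maximum size minus header and current payload) is an assumption, since current_space_available() is not shown in this section.

```cpp
#include <cassert>
#include <cstddef>

struct PacketState {
  std::size_t elem_count;  // elements already in the packet
  std::size_t length;      // payload bytes already in the packet
};

struct Limits {
  std::size_t max_header_size;
  std::size_t max_size;
  std::size_t optimum_size;
  std::size_t max_samples;
};

// Assumed definition of the remaining room in the current packet.
std::size_t space_available(const PacketState& p, const Limits& l)
{
  assert(l.max_header_size <= l.max_size);
  const std::size_t used = l.max_header_size + p.length;
  return used >= l.max_size ? 0 : l.max_size - used;
}

// Flush first if the element needs a packet to itself and the packet is not
// empty, or if the element (or its first fragment) does not fit.
bool flush_before_append(const PacketState& p, const Limits& l,
                         std::size_t space_needed, bool exclusive)
{
  return (exclusive && p.elem_count != 0) || space_available(p, l) < space_needed;
}

// After appending, send now if fragments remain, the sample limit is reached,
// the optimum size is exceeded, or the element is exclusive.
bool flush_after_append(const PacketState& p, const Limits& l,
                        bool more_fragments, bool exclusive)
{
  return more_fragments
      || p.elem_count >= l.max_samples
      || l.max_header_size + p.length > l.optimum_size
      || exclusive;
}
```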

◆ send_buffer()

void OpenDDS::DCPS::TransportSendStrategy::send_buffer ( TransportSendBuffer * send_buffer )

Assigns an optional send buffer.

Definition at line 122 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::TransportSendBuffer::bind(), and send_buffer_.

Referenced by OpenDDS::DCPS::MulticastDataLink::MulticastDataLink(), and OpenDDS::DCPS::MulticastDataLink::~MulticastDataLink().

123 {
125 
126  if (send_buffer_ != 0) {
127  send_buffer_->bind(this);
128  }
129 }

◆ send_bytes()

ACE_INLINE ssize_t OpenDDS::DCPS::TransportSendStrategy::send_bytes ( const iovec  iov[],
int  n,
int &  bp 
)
protected virtual

Reimplemented in OpenDDS::DCPS::TcpSendStrategy.

Definition at line 134 of file TransportSendStrategy.inl.

References ACE_INLINE, and send_bytes_i().

Referenced by do_send_packet().

135 {
136  return send_bytes_i(iov, n);
137 }
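
The default send_bytes() simply forwards to the pure virtual send_bytes_i(), leaving its third parameter untouched. A self-contained sketch of that layering follows. IoVecSketch, SenderSketch and StringSender are hypothetical stand-ins, and the public send() wrapper exists only so the example can be exercised from outside the class.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Stand-in for the platform's iovec; not the POSIX struct.
struct IoVecSketch {
  const char* base;
  std::size_t len;
};

class SenderSketch {
public:
  virtual ~SenderSketch() = default;

  // Public entry point added for the example only.
  long send(const IoVecSketch iov[], int n, int& bp) { return send_bytes(iov, n, bp); }

protected:
  // Default: ignore bp and forward to the transport-specific primitive.
  virtual long send_bytes(const IoVecSketch iov[], int n, int& /* bp */)
  {
    return send_bytes_i(iov, n);
  }

  // Every concrete strategy must supply this.
  virtual long send_bytes_i(const IoVecSketch iov[], int n) = 0;
};

// Hypothetical transport that "sends" by appending to a string.
class StringSender : public SenderSketch {
public:
  std::string wire;

protected:
  long send_bytes_i(const IoVecSketch iov[], int n) override
  {
    assert(n >= 0);
    long total = 0;
    for (int i = 0; i < n; ++i) {
      wire.append(iov[i].base, iov[i].len);
      total += static_cast<long>(iov[i].len);
    }
    return total;  // number of bytes written
  }
};
```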

◆ send_bytes_i()

virtual ssize_t OpenDDS::DCPS::TransportSendStrategy::send_bytes_i ( const iovec  iov[],
int  n 
)
protected pure virtual

◆ send_delayed_notifications()

bool OpenDDS::DCPS::TransportSendStrategy::send_delayed_notifications ( const TransportQueueElement::MatchCriteria * match = 0 )
protected

If delayed notifications were queued up, issue those callbacks here. The default match is "match all"; otherwise match can be used to select either a certain individual packet or a publication id. Returns true if anything in the delayed notification list matched.

Definition at line 657 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, OpenDDS::DCPS::TransportImpl::is_shut_down(), lock_, OpenDDS::DCPS::TransportQueueElement::MatchCriteria::matches(), mode(), MODE_NOT_SET, MODE_TERMINATED, OPENDDS_VECTOR(), OpenDDS::DCPS::TransportQueueElement::owned_by_transport(), and transport_.

Referenced by clear(), perform_work(), remove_all_msgs(), remove_sample(), send(), and send_stop().

658 {
659  DBG_ENTRY_LVL("TransportSendStrategy","send_delayed_notifications",6);
660  TransportQueueElement* sample = 0;
662 
664 
665  size_t num_delayed_notifications = 0;
666  bool found_element = false;
667 
668  {
669  GuardType guard(lock_);
670 
671  num_delayed_notifications = delayed_delivered_notification_queue_.size();
672 
673  if (num_delayed_notifications == 0) {
674  return false;
675 
676  } else if (num_delayed_notifications == 1) {
677  // Optimization for the most common case (doesn't need vectors)
678 
679  if (!match || match->matches(*delayed_delivered_notification_queue_[0].first)) {
680  found_element = true;
681  sample = delayed_delivered_notification_queue_[0].first;
682  mode = delayed_delivered_notification_queue_[0].second;
683 
684  delayed_delivered_notification_queue_.clear();
685  }
686 
687  } else {
688  OPENDDS_VECTOR(TQESendModePair)::iterator iter;
689  for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) {
690  sample = iter->first;
691  mode = iter->second;
692  if (!match || match->matches(*sample)) {
693  found_element = true;
694  samples.push_back(*iter);
695  iter = delayed_delivered_notification_queue_.erase(iter);
696  } else {
697  ++iter;
698  }
699  }
700  }
701  }
702 
703  if (!found_element) {
704  return false;
705  }
706 
707  bool transport_shutdown = true;
708  TransportImpl_rch transport = transport_.lock();
709  if (transport) {
710  transport_shutdown = transport->is_shut_down();
711  }
712 
713  if (num_delayed_notifications == 1) {
714  // optimization for the common case
715  if (mode == MODE_TERMINATED) {
716  if (!transport_shutdown || sample->owned_by_transport()) {
717  sample->data_dropped(true);
718  }
719  } else {
720  if (!transport_shutdown || sample->owned_by_transport()) {
721  sample->data_delivered();
722  }
723  }
724 
725  } else {
726  for (size_t i = 0; i < samples.size(); ++i) {
727  if (samples[i].second == MODE_TERMINATED) {
728  if (!transport_shutdown || samples[i].first->owned_by_transport()) {
729  samples[i].first->data_dropped(true);
730  }
731  } else {
732  if (!transport_shutdown || samples[i].first->owned_by_transport()) {
733  samples[i].first->data_delivered();
734  }
735  }
736  }
737  }
738  return true;
739 }

◆ send_packet()

TransportSendStrategy::SendPacketOutcome OpenDDS::DCPS::TransportSendStrategy::send_packet ( )
private

This is called to send the current packet. The current packet will either be a "partially sent" packet, or a packet that has just been prepared via a call to prepare_packet().
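As a rough illustration of how the result of a send attempt maps to an outcome, here is a minimal sketch. The outcome names are taken from the listing below, but classify_send and its parameters are hypothetical, and the enumerator order is arbitrary. In particular, the real method decides between a complete and a partial send through adjust_packet_after_send(), which this sketch approximates by comparing against the number of bytes that were still pending.

```cpp
#include <cassert>
#include <cstddef>

// Outcome names follow the listing; their order here is arbitrary.
enum SendPacketOutcome {
  OUTCOME_COMPLETE_SEND,
  OUTCOME_PARTIAL_SEND,
  OUTCOME_BACKPRESSURE,
  OUTCOME_PEER_LOST,
  OUTCOME_SEND_ERROR
};

// Hypothetical helper, not part of OpenDDS.
// bytes_sent:    result of the low-level send (0 = peer gone, < 0 = failure).
// backpressure:  true when the failure only means "would block".
// bytes_pending: unsent bytes in the packet before this attempt (assumed > 0).
inline SendPacketOutcome classify_send(long bytes_sent, bool backpressure,
                                       std::size_t bytes_pending)
{
  assert(bytes_pending > 0);
  if (bytes_sent == 0) {
    return OUTCOME_PEER_LOST;
  }
  if (bytes_sent < 0) {
    return backpressure ? OUTCOME_BACKPRESSURE : OUTCOME_SEND_ERROR;
  }
  return static_cast<std::size_t>(bytes_sent) >= bytes_pending
    ? OUTCOME_COMPLETE_SEND
    : OUTCOME_PARTIAL_SEND;
}
```

For example, classify_send(40, false, 100) yields OUTCOME_PARTIAL_SEND, leaving 60 bytes for a later attempt.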

Definition at line 1807 of file TransportSendStrategy.cpp.

References adjust_packet_after_send(), DBG_ENTRY_LVL, do_send_packet(), elems_, header_, OpenDDS::DCPS::TransportSendBuffer::insert(), LM_DEBUG, OUTCOME_BACKPRESSURE, OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, pkt_chain_, send_buffer_, OpenDDS::DCPS::TransportHeader::sequence_, VDBG, and VDBG_LVL.

Referenced by direct_send(), and perform_work().

1808 {
1809  DBG_ENTRY_LVL("TransportSendStrategy", "send_packet", 6);
1810 
1811  int bp_flag = 0;
1812  const ssize_t num_bytes_sent =
1813  do_send_packet(pkt_chain_, bp_flag);
1814 
1815  if (num_bytes_sent == 0) {
1816  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1817  "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5);
1818  // This means that the peer has disconnected.
1819  return OUTCOME_PEER_LOST;
1820  }
1821 
1822  if (num_bytes_sent < 0) {
1823  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1824  "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5);
1825 
1826  // Check for backpressure...
1827  if (bp_flag == 1) {
1828  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1829  "Since backpressure flag is true, return "
1830  "OUTCOME_BACKPRESSURE.\n"), 5);
1831  // Ok. Not really an error - just backpressure.
1832  return OUTCOME_BACKPRESSURE;
1833  }
1834 
1835  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1836  "Since backpressure flag is false, return "
1837  "OUTCOME_SEND_ERROR.\n"), 5);
1838 
1839  // Not backpressure - it's a real error.
1840  // Note: moved this to send_bytes so the errno msg could be written.
1841  //ACE_ERROR((LM_ERROR,
1842  // "(%P|%t) ERROR: Call to peer().send() failed with negative "
1843  // "return code.\n"));
1844 
1845  return OUTCOME_SEND_ERROR;
1846  }
1847 
1848  if (send_buffer_ != 0) {
1849  // If a secondary send buffer is bound, sent samples must
1850  // be inserted in order to properly maintain the buffer:
1851  send_buffer_->insert(header_.sequence_,
1852  &elems_, pkt_chain_);
1853  }
1854 
1855  VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
1856  "Since num_bytes_sent > 0, adjust the packet to account for "
1857  "the bytes that did get sent.\n"),5);
1858 
1859  // We sent some bytes - adjust the current packet (elems_ and pkt_chain_)
1860  // to account for the bytes that have been sent.
1861  const int result =
1862  adjust_packet_after_send(num_bytes_sent);
1863 
1864  if (result == 0) {
1865  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1866  "The adjustment logic says that the complete packet was "
1867  "sent. Return OUTCOME_COMPLETE_SEND.\n"));
1868  return OUTCOME_COMPLETE_SEND;
1869  }
1870 
1871  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1872  "The adjustment logic says that only a part of the packet was "
1873  "sent. Return OUTCOME_PARTIAL_SEND.\n"));
1874 
1875  return OUTCOME_PARTIAL_SEND;
1876 }

◆ send_start()

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::send_start ( )

Invoked prior to one or more send() invocations from a particular TransportClient.

Definition at line 38 of file TransportSendStrategy.inl.

References ACE_INLINE, DBG_ENTRY_LVL, link_released_, lock_, and start_counter_.

Referenced by OpenDDS::DCPS::DataLink::send_start_i().

39 {
40  DBG_ENTRY_LVL("TransportSendStrategy","send_start",6);
41 
42  GuardType guard(this->lock_);
43 
44  if (!this->link_released_)
45  ++this->start_counter_;
46 }

◆ send_stop()

void OpenDDS::DCPS::TransportSendStrategy::send_stop ( GUID_t  repoId)

Invoked after one or more send() invocations from a particular TransportClient.

Definition at line 1224 of file TransportSendStrategy.cpp.

References DBG_ENTRY_LVL, direct_send(), elems_, graceful_disconnecting_, header_, OpenDDS::DCPS::TransportHeader::length_, link_released_, LM_DEBUG, LM_ERROR, lock_, mode_, mode_as_str(), MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, send_delayed_notifications(), OpenDDS::DCPS::BasicQueue< T >::size(), start_counter_, synch_, VDBG, and VDBG_LVL.

Referenced by OpenDDS::DCPS::DataLink::send_stop_i().

1225 {
1226  DBG_ENTRY_LVL("TransportSendStrategy","send_stop",6);
1227  {
1228  GuardType guard(lock_);
1229 
1230  if (link_released_)
1231  return;
1232 
1233  if (start_counter_ == 0) {
1234  // This is an indication of a logic error. This is more of an assert.
1235  VDBG_LVL((LM_ERROR,
1236  "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5);
1237  return;
1238  }
1239 
1240  --start_counter_;
1241 
1242  if (start_counter_ != 0) {
1243  // This wasn't the last send_stop() that we are expecting. We only
1244  // really honor the first send_start() and the last send_stop().
1245  // We can return without doing anything else in this case.
1246  return;
1247  }
1248 
1249  if (mode_ == MODE_TERMINATED && !graceful_disconnecting_) {
1250  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1251  "TransportSendStrategy::send_stop: dont try to send current packet "
1252  "since mode is MODE_TERMINATED and not in graceful disconnecting.\n"));
1253  return;
1254  }
1255 
1256  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1257  "This is an 'important' send_stop() event since our "
1258  "start_counter_ is 0.\n"));
1259 
1260  // We just caused the start_counter_ to become zero. This
1261  // means that we aren't expecting another send() or send_stop() at any
1262  // time in the near future (ie, it isn't imminent).
1263 
1264  // If our mode_ is currently MODE_QUEUE or MODE_SUSPEND, then we don't have
1265  // anything to do here because samples have already been going to the
1266  // queue.
1267 
1268  // We only need to do something if the mode_ is
1269  // MODE_DIRECT. It means that we may have some sample(s) in the
1270  // current packet that have never been sent. This is our
1271  // opportunity to send the current packet directly if this is the case.
1272  if (mode_ == MODE_QUEUE || mode_ == MODE_SUSPEND) {
1273  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1274  "But since we are in %C, we don't have to do "
1275  "anything more in this important send_stop().\n",
1276  mode_as_str(mode_)));
1277  // We don't do anything if we are in MODE_QUEUE. Just leave.
1278  return;
1279  }
1280 
1281  size_t header_length = header_.length_;
1282  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1283  "We are in MODE_DIRECT in an important send_stop() - "
1284  "header_.length_ == [%d].\n", header_length));
1285 
1286  // Only attempt to send the current packet (directly) if the current
1287  // packet actually contains something (it could be empty).
1288  if ((header_length > 0) &&
1289  //(elems_.size ()+not_yet_pac_q_->size() > 0))
1290  (elems_.size() > 0)) {
1291  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1292  "There is something in the current packet - attempt to send "
1293  "it (directly) now.\n"));
1294  // If a relink needs to be done for this packet to be sent, do it.
1295  direct_send(true);
1296  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1297  "Back from the attempt to send leftover packet directly.\n"));
1298 
1299  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1300  "But we %C as a result.\n",
1301  ((mode_ == MODE_QUEUE)? "flipped into MODE_QUEUE":
1302  "stayed in MODE_DIRECT" )));
1303  if (mode_ == MODE_QUEUE && mode_ != MODE_SUSPEND) {
1304  VDBG((LM_DEBUG, "(%P|%t) DBG: "
1305  "Notify Synch thread of work availability\n"));
1306  synch_->work_available();
1307  }
1308  }
1309  }
1310 
1311  send_delayed_notifications();
1312 }

◆ set_graceful_disconnecting()

void OpenDDS::DCPS::TransportSendStrategy::set_graceful_disconnecting ( bool  flag)
protected

Set graceful disconnecting flag.

Definition at line 1737 of file TransportSendStrategy.cpp.

References graceful_disconnecting_.

Referenced by OpenDDS::DCPS::TcpSendStrategy::reset().

◆ set_header_source()

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::set_header_source ( ACE_INT64  source)
protected

Definition at line 32 of file TransportSendStrategy.inl.

References ACE_INLINE, header_, and OpenDDS::DCPS::TransportHeader::source_.

Referenced by OpenDDS::DCPS::MulticastSendStrategy::prepare_header_i().

33 {
34  header_.source_ = source;
35 }

◆ space_available()

size_t OpenDDS::DCPS::TransportSendStrategy::space_available ( size_t  already_used = 0) const
private

Returns how much space remains in a packet, given the space already used, before one of the limits is reached: max_message_size() (the transport's inherent limitation) or max_size_ (the user's configured limit).
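The computation can be tried in isolation with the following sketch. It is a free-function mirror of the member function shown in the listing below, with hypothetical parameters standing in for max_size_, max_message_size(), and max_header_size_. The assert is an addition made for this sketch: the subtraction is unsigned, so the result would wrap around if the used space exceeded the limit.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>

// Hypothetical free-function version of space_available().
// max_size:        stands in for max_size_ (configured limit).
// max_msg:         stands in for max_message_size(); 0 means "no transport limit".
// max_header_size: stands in for max_header_size_.
inline std::size_t space_available(std::size_t max_size,
                                   std::size_t max_msg,
                                   std::size_t max_header_size,
                                   std::size_t already_used = 0)
{
  const std::size_t used = max_header_size + already_used;
  const std::size_t limit = max_msg ? std::min(max_size, max_msg) : max_size;
  assert(used <= limit);  // not in the original; guards against unsigned wrap
  return limit - used;
}
```

With max_size = 1000, max_msg = 500, a 28-byte header, and 100 bytes already used, the stricter transport limit applies and 372 bytes remain.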

Definition at line 1951 of file TransportSendStrategy.cpp.

References max_header_size_, max_message_size(), and max_size_.

Referenced by current_space_available(), and fragmentation_helper().

1952 {
1953  const size_t used = max_header_size_ + already_used;
1954  const size_t max_msg = max_message_size();
1955  if (max_msg) {
1956  return std::min(static_cast<size_t>(max_size_), max_msg) - used;
1957  }
1958  return max_size_ - used;
1959 }

◆ start()

int OpenDDS::DCPS::TransportSendStrategy::start ( void  )

Start the TransportSendStrategy. This happens once, when the DataLink that "owns" this strategy object has established a connection.

Definition at line 831 of file TransportSendStrategy.cpp.

References ACE_ERROR_RETURN, OpenDDS::DCPS::TransportSendBuffer::capacity(), DBG_ENTRY_LVL, header_data_allocator_, header_db_allocator_, header_db_lock_pool_, header_mb_allocator_, LM_ERROR, lock_, max_header_size_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), send_buffer_, start_i(), synch_, and TheServiceParticipant.

Referenced by OpenDDS::DCPS::ShmemSendStrategy::send_bytes_i(), and OpenDDS::DCPS::DataLink::start().

832 {
833  DBG_ENTRY_LVL("TransportSendStrategy","start",6);
834 
835  {
836  GuardType guard(lock_);
837 
838  if (!start_i()) {
839  return -1;
840  }
841  }
842 
843  size_t header_chunks(1);
844 
845  // If a secondary send buffer is bound, sent headers should
846  // be cached to properly maintain the buffer:
847  if (send_buffer_ != 0) {
848  header_chunks += send_buffer_->capacity();
849 
850  } else {
851  header_chunks += 1;
852  }
853 
854  header_db_allocator_.reset( new TransportDataBlockAllocator(header_chunks));
855  header_mb_allocator_.reset( new TransportMessageBlockAllocator(header_chunks));
856  header_db_lock_pool_.reset(new DataBlockLockPool(static_cast<unsigned long>(TheServiceParticipant->n_chunks())));
857  header_data_allocator_.reset(new DataAllocator(TheServiceParticipant->association_chunk_multiplier(), max_header_size_));
858 
859  // Since we (the TransportSendStrategy object) are a reference-counted
860  // object, but the synch_ object doesn't necessarily know this, we need
861  // to give a "copy" of a reference to ourselves to the synch_ object here.
862  // We will do the reverse when we unregister ourselves (as a worker) from
863  // the synch_ object.
864 
865  if (synch_->register_worker(*this) == -1) {
866 
867  ACE_ERROR_RETURN((LM_ERROR,
868  "(%P|%t) ERROR: TransportSendStrategy failed to register "
869  "as a worker with the ThreadSynch object.\n"),
870  -1);
871  }
872 
873  return 0;
874 }

◆ start_i()

virtual bool OpenDDS::DCPS::TransportSendStrategy::start_i ( )
inline virtual

Let the subclass start.

Reimplemented in OpenDDS::DCPS::ShmemSendStrategy.

Definition at line 136 of file TransportSendStrategy.h.

Referenced by start().

136 { return true; }

◆ stop()

void OpenDDS::DCPS::TransportSendStrategy::stop ( void  )

Stop the TransportSendStrategy. This happens once, when the DataLink that "owns" this strategy object is going away.

Definition at line 877 of file TransportSendStrategy.cpp.

References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), ACE_DEBUG, ACE_TEXT(), DBG_ENTRY_LVL, elems_, header_block_, LM_WARNING, lock_, pkt_chain_, queue_, ACE_Message_Block::release(), OpenDDS::DCPS::BasicQueue< T >::size(), stop_i(), OpenDDS::DCPS::BasicQueue< T >::swap(), synch_, and ACE_Message_Block::total_length().

Referenced by OpenDDS::DCPS::DataLink::start(), and OpenDDS::DCPS::DataLink::stop().

878 {
879  DBG_ENTRY_LVL("TransportSendStrategy","stop",6);
880 
881  if (header_block_ != 0) {
882  header_block_->release();
883  header_block_ = 0;
884  }
885 
886  synch_->unregister_worker();
887 
888  QueueType elems;
889  QueueType queue;
890  {
891  GuardType guard(lock_);
892 
893  if (pkt_chain_ != 0) {
894  size_t size = pkt_chain_->total_length();
895  if (size > 0) {
896  pkt_chain_->release();
897  ACE_DEBUG((LM_WARNING,
898  ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
899  ACE_TEXT("terminating with %d unsent bytes.\n"),
900  size));
901  }
902  pkt_chain_ = 0;
903  }
904 
905  if (elems_.size()) {
906  elems_.swap(elems);
907  ACE_DEBUG((LM_WARNING,
908  ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
909  ACE_TEXT("terminating with %d unsent elements.\n"),
910  elems_.size()));
911  }
912 
913  if (queue_.size()) {
914  queue_.swap(queue);
915  ACE_DEBUG((LM_WARNING,
916  ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ")
917  ACE_TEXT("terminating with %d queued elements.\n"),
918  queue_.size()));
919  }
920  }
921 
922  RemoveAllVisitor remove_all_visitor;
923 
924  elems.accept_remove_visitor(remove_all_visitor);
925  queue.accept_remove_visitor(remove_all_visitor);
926 
927  {
928  GuardType guard(lock_);
929 
930  stop_i();
931  }
932 }

◆ stop_i()

virtual void OpenDDS::DCPS::TransportSendStrategy::stop_i ( )
pure virtual

◆ suspend_send()

ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::suspend_send ( )

This is called the first time a reconnect is attempted. The send mode is set to MODE_SUSPEND, and messages are queued while in this state.
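The mode transition can be exercised outside the class with a small sketch. The SendMode enumerators come from this class; ModeState and suspend are hypothetical names, and the lock taken by the real method is omitted. The point of interest is that a repeated call, or a call after termination, changes nothing, so the remembered mode is never overwritten with MODE_SUSPEND.

```cpp
#include <cassert>

// Enumerators as declared by TransportSendStrategy::SendMode.
enum SendMode {
  MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED
};

// Hypothetical holder for the two pieces of state involved.
struct ModeState {
  SendMode mode;
  SendMode mode_before_suspend;
};

// Switches to MODE_SUSPEND and remembers the previous mode, unless the
// state is already suspended or terminated.
inline void suspend(ModeState& s)
{
  if (s.mode != MODE_TERMINATED && s.mode != MODE_SUSPEND) {
    s.mode_before_suspend = s.mode;
    s.mode = MODE_SUSPEND;
  }
}
```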

Definition at line 66 of file TransportSendStrategy.inl.

References ACE_INLINE, DBG_ENTRY_LVL, lock_, mode_, mode_before_suspend_, MODE_SUSPEND, and MODE_TERMINATED.

67 {
68  DBG_ENTRY_LVL("TransportSendStrategy","suspend_send",6);
69  GuardType guard(this->lock_);
70 
71  if (this->mode_ != MODE_TERMINATED && this->mode_ != MODE_SUSPEND) {
72  this->mode_before_suspend_ = this->mode_;
73  this->mode_ = MODE_SUSPEND;
74  }
75 }

◆ synch()

ACE_INLINE ThreadSynch * OpenDDS::DCPS::TransportSendStrategy::synch ( ) const
protected

Definition at line 24 of file TransportSendStrategy.inl.

References ACE_INLINE, DBG_ENTRY_LVL, and synch_.

Referenced by OpenDDS::DCPS::TcpSendStrategy::schedule_output().

25 {
26  DBG_ENTRY_LVL("TransportSendStrategy","synch",6);
27 
28  return synch_.get();
29 }

◆ terminate_send()

void OpenDDS::DCPS::TransportSendStrategy::terminate_send ( bool  graceful_disconnecting = false)

Remove all samples in the backpressure queue and packet queue.

This is called whenever the connection is lost and reconnect fails. It removes all samples in the backpressure queue and packet queue.

Definition at line 743 of file TransportSendStrategy.cpp.

References clear(), DBG_ENTRY_LVL, graceful_disconnecting_, LM_DEBUG, lock_, mode_, MODE_SUSPEND, MODE_TERMINATED, and VDBG.

Referenced by OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), and OpenDDS::DCPS::DataLink::terminate_send().

744 {
745  DBG_ENTRY_LVL("TransportSendStrategy","terminate_send",6);
746 
747  bool reset_flag = true;
748 
749  {
750  GuardType guard(lock_);
751 
752  // If the terminate_send call due to a non-graceful disconnection before
753  // a datalink shutdown then we will not try to send the graceful disconnect
754  // message.
755  if ((mode_ == MODE_TERMINATED || mode_ == MODE_SUSPEND)
756  && !graceful_disconnecting_) {
757  VDBG((LM_DEBUG, "(%P|%t) DBG: "
758  "It was already terminated non gracefully, will not set to graceful disconnecting\n"));
759  reset_flag = false;
760  }
761  }
762 
763  VDBG((LM_DEBUG, "(%P|%t) DBG: Now flip to MODE_TERMINATED\n"));
764 
765  clear(MODE_TERMINATED);
766 
767  if (reset_flag) {
768  GuardType guard(lock_);
769  graceful_disconnecting_ = graceful_disconnecting;
770  }
771 }

◆ terminate_send_if_suspended()

void OpenDDS::DCPS::TransportSendStrategy::terminate_send_if_suspended ( )
virtual

Reimplemented in OpenDDS::DCPS::TcpSendStrategy.

Definition at line 774 of file TransportSendStrategy.cpp.

Referenced by OpenDDS::DCPS::DataLink::terminate_send_if_suspended().

775 {
776 }

Friends And Related Function Documentation

◆ TransportSendBuffer

friend class TransportSendBuffer
friend

Definition at line 436 of file TransportSendStrategy.h.

Member Data Documentation

◆ elems_

QueueType OpenDDS::DCPS::TransportSendStrategy::elems_
private

Current elements that have contributed blocks to the current transport packet.

Definition at line 368 of file TransportSendStrategy.h.

Referenced by adjust_packet_after_send(), clear(), current_packet_first_element(), do_remove_sample(), get_packet_elems_from_queue(), prepare_packet(), resume_send(), send(), send_packet(), send_stop(), and stop().

◆ graceful_disconnecting_

bool OpenDDS::DCPS::TransportSendStrategy::graceful_disconnecting_
private

◆ header_

TransportHeader OpenDDS::DCPS::TransportSendStrategy::header_
private

◆ header_block_

ACE_Message_Block* OpenDDS::DCPS::TransportSendStrategy::header_block_
private

Current transport packet header, marshalled.

Definition at line 361 of file TransportSendStrategy.h.

Referenced by do_remove_sample(), prepare_packet(), and stop().

◆ header_complete_

bool OpenDDS::DCPS::TransportSendStrategy::header_complete_
private

Set to false when the packet header hasn't been fully sent. Set to true once the packet header has been fully sent.

Definition at line 376 of file TransportSendStrategy.h.

Referenced by adjust_packet_after_send(), clear(), prepare_packet(), and resume_send().

◆ header_data_allocator_

unique_ptr<DataAllocator> OpenDDS::DCPS::TransportSendStrategy::header_data_allocator_
private

Definition at line 412 of file TransportSendStrategy.h.

Referenced by prepare_packet(), and start().

◆ header_db_allocator_

unique_ptr<TransportDataBlockAllocator> OpenDDS::DCPS::TransportSendStrategy::header_db_allocator_
private

Allocator for header data block.

Definition at line 405 of file TransportSendStrategy.h.

Referenced by prepare_packet(), and start().

◆ header_db_lock_pool_

unique_ptr<DataBlockLockPool> OpenDDS::DCPS::TransportSendStrategy::header_db_lock_pool_
private

DataBlockLockPool.

Definition at line 408 of file TransportSendStrategy.h.

Referenced by prepare_packet(), and start().

◆ header_mb_allocator_

unique_ptr<TransportMessageBlockAllocator> OpenDDS::DCPS::TransportSendStrategy::header_mb_allocator_
private

Allocator for header message block.

Definition at line 402 of file TransportSendStrategy.h.

Referenced by prepare_packet(), and start().

◆ header_sequence_

SequenceNumber OpenDDS::DCPS::TransportSendStrategy::header_sequence_
private

Current transport header sequence number.

Definition at line 364 of file TransportSendStrategy.h.

Referenced by prepare_header().

◆ link_released_

bool OpenDDS::DCPS::TransportSendStrategy::link_released_
private

Definition at line 429 of file TransportSendStrategy.h.

Referenced by link_released(), send(), send_start(), and send_stop().

◆ lock_

LockType OpenDDS::DCPS::TransportSendStrategy::lock_
private

◆ max_header_size_

size_t OpenDDS::DCPS::TransportSendStrategy::max_header_size_
private

Maximum marshalled size of the transport packet header.

Definition at line 358 of file TransportSendStrategy.h.

Referenced by prepare_packet(), send(), space_available(), start(), and TransportSendStrategy().

◆ max_samples_

size_t OpenDDS::DCPS::TransportSendStrategy::max_samples_
private

Configuration - max number of samples per transport packet.

Definition at line 342 of file TransportSendStrategy.h.

Referenced by add_delayed_notification(), get_packet_elems_from_queue(), send(), and TransportSendStrategy().

◆ max_size_

ACE_UINT32 OpenDDS::DCPS::TransportSendStrategy::max_size_
private

Configuration - max transport packet size (bytes)

Definition at line 348 of file TransportSendStrategy.h.

Referenced by send(), space_available(), and TransportSendStrategy().

◆ mode_

Atomic<SendMode> OpenDDS::DCPS::TransportSendStrategy::mode_
private

◆ mode_before_suspend_

SendMode OpenDDS::DCPS::TransportSendStrategy::mode_before_suspend_
private

Remembers the mode that was in effect before sending was suspended. It is used once sending is resumed after the connection has been re-established.

Definition at line 395 of file TransportSendStrategy.h.

Referenced by clear(), direct_send(), resume_send(), and suspend_send().

◆ optimum_size_

ACE_UINT32 OpenDDS::DCPS::TransportSendStrategy::optimum_size_
private

Configuration - optimum transport packet size (bytes)

Definition at line 345 of file TransportSendStrategy.h.

Referenced by get_packet_elems_from_queue(), send(), and TransportSendStrategy().

◆ pkt_chain_

ACE_Message_Block* OpenDDS::DCPS::TransportSendStrategy::pkt_chain_
private

Current (head of chain) block containing unsent bytes for the current transport packet.

Definition at line 372 of file TransportSendStrategy.h.

Referenced by adjust_packet_after_send(), clear(), do_remove_sample(), prepare_packet(), resume_send(), send_packet(), and stop().

◆ queue_

QueueType OpenDDS::DCPS::TransportSendStrategy::queue_
private

Used during backpressure situations to hold samples that have not yet become part of a transport packet and are therefore completely unsent. Also used as a bucket for elements that still have to become part of a packet.

Definition at line 355 of file TransportSendStrategy.h.

Referenced by clear(), do_remove_sample(), get_packet_elems_from_queue(), perform_work(), resume_send(), send(), and stop().

◆ replaced_element_db_allocator_

DataBlockAllocator OpenDDS::DCPS::TransportSendStrategy::replaced_element_db_allocator_
private

Definition at line 423 of file TransportSendStrategy.h.

Referenced by do_remove_sample().

◆ replaced_element_mb_allocator_

MessageBlockAllocator OpenDDS::DCPS::TransportSendStrategy::replaced_element_mb_allocator_
private

Cached allocator for TransportReplaceElement.

Definition at line 422 of file TransportSendStrategy.h.

Referenced by do_remove_sample().

◆ send_buffer_

TransportSendBuffer* OpenDDS::DCPS::TransportSendStrategy::send_buffer_
private

Definition at line 431 of file TransportSendStrategy.h.

Referenced by remove_all_msgs(), send_buffer(), send_packet(), and start().

◆ start_counter_

unsigned OpenDDS::DCPS::TransportSendStrategy::start_counter_
private

Counter that, when greater than zero, indicates that we still expect to receive a send_stop() event. Incremented once for each call to our send_start() method, and decremented once for each call to our send_stop() method. We only care about the transitions of the start_counter_ value from 0 to 1, and from 1 to 0. This accommodates the case where more than one TransportClient is sending to us at the same time. We use this counter to enable a "composite" send_start() and send_stop().
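The "composite" bracket described above can be sketched as follows. CompositeSend is a hypothetical class written for illustration; it leaves out the lock, the link_released_ check, and the actual sending that the real send_start() and send_stop() perform. Only the stop that takes the counter from 1 to 0 is reported as significant.

```cpp
#include <cassert>

// Hypothetical illustration of nested send_start()/send_stop() brackets.
class CompositeSend {
public:
  CompositeSend() : counter_(0), flushes_(0) {}

  // Opens one bracket; several clients may have brackets open at once.
  void send_start() { ++counter_; }

  // Closes one bracket. Returns true only when this call closed the last
  // open bracket (counter went from 1 to 0). An unexpected stop with no
  // open bracket is ignored and returns false.
  bool send_stop()
  {
    if (counter_ == 0) {
      return false;
    }
    if (--counter_ != 0) {
      return false;
    }
    ++flushes_;
    return true;
  }

  // Number of times the outermost bracket was closed.
  unsigned flushes() const { return flushes_; }

private:
  unsigned counter_;
  unsigned flushes_;
};
```

With two overlapping clients, the first send_stop() returns false and the second returns true, so any leftover packet would be flushed once rather than twice.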

Definition at line 387 of file TransportSendStrategy.h.

Referenced by clear(), resume_send(), send_start(), and send_stop().

◆ synch_

unique_ptr<ThreadSynch> OpenDDS::DCPS::TransportSendStrategy::synch_
private

The thread synch object.

Definition at line 415 of file TransportSendStrategy.h.

Referenced by perform_work(), resume_send(), send(), send_stop(), start(), stop(), synch(), and TransportSendStrategy().

◆ transport_

WeakRcHandle<TransportImpl> OpenDDS::DCPS::TransportSendStrategy::transport_
private

Definition at line 425 of file TransportSendStrategy.h.

Referenced by direct_send(), and send_delayed_notifications().

◆ UDP_MAX_MESSAGE_SIZE

const size_t OpenDDS::DCPS::TransportSendStrategy::UDP_MAX_MESSAGE_SIZE = 65466
static

Put the maximum UDP payload size here so that it can be shared by all UDP-based transports. This is the worst-case (conservative) value for UDP/IPv4. If there are no IP options, or if IPv6 is used, it could actually be a little larger.

Definition at line 159 of file TransportSendStrategy.h.

Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i().


The documentation for this class was generated from the following files: