LCOV - code coverage report
Current view: top level - DCPS/transport/framework - TransportSendStrategy.h (source / functions) Hit Total Coverage
Test: coverage.info Lines: 0 4 0.0 %
Date: 2023-04-30 01:32:43 Functions: 0 3 0.0 %

          Line data    Source code
       1             : /*
       2             :  *
       3             :  *
       4             :  * Distributed under the OpenDDS License.
       5             :  * See: http://www.opendds.org/license.html
       6             :  */
       7             : 
       8             : #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTSENDSTRATEGY_H
       9             : #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTSENDSTRATEGY_H
      10             : 
      11             : #include "BasicQueue_T.h"
      12             : #include "ThreadSynchStrategy_rch.h"
      13             : #include "ThreadSynchWorker.h"
      14             : #include "TransportDefs.h"
      15             : #include "TransportImpl_rch.h"
      16             : #include "TransportHeader.h"
      17             : #include "TransportReplacedElement.h"
      18             : #include "TransportRetainedElement.h"
      19             : 
      20             : #include <dds/DCPS/dcps_export.h>
      21             : #include "dds/DCPS/Atomic.h"
      22             : #include <dds/DCPS/DataBlockLockPool.h>
      23             : #include <dds/DCPS/Definitions.h>
      24             : #include <dds/DCPS/Dynamic_Cached_Allocator_With_Overflow_T.h>
      25             : #include <dds/DCPS/PoolAllocator.h>
      26             : #include <dds/DCPS/RcObject.h>
      27             : 
      28             : #if defined(OPENDDS_SECURITY)
      29             : #include <dds/DdsSecurityCoreC.h>
      30             : #include <dds/DCPS/security/framework/SecurityConfig.h>
      31             : #include <dds/DCPS/security/framework/SecurityConfig_rch.h>
      32             : #endif
      33             : 
      34             : #include <ace/Synch_Traits.h>
      35             : 
      36             : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
      37             : 
      38             : namespace OpenDDS {
      39             : namespace DCPS {
      40             : 
      41             : class ThreadSynch;
      42             : class ThreadSynchResource;
      43             : class TransportQueueElement;
      44             : class TransportSendElement;
      45             : class TransportSendBuffer;
      46             : class DataSampleElement;
      47             : class QueueRemoveVisitor;
      48             : class PacketRemoveVisitor;
      49             : class TransportImpl;
      50             : 
      51             : /**
      52             :  * This class provides methods to fill packets with samples for sending
      53             :  * and handles backpressure. It maintains the list of samples in current
      54             :  * packets and also the list of samples queued during backpressure. A thread
      55             :  * per connection is created to handle the queued samples.
      56             :  *
      57             :  * Notes for the object ownership:
      58             :  * 1) Owns ThreadSynch object, list of samples in current packet and list
      59             :  *    of samples in queue.
      60             :  */
      61             : class OpenDDS_Dcps_Export TransportSendStrategy
      62             :   : public ThreadSynchWorker {
      63             : public:
      64             :   virtual ~TransportSendStrategy();
      65             : 
      66             :   /// Assigns an optional send buffer.
      67             :   void send_buffer(TransportSendBuffer* send_buffer);
      68             : 
      69             :   /// Start the TransportSendStrategy.  This happens once, when
      70             :   /// the DataLink that "owns" this strategy object has established
      71             :   /// a connection.
      72             :   int start();
      73             : 
      74             :   /// Stop the TransportSendStrategy.  This happens once, when the
      75             :   /// DataLink that "owns" this strategy object is going away.
      76             :   void stop();
      77             : 
      78             :   /// Invoked prior to one or more send() invocations from a particular
      79             :   /// TransportClient.
      80             :   void send_start();
      81             : 
      82             :   /// Our DataLink has been requested by some particular
      83             :   /// TransportClient to send the element.
      84             :   void send(TransportQueueElement* element, bool relink = true);
      85             : 
      86             :   /// Invoked after one or more send() invocations from a particular
      87             :   /// TransportClient.
      88             :   void send_stop(GUID_t repoId);
      89             : 
      90             :   /// Our DataLink has been requested by some particular
      91             :   /// TransportClient to remove the supplied sample
      92             :   /// (basically, an "unsend" attempt) from this strategy object.
      93             :   RemoveResult remove_sample(const DataSampleElement* sample);
      94             : 
      95             :   void remove_all_msgs(const GUID_t& pub_id);
      96             : 
      97             :   /// Called by our ThreadSynch object when we should be able to
      98             :   /// start sending any partial packet bytes and/or compose a new
      99             :   /// packet using elements from the queue_.
     100             :   ///
     101             :   /// Returns 0 to indicate that the ThreadSynch object doesn't need
     102             :   /// to call perform_work() again since the queue (and any unsent
     103             :   /// packet bytes) has been drained, and the mode_ has been switched
     104             :   /// to MODE_DIRECT.
     105             :   ///
     106             :   /// Returns 1 to indicate that there is more work to do, and the
     107             :   /// ThreadSynch object should have this perform_work() method
     108             :   /// called again.
     109             :   virtual WorkOutcome perform_work();
     110             : 
     111             :   /// The subclass needs to provide the implementation
     112             :   /// for re-establishing the datalink. This is called
     113             :   /// when send returns an error.
     114             :   virtual void relink(bool do_suspend = true);
     115             : 
     116             :   /// This is called when first time reconnect is attempted. The send mode
     117             :   /// is set to MODE_SUSPEND. Messages are queued at this state.
     118             :   void suspend_send();
     119             : 
     120             :   /// This is called when connection is lost and reconnect succeeds.
     121             :   /// The send mode is set to the mode before suspend which is either MODE_QUEUE
     122             :   /// or MODE_DIRECT.
     123             :   void resume_send();
     124             : 
     125             :   /// This is called whenver the connection is lost and reconnect fails.
     126             :   /// It removes all samples in the backpressure queue and packet queue.
     127             :   void terminate_send(bool graceful_disconnecting = false);
     128             :   virtual void terminate_send_if_suspended();
     129             : 
     130             :   // Moved clear() declaration below as Enums can't be forward declared.
     131             : 
     132             :   /// Let the subclass stop.
     133             :   virtual void stop_i() = 0;
     134             : 
     135             :   /// Let the subclass start.
     136           0 :   virtual bool start_i() { return true; }
     137             : 
     138             :   void link_released(bool flag);
     139             : 
     140             :   bool isDirectMode();
     141             : 
     142             :   typedef BasicQueue<TransportQueueElement> QueueType;
     143             : 
     144             :   /// Convert ACE_Message_Block chain into iovec[] entries for send(),
     145             :   /// returns number of iovec[] entries used (up to MAX_SEND_BLOCKS).
     146             :   /// Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater.
     147             :   static int mb_to_iov(const ACE_Message_Block& msg, iovec* iov);
     148             : 
     149             :   // Subclasses which make use of acceptors should override
     150             :   // this method and return the peer handle.
     151             :   virtual ACE_HANDLE get_handle();
     152             : 
     153             :   void deliver_ack_request(TransportQueueElement* element);
     154             : 
     155             :   /// Put the maximum UDP payload size here so that it can be shared by all
     156             :   /// UDP-based transports.  This is the worst-case (conservative) value for
     157             :   /// UDP/IPv4.  If there are no IP options, or if IPv6 is used, it could
     158             :   /// actually be a little larger.
     159             :   static const size_t UDP_MAX_MESSAGE_SIZE = 65466;
     160             : 
     161             :   /// Alternative to TransportSendStrategy::send for fragmentation
     162             :   ///
     163             :   /// @param original_element data sample to send, may be larger than max msg size
     164             :   /// @param elements_to_send populated by this method with either original_element
     165             :   ///                         or fragments created from it.  Elements need to be
     166             :   ///                         cleaned up by the caller using data_delivered or
     167             :   ///                         data_dropped.
     168             :   /// @return operation succeeded
     169             :   bool fragmentation_helper(
     170             :     TransportQueueElement* original_element, TqeVector& elements_to_send);
     171             : 
     172             : protected:
     173             : 
     174             :   TransportSendStrategy(std::size_t id,
     175             :                         const TransportImpl_rch& transport,
     176             :                         ThreadSynchResource* synch_resource,
     177             :                         Priority priority,
     178             :                         const ThreadSynchStrategy_rch& thread_sync_strategy);
     179             : 
     180             : 
     181             :   // Only our subclass knows how to do this.
     182             :   // Third arg is the "back-pressure" flag.  If send_bytes() returns
     183             :   // -1 and the bp == 1, then it isn't really an error - it is
     184             :   // backpressure.
     185             :   virtual ssize_t send_bytes(const iovec iov[], int n, int& bp);
     186             : 
     187             :   virtual ssize_t non_blocking_send(const iovec iov[], int n, int& bp);
     188             : 
     189             :   virtual ssize_t send_bytes_i(const iovec iov[], int n) = 0;
     190             : 
     191             :   /// Specific implementation processing of prepared packet header.
     192             :   virtual void prepare_header_i();
     193             : 
     194             :   /// Specific implementation processing of prepared packet.
     195             :   virtual void prepare_packet_i();
     196             : 
     197             :   TransportQueueElement* current_packet_first_element() const;
     198             : 
     199             :   /// The maximum size of a message allowed by the this TransportImpl, or 0
     200             :   /// if there is no such limit.  This is expected to be a constant, for example
     201             :   /// UDP/IPv4 can send messages of up to 65466 bytes.
     202             :   /// The transport framework will use the returned value (if > 0) to
     203             :   /// fragment larger messages.  This fragmentation and
     204             :   /// reassembly will be transparent to the user.
     205             :   virtual size_t max_message_size() const;
     206             : 
     207             :   /// Set graceful disconnecting flag.
     208             :   void set_graceful_disconnecting(bool flag);
     209             : 
     210             :   virtual void add_delayed_notification(TransportQueueElement* element);
     211             : 
     212             :   /// If delayed notifications were queued up, issue those callbacks here.
     213             :   /// The default match is "match all", otherwise match can be used to specify
     214             :   /// either a certain individual packet or a publication id.
     215             :   /// Returns true if anything in the delayed notification list matched.
     216             :   bool send_delayed_notifications(const TransportQueueElement::MatchCriteria* match = 0);
     217             : 
     218             : #ifdef OPENDDS_SECURITY
     219           0 :   virtual Security::SecurityConfig_rch security_config() const { return Security::SecurityConfig_rch(); }
     220             : #endif
     221             : 
     222             : private:
     223             : 
     224             :   enum SendPacketOutcome {
     225             :     OUTCOME_COMPLETE_SEND,
     226             :     OUTCOME_PARTIAL_SEND,
     227             :     OUTCOME_BACKPRESSURE,
     228             :     OUTCOME_PEER_LOST,
     229             :     OUTCOME_SEND_ERROR
     230             :   };
     231             : 
     232             :   /// Called from send() when it is time to attempt to send our
     233             :   /// current packet to the socket while in MODE_DIRECT mode_.
     234             :   /// If backpressure occurs, our current packet will be adjusted
     235             :   /// to account for bytes that were sent, and the mode will be
     236             :   /// changed to MODE_QUEUE.
     237             :   /// If no backpressure occurs (ie, the entire packet is sent), then
     238             :   /// our current packet will be "reset" to be an empty packet following
     239             :   /// the send.
     240             :   void direct_send(bool relink);
     241             : 
     242             :   /// This method is used while in MODE_QUEUE mode, and a new packet
     243             :   /// needs to be formulated using elements from the queue_.  This is
     244             :   /// the first step of formulating the new packet.  It will extract
     245             :   /// elements from the queue_ and insert those elements into the
     246             :   /// pkt_elems_ collection.
     247             :   ///
     248             :   /// After this step has been done, the prepare_packet() step can
     249             :   /// be performed, followed by the actual send_packet() call.
     250             :   void get_packet_elems_from_queue();
     251             : 
     252             :   /// This method is responsible for updating the packet header.
     253             :   /// Called exclusively by prepare_packet.
     254             :   void prepare_header();
     255             : 
     256             :   /// This method is responsible for actually "creating" the current
     257             :   /// send packet using the packet header and the collection of
     258             :   /// packet elements that are to make-up the packet's contents.
     259             :   void prepare_packet();
     260             : 
     261             :   /// This is called to send the current packet.  The current packet
     262             :   /// will either be a "partially sent" packet, or a packet that has
     263             :   /// just been prepared via a call to prepare_packet().
     264             :   SendPacketOutcome send_packet();
     265             : 
     266             :   /// Form an IOV and call the send_bytes() template method.
     267             :   ssize_t do_send_packet(const ACE_Message_Block* packet, int& bp);
     268             : 
     269             : #ifdef OPENDDS_SECURITY
     270             :   /// Derived classes can override to transform the data right before it's
     271             :   /// sent.  If the returned value is non-NULL it will be sent instead of
     272             :   /// sending the parameter.  If the returned value is NULL the original
     273             :   /// message will be dropped.
     274           0 :   virtual ACE_Message_Block* pre_send_packet(const ACE_Message_Block* m)
     275             :   {
     276           0 :     return m->duplicate();
     277             :   }
     278             : #endif
     279             : 
     280             :   /// This is called from the send_packet() method after it has
     281             :   /// sent at least one byte from the current packet.  This method
     282             :   /// will update the current packet appropriately, as well as deal
     283             :   /// with all of the release()'ing of fully sent ACE_Message_Blocks,
     284             :   /// and the data_delivered() calls on the fully sent elements.
     285             :   /// Returns 0 if the entire packet was sent, and returns 1 if
     286             :   /// the entire packet was not sent.
     287             :   int adjust_packet_after_send(ssize_t num_bytes_sent);
     288             : 
     289             :   /**
     290             :    * How much space is available in packet with a given used space before we
     291             :    * reach one of the limits: max_message_size() [transport's inherent
     292             :    * limitation] or max_size_ [user's configured limit]
     293             :    */
     294             :   size_t space_available(size_t already_used = 0) const;
     295             : 
     296             :   /**
     297             :    * Like above, but use the current packet.
     298             :    */
     299             :   size_t current_space_available() const;
     300             : 
     301             :   typedef ACE_SYNCH_MUTEX     LockType;
     302             :   typedef ACE_Guard<LockType> GuardType;
     303             : 
     304             : public:
     305             :   enum SendMode {
     306             :     // MODE_NOT_SET is used as the initial value of mode_before_suspend_ so
     307             :     // we can check if the resume_send is paired with suspend_send.
     308             :     MODE_NOT_SET,
     309             :     // Send out the sample with current packet.
     310             :     MODE_DIRECT,
     311             :     // The samples need be queued because of the backpressure or partial send.
     312             :     MODE_QUEUE,
     313             :     // The samples need be queued because the connection is lost and we are
     314             :     // trying to reconnect.
     315             :     MODE_SUSPEND,
     316             :     // The samples need be dropped since we lost connection and could not
     317             :     // reconnect.
     318             :     MODE_TERMINATED
     319             :   };
     320             : 
     321             :   /// Clear queued messages and messages in current packet and set the
     322             :   /// current mode to new_mod if the current mode equals old_mode or
     323             :   /// old_mode is MODE_NOT_SET.
     324             : 
     325             :   void clear(SendMode new_mode, SendMode old_mode = MODE_NOT_SET);
     326             : 
     327             :   /// Access the current sending mode.
     328             :   SendMode mode() const;
     329             : protected:
     330             :   /// Implement framework chain visitations to remove a sample.
     331             :   virtual RemoveResult do_remove_sample(const GUID_t& pub_id,
     332             :     const TransportQueueElement::MatchCriteria& criteria, bool remove_all = false);
     333             : 
     334             : private:
     335             : 
     336             :   virtual bool marshal_transport_header(ACE_Message_Block* mb);
     337             : 
     338             :   /// Helper function to debugging.
     339             :   static const char* mode_as_str(SendMode mode);
     340             : 
     341             :   /// Configuration - max number of samples per transport packet
     342             :   size_t max_samples_;
     343             : 
     344             :   /// Configuration - optimum transport packet size (bytes)
     345             :   ACE_UINT32 optimum_size_;
     346             : 
     347             :   /// Configuration - max transport packet size (bytes)
     348             :   ACE_UINT32 max_size_;
     349             : 
     350             :   /// Used during backpressure situations to hold samples that have
     351             :   /// not yet been made to be part of a transport packet, and are
     352             :   /// completely unsent.
     353             :   /// Also used as a bucket for packets which still have to become
     354             :   /// part of a packet.
     355             :   QueueType queue_;
     356             : 
     357             :   /// Maximum marshalled size of the transport packet header.
     358             :   size_t max_header_size_;
     359             : 
     360             :   /// Current transport packet header, marshalled.
     361             :   ACE_Message_Block* header_block_;
     362             : 
     363             :   /// Current transport header sequence number.
     364             :   SequenceNumber header_sequence_;
     365             : 
     366             :   /// Current elements that have contributed blocks to the current
     367             :   /// transport packet.
     368             :   QueueType elems_;
     369             : 
     370             :   /// Current (head of chain) block containing unsent bytes for the
     371             :   /// current transport packet.
     372             :   ACE_Message_Block* pkt_chain_;
     373             : 
     374             :   /// Set to false when the packet header hasn't been fully sent.
     375             :   /// Set to true once the packet header has been fully sent.
     376             :   bool header_complete_;
     377             : 
     378             :   /// Counter that, when greater than zero, indicates that we still
     379             :   /// expect to receive a send_stop() event.
     380             :   /// Incremented once for each call to our send_start() method,
     381             :   /// and decremented once for each call to our send_stop() method.
     382             :   /// We only care about the transitions of the start_counter_
     383             :   /// value from 0 to 1, and from 1 to 0.  This accommodates the
     384             :   /// case where more than one TransportClient is sending to
     385             :   /// us at the same time.  We use this counter to enable a
     386             :   /// "composite" send_start() and send_stop().
     387             :   unsigned start_counter_;
     388             : 
     389             :   /// This mode determines how send() calls will be handled.
     390             :   Atomic<SendMode> mode_;
     391             : 
     392             :   /// This mode remembers the mode before send is suspended and is
     393             :   /// used after the send is resumed because the connection is
     394             :   /// re-established.
     395             :   SendMode mode_before_suspend_;
     396             : 
     397             :   /// Used for delayed notifications when performing work.
     398             :   typedef std::pair<TransportQueueElement*, SendMode> TQESendModePair;
     399             :   OPENDDS_VECTOR(TQESendModePair) delayed_delivered_notification_queue_;
     400             : 
     401             :   /// Allocator for header data block.
     402             :   unique_ptr<TransportMessageBlockAllocator> header_mb_allocator_;
     403             : 
     404             :   /// Allocator for header message block.
     405             :   unique_ptr<TransportDataBlockAllocator> header_db_allocator_;
     406             : 
     407             :   /// DataBlockLockPool
     408             :   unique_ptr<DataBlockLockPool> header_db_lock_pool_;
     409             : 
     410             :   /// Allocator for data buffers.
     411             :   typedef Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> DataAllocator;
     412             :   unique_ptr<DataAllocator> header_data_allocator_;
     413             : 
     414             :   /// The thread synch object.
     415             :   unique_ptr<ThreadSynch> synch_;
     416             : 
     417             :   /// This lock will protect critical sections of code that play a
     418             :   /// role in the sending of data.
     419             :   LockType lock_;
     420             : 
     421             :   /// Cached allocator for TransportReplaceElement.
     422             :   MessageBlockAllocator replaced_element_mb_allocator_;
     423             :   DataBlockAllocator replaced_element_db_allocator_;
     424             : 
     425             :   WeakRcHandle<TransportImpl> transport_;
     426             : 
     427             :   bool graceful_disconnecting_;
     428             : 
     429             :   bool link_released_;
     430             : 
     431             :   TransportSendBuffer* send_buffer_;
     432             : 
     433             :   // N.B. The behavior present in TransortSendBuffer should be
     434             :   // refactored into the TransportSendStrategy eventually; a good
     435             :   // amount of private state is shared between both classes.
     436             :   friend class TransportSendBuffer;
     437             : 
     438             :   /// Current transport packet header.
     439             :   TransportHeader header_;
     440             : 
     441             : protected:
     442             :   ThreadSynch* synch() const;
     443             : 
     444             :   void set_header_source(ACE_INT64 source);
     445             : };
     446             : 
     447             : } // namespace DCPS
     448             : } // namespace OpenDDS
     449             : 
     450             : OPENDDS_END_VERSIONED_NAMESPACE_DECL
     451             : 
     452             : #if defined (__ACE_INLINE__)
     453             : #include "TransportSendStrategy.inl"
     454             : #endif /* __ACE_INLINE__ */
     455             : 
     456             : #endif /* OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H */

Generated by: LCOV version 1.16