TransportSendStrategy.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H
00009 #define OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "dds/DCPS/Definitions.h"
00013 #include "dds/DCPS/RcObject_T.h"
00014 #include "dds/DCPS/PoolAllocator.h"
00015 #include "ThreadSynchWorker.h"
00016 #include "TransportDefs.h"
00017 #include "BasicQueue_T.h"
00018 #include "TransportHeader.h"
00019 #include "TransportReplacedElement.h"
00020 #include "TransportRetainedElement.h"
00021 #include "TransportInst_rch.h"
00022 #include "ThreadSynchStrategy_rch.h"
00023 
00024 #include "ace/Synch.h"
00025 
00026 namespace OpenDDS {
00027 namespace DCPS {
00028 
00029 class ThreadSynch;
00030 class ThreadSynchResource;
00031 class TransportQueueElement;
00032 class TransportSendElement;
00033 class TransportSendBuffer;
00034 class DataSampleElement;
00035 class QueueRemoveVisitor;
00036 class PacketRemoveVisitor;
00037 
00038 /**
00039  * This class provides methods to fill packets with samples for sending
00040  * and handles backpressure. It maintains the list of samples in current
00041  * packets and also the list of samples queued during backpressure. A thread
00042  * per connection is created to handle the queued samples.
00043  *
00044  * Notes for the object ownership:
00045  * 1) Owns ThreadSynch object, list of samples in current packet and list
00046  *    of samples in queue.
00047  */
00048 class OpenDDS_Dcps_Export TransportSendStrategy
00049   : public RcObject<ACE_SYNCH_MUTEX>,
00050     public ThreadSynchWorker {
00051 public:
00052   virtual ~TransportSendStrategy();
00053 
00054   /// Assigns an optional send buffer.
00055   void send_buffer(TransportSendBuffer* send_buffer);
00056 
00057   /// Start the TransportSendStrategy.  This happens once, when
00058   /// the DataLink that "owns" this strategy object has established
00059   /// a connection.
00060   int start();
00061 
00062   /// Stop the TransportSendStrategy.  This happens once, when the
00063   /// DataLink that "owns" this strategy object is going away.
00064   void stop();
00065 
00066   /// Invoked prior to one or more send() invocations from a particular
00067   /// TransportClient.
00068   void send_start();
00069 
00070   /// Our DataLink has been requested by some particular
00071   /// TransportClient to send the element.
00072   void send(TransportQueueElement* element, bool relink = true);
00073 
00074   /// Invoked after one or more send() invocations from a particular
00075   /// TransportClient.
00076   void send_stop(RepoId repoId);
00077 
00078   /// Our DataLink has been requested by some particular
00079   /// TransportClient to remove the supplied sample
00080   /// (basically, an "unsend" attempt) from this strategy object.
00081   RemoveResult remove_sample(const DataSampleElement* sample);
00082 
00083   void remove_all_msgs(RepoId pub_id);
00084 
00085   /// Called by our ThreadSynch object when we should be able to
00086   /// start sending any partial packet bytes and/or compose a new
00087   /// packet using elements from the queue_.
00088   ///
00089   /// Returns 0 to indicate that the ThreadSynch object doesn't need
00090   /// to call perform_work() again since the queue (and any unsent
00091   /// packet bytes) has been drained, and the mode_ has been switched
00092   /// to MODE_DIRECT.
00093   ///
00094   /// Returns 1 to indicate that there is more work to do, and the
00095   /// ThreadSynch object should have this perform_work() method
00096   /// called again.
00097   virtual WorkOutcome perform_work();
00098 
00099   /// The subclass needs to provide the implementation
00100   /// for re-establishing the datalink. This is called
00101   /// when send returns an error.
00102   virtual void relink(bool do_suspend = true);
00103 
00104   /// This is called when first time reconnect is attempted. The send mode
00105   /// is set to MODE_SUSPEND. Messages are queued at this state.
00106   void suspend_send();
00107 
00108   /// This is called when connection is lost and reconnect succeeds.
00109   /// The send mode is set to the mode before suspend which is either MODE_QUEUE
00110   /// or MODE_DIRECT.
00111   void resume_send();
00112 
00113   /// This is called whenver the connection is lost and reconnect fails.
00114   /// It removes all samples in the backpressure queue and packet queue.
00115   void terminate_send(bool graceful_disconnecting = false);
00116 
00117   // Moved clear() declaration below as Enums can't be foward declared.
00118 
00119   /// Let the subclass stop.
00120   virtual void stop_i() = 0;
00121 
00122   /// Let the subclass start.
00123   virtual bool start_i() { return true; }
00124 
00125   void link_released(bool flag);
00126 
00127   bool isDirectMode();
00128 
00129   /// Informed transport shutdown so no more notifications to
00130   /// listener.
00131   void transport_shutdown();
00132 
00133   typedef BasicQueue<TransportQueueElement> QueueType;
00134 
00135   /// Convert ACE_Message_Block chain into iovec[] entries for send(),
00136   /// returns number of iovec[] entries used (up to MAX_SEND_BLOCKS).
00137   /// Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater.
00138   static int mb_to_iov(const ACE_Message_Block& msg, iovec* iov);
00139 
00140   // Subclasses which make use of acceptors should override
00141   // this method and return the peer handle.
00142   virtual ACE_HANDLE get_handle();
00143 
00144 protected:
00145 
00146   TransportSendStrategy(std::size_t id,
00147                         const TransportInst_rch& transport_inst,
00148                         ThreadSynchResource* synch_resource,
00149                         Priority priority,
00150                         const ThreadSynchStrategy_rch& thread_sync_strategy);
00151 
00152   // Only our subclass knows how to do this.
00153   // Third arg is the "back-pressure" flag.  If send_bytes() returns
00154   // -1 and the bp == 1, then it isn't really an error - it is
00155   // backpressure.
00156   virtual ssize_t send_bytes(const iovec iov[], int n, int& bp);
00157 
00158   virtual ssize_t non_blocking_send(const iovec iov[], int n, int& bp);
00159 
00160   virtual ssize_t send_bytes_i(const iovec iov[], int n) = 0;
00161 
00162   /// Specific implementation processing of prepared packet header.
00163   virtual void prepare_header_i();
00164 
00165   /// Specific implementation processing of prepared packet.
00166   virtual void prepare_packet_i();
00167 
00168   TransportQueueElement* current_packet_first_element() const;
00169 
00170   /// The maximum size of a message allowed by the this TransportImpl, or 0
00171   /// if there is no such limit.  This is expected to be a constant, for example
00172   /// UDP/IPv4 can send messages of up to 65466 bytes.
00173   /// The transport framework will use the returned value (if > 0) to
00174   /// fragment larger messages.  This fragmentation and
00175   /// reassembly will be transparent to the user.
00176   virtual size_t max_message_size() const;
00177 
00178   /// Put the maximum UDP payload size here so that it can be shared by all
00179   /// UDP-based transports.  This is the worst-case (conservative) value for
00180   /// UDP/IPv4.  If there are no IP options, or if IPv6 is used, it could
00181   /// actually be a little larger.
00182   static const size_t UDP_MAX_MESSAGE_SIZE = 65466;
00183 
00184   /// Set graceful disconnecting flag.
00185   void set_graceful_disconnecting(bool flag);
00186 
00187   virtual void add_delayed_notification(TransportQueueElement* element);
00188 
00189 private:
00190 
00191   enum SendPacketOutcome {
00192     OUTCOME_COMPLETE_SEND,
00193     OUTCOME_PARTIAL_SEND,
00194     OUTCOME_BACKPRESSURE,
00195     OUTCOME_PEER_LOST,
00196     OUTCOME_SEND_ERROR
00197   };
00198 
00199   /// Called from send() when it is time to attempt to send our
00200   /// current packet to the socket while in MODE_DIRECT mode_.
00201   /// If backpressure occurs, our current packet will be adjusted
00202   /// to account for bytes that were sent, and the mode will be
00203   /// changed to MODE_QUEUE.
00204   /// If no backpressure occurs (ie, the entire packet is sent), then
00205   /// our current packet will be "reset" to be an empty packet following
00206   /// the send.
00207   void direct_send(bool relink);
00208 
00209   /// This method is used while in MODE_QUEUE mode, and a new packet
00210   /// needs to be formulated using elements from the queue_.  This is
00211   /// the first step of formulating the new packet.  It will extract
00212   /// elements from the queue_ and insert those elements into the
00213   /// pkt_elems_ collection.
00214   ///
00215   /// After this step has been done, the prepare_packet() step can
00216   /// be performed, followed by the actual send_packet() call.
00217   void get_packet_elems_from_queue();
00218 
00219   /// This method is responsible for updating the packet header.
00220   /// Called exclusively by prepare_packet.
00221   void prepare_header();
00222 
00223   /// This method is responsible for actually "creating" the current
00224   /// send packet using the packet header and the collection of
00225   /// packet elements that are to make-up the packet's contents.
00226   void prepare_packet();
00227 
00228   /// This is called to send the current packet.  The current packet
00229   /// will either be a "partially sent" packet, or a packet that has
00230   /// just been prepared via a call to prepare_packet().
00231   SendPacketOutcome send_packet();
00232 
00233   /// Form an IOV and call the send_bytes() template method.
00234   ssize_t do_send_packet(const ACE_Message_Block* packet, int& bp);
00235 
00236   /// This is called from the send_packet() method after it has
00237   /// sent at least one byte from the current packet.  This method
00238   /// will update the current packet appropriately, as well as deal
00239   /// with all of the release()'ing of fully sent ACE_Message_Blocks,
00240   /// and the data_delivered() calls on the fully sent elements.
00241   /// Returns 0 if the entire packet was sent, and returns 1 if
00242   /// the entire packet was not sent.
00243   int adjust_packet_after_send(ssize_t num_bytes_sent);
00244 
00245   /// If delayed notifications were queued up, issue those callbacks here.
00246   /// The default match is "match all", otherwise match can be used to specify
00247   /// either a certain individual packet or a publication id.
00248   /// Returns true if anything in the delayed notification list matched.
00249   bool send_delayed_notifications(const TransportQueueElement::MatchCriteria* match = 0);
00250 
00251   /// How much space is available in the current packet before we reach one
00252   /// of the limits: max_message_size() [transport's inherent limitation]
00253   /// or max_size_ [user's configured limit]
00254   size_t space_available() const;
00255 
00256   typedef ACE_SYNCH_MUTEX     LockType;
00257   typedef ACE_Guard<LockType> GuardType;
00258 
00259 public:
00260   enum SendMode {
00261     // MODE_NOT_SET is used as the initial value of mode_before_suspend_ so
00262     // we can check if the resume_send is paired with suspend_send.
00263     MODE_NOT_SET,
00264     // Send out the sample with current packet.
00265     MODE_DIRECT,
00266     // The samples need be queued because of the backpressure or partial send.
00267     MODE_QUEUE,
00268     // The samples need be queued because the connection is lost and we are
00269     // trying to reconnect.
00270     MODE_SUSPEND,
00271     // The samples need be dropped since we lost connection and could not
00272     // reconnect.
00273     MODE_TERMINATED
00274   };
00275 
00276   /// Clear queued messages and messages in current packet.
00277   // The API now has a defaulted mode (the default is the same as
00278   // as the earlier hard-coded value).
00279   // Clear locks the local mutex. In certain situations its
00280   // important to set the new mode in the clear itself.
00281   // Since the default is the earlier hard-coded value, this
00282   // shouldn't have any impact.
00283   void clear(SendMode mode = MODE_DIRECT);
00284 
00285   /// Access the current sending mode.
00286   SendMode mode() const;
00287 protected:
00288   /// Implement framework chain visitations to remove a sample.
00289   virtual RemoveResult do_remove_sample(const RepoId& pub_id,
00290     const TransportQueueElement::MatchCriteria& criteria);
00291 
00292 private:
00293 
00294   virtual void marshal_transport_header(ACE_Message_Block* mb);
00295 
00296   /// Helper function to debugging.
00297   static const char* mode_as_str(SendMode mode);
00298 
00299   /// Configuration - max number of samples per transport packet
00300   size_t max_samples_;
00301 
00302   /// Configuration - optimum transport packet size (bytes)
00303   ACE_UINT32 optimum_size_;
00304 
00305   /// Configuration - max transport packet size (bytes)
00306   ACE_UINT32 max_size_;
00307 
00308   /// Used during backpressure situations to hold samples that have
00309   /// not yet been made to be part of a transport packet, and are
00310   /// completely unsent.
00311   /// Also used as a bucket for packets which still have to become
00312   /// part of a packet.
00313   QueueType* queue_;
00314 
00315   /// Maximum marshalled size of the transport packet header.
00316   size_t max_header_size_;
00317 
00318   /// Current transport packet header, marshalled.
00319   ACE_Message_Block* header_block_;
00320 
00321   /// Current transport header sequence number.
00322   SequenceNumber header_sequence_;
00323 
00324   /// Current elements that have contributed blocks to the current
00325   /// transport packet.
00326   QueueType* elems_;
00327 
00328   /// Current (head of chain) block containing unsent bytes for the
00329   /// current transport packet.
00330   ACE_Message_Block* pkt_chain_;
00331 
00332   /// Set to false when the packet header hasn't been fully sent.
00333   /// Set to true once the packet header has been fully sent.
00334   bool header_complete_;
00335 
00336   /// Counter that, when greater than zero, indicates that we still
00337   /// expect to receive a send_stop() event.
00338   /// Incremented once for each call to our send_start() method,
00339   /// and decremented once for each call to our send_stop() method.
00340   /// We only care about the transitions of the start_counter_
00341   /// value from 0 to 1, and from 1 to 0.  This accomodates the
00342   /// case where more than one TransportClient is sending to
00343   /// us at the same time.  We use this counter to enable a
00344   /// "composite" send_start() and send_stop().
00345   unsigned start_counter_;
00346 
00347   /// This mode determines how send() calls will be handled.
00348   SendMode mode_;
00349 
00350   /// This mode remembers the mode before send is suspended and is
00351   /// used after the send is resumed because the connection is
00352   /// re-established.
00353   SendMode mode_before_suspend_;
00354 
00355   /// Used for delayed notifications when performing work.
00356   typedef std::pair<TransportQueueElement*, SendMode> TQESendModePair;
00357   OPENDDS_VECTOR(TQESendModePair) delayed_delivered_notification_queue_;
00358 
00359   /// Allocator for header data block.
00360   TransportMessageBlockAllocator* header_mb_allocator_;
00361 
00362   /// Allocator for header message block.
00363   TransportDataBlockAllocator* header_db_allocator_;
00364 
00365   /// The thread synch object.
00366   ThreadSynch* synch_;
00367 
00368   /// This lock will protect critical sections of code that play a
00369   /// role in the sending of data.
00370   LockType lock_;
00371 
00372   /// Cached allocator for TransportReplaceElement.
00373   TransportReplacedElementAllocator replaced_element_allocator_;
00374   MessageBlockAllocator replaced_element_mb_allocator_;
00375   DataBlockAllocator replaced_element_db_allocator_;
00376 
00377   /// Cached allocator for TransportRetainedElements used by reliable
00378   /// datagram transports to retain PDUs after they have been sent.  This
00379   /// is created in start if the transport needs it.
00380   TransportRetainedElementAllocator* retained_element_allocator_;
00381 
00382   TransportInst_rch transport_inst_;
00383 
00384   bool graceful_disconnecting_;
00385 
00386   bool link_released_;
00387 
00388   TransportSendBuffer* send_buffer_;
00389 
00390   // N.B. The behavior present in TransortSendBuffer should be
00391   // refactored into the TransportSendStrategy eventually; a good
00392   // amount of private state is shared between both classes.
00393   friend class TransportSendBuffer;
00394 
00395   bool transport_shutdown_;
00396 
00397 protected:
00398   ThreadSynch* synch() const;
00399 
00400   /// Current transport packet header.
00401   TransportHeader header_;
00402 };
00403 
00404 } // namespace DCPS
00405 } // namespace OpenDDS
00406 
00407 #if defined (__ACE_INLINE__)
00408 #include "TransportSendStrategy.inl"
00409 #endif /* __ACE_INLINE__ */
00410 
00411 #endif /* OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H */

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7