TransportSendStrategy.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H
00009 #define OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "dds/DCPS/Definitions.h"
00013 #include "dds/DCPS/RcObject.h"
00014 #include "dds/DCPS/PoolAllocator.h"
00015 #include "ThreadSynchWorker.h"
00016 #include "TransportDefs.h"
00017 #include "BasicQueue_T.h"
00018 #include "TransportHeader.h"
00019 #include "TransportReplacedElement.h"
00020 #include "TransportRetainedElement.h"
00021 #include "ThreadSynchStrategy_rch.h"
00022 #include "ace/Synch_Traits.h"
00023 
00024 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00025 
00026 namespace OpenDDS {
00027 namespace DCPS {
00028 
00029 class ThreadSynch;
00030 class ThreadSynchResource;
00031 class TransportQueueElement;
00032 class TransportSendElement;
00033 class TransportSendBuffer;
00034 class DataSampleElement;
00035 class QueueRemoveVisitor;
00036 class PacketRemoveVisitor;
00037 class TransportImpl;
00038 
00039 /**
00040  * This class provides methods to fill packets with samples for sending
00041  * and handles backpressure. It maintains the list of samples in current
00042  * packets and also the list of samples queued during backpressure. A thread
00043  * per connection is created to handle the queued samples.
00044  *
00045  * Notes on object ownership:
00046  * 1) Owns ThreadSynch object, list of samples in current packet and list
00047  *    of samples in queue.
00048  */
00049 class OpenDDS_Dcps_Export TransportSendStrategy
00050   : public ThreadSynchWorker {
00051 public:
00052   virtual ~TransportSendStrategy();
00053 
00054   /// Assigns an optional send buffer.
00055   void send_buffer(TransportSendBuffer* send_buffer);
00056 
00057   /// Start the TransportSendStrategy.  This happens once, when
00058   /// the DataLink that "owns" this strategy object has established
00059   /// a connection.
00060   int start();
00061 
00062   /// Stop the TransportSendStrategy.  This happens once, when the
00063   /// DataLink that "owns" this strategy object is going away.
00064   void stop();
00065 
00066   /// Invoked prior to one or more send() invocations from a particular
00067   /// TransportClient.
00068   void send_start();
00069 
00070   /// Our DataLink has been requested by some particular
00071   /// TransportClient to send the element.
00072   void send(TransportQueueElement* element, bool relink = true);
00073 
00074   /// Invoked after one or more send() invocations from a particular
00075   /// TransportClient.
00076   void send_stop(RepoId repoId);
00077 
00078   /// Our DataLink has been requested by some particular
00079   /// TransportClient to remove the supplied sample
00080   /// (basically, an "unsend" attempt) from this strategy object.
00081   RemoveResult remove_sample(const DataSampleElement* sample, void* context);
00082 
00083   void remove_all_msgs(RepoId pub_id);
00084 
00085   /// Called by our ThreadSynch object when we should be able to
00086   /// start sending any partial packet bytes and/or compose a new
00087   /// packet using elements from the queue_.
00088   ///
00089   /// The returned WorkOutcome tells the ThreadSynch object whether
00090   /// perform_work() must be called again: no further call is needed
00091   /// once the queue (and any unsent packet bytes) has been drained
00092   /// and the mode_ has been switched to MODE_DIRECT; otherwise there
00093   /// is more work to do and this method should be called again.
00094   /// NOTE(review): this comment used to describe the result as 0 / 1;
00095   /// the WorkOutcome enumerators are not declared in this header --
00096   /// confirm the exact values against the WorkOutcome declaration.
00097   virtual WorkOutcome perform_work();
00098 
00099   /// The subclass needs to provide the implementation
00100   /// for re-establishing the datalink. This is called
00101   /// when send returns an error.
00102   virtual void relink(bool do_suspend = true);
00103 
00104   /// This is called the first time a reconnect is attempted. The send mode
00105   /// is set to MODE_SUSPEND. Messages are queued while in this state.
00106   void suspend_send();
00107 
00108   /// This is called when connection is lost and reconnect succeeds.
00109   /// The send mode is set to the mode before suspend which is either MODE_QUEUE
00110   /// or MODE_DIRECT.
00111   void resume_send();
00112 
00113   /// This is called whenever the connection is lost and reconnect fails.
00114   /// It removes all samples in the backpressure queue and packet queue.
00115   void terminate_send(bool graceful_disconnecting = false);
00116 
00117   // Moved clear() declaration below as enums can't be forward declared.
00118 
00119   /// Let the subclass stop.
00120   virtual void stop_i() = 0;
00121 
00122   /// Let the subclass start.
00123   virtual bool start_i() { return true; }
00124 
00125   void link_released(bool flag);
00126 
00127   bool isDirectMode();  // presumably reports whether mode_ is MODE_DIRECT -- TODO confirm in the implementation
00128 
00129   typedef BasicQueue<TransportQueueElement> QueueType;
00130 
00131   /// Convert ACE_Message_Block chain into iovec[] entries for send(),
00132   /// returns number of iovec[] entries used (up to MAX_SEND_BLOCKS).
00133   /// Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater.
00134   static int mb_to_iov(const ACE_Message_Block& msg, iovec* iov);
00135 
00136   // Subclasses which make use of acceptors should override
00137   // this method and return the peer handle.
00138   virtual ACE_HANDLE get_handle();
00139 
00140   void deliver_ack_request(TransportQueueElement* element);
00141 
00142 protected:
00143 
00144   TransportSendStrategy(std::size_t id,
00145                         TransportImpl& transport,
00146                         ThreadSynchResource* synch_resource,
00147                         Priority priority,
00148                         const ThreadSynchStrategy_rch& thread_sync_strategy);
00149 
00150 
00151   // Only our subclass knows how to do this.
00152   // Third arg is the "back-pressure" flag.  If send_bytes() returns
00153   // -1 and the bp == 1, then it isn't really an error - it is
00154   // backpressure.
00155   virtual ssize_t send_bytes(const iovec iov[], int n, int& bp);
00156 
00157   virtual ssize_t non_blocking_send(const iovec iov[], int n, int& bp);
00158 
00159   virtual ssize_t send_bytes_i(const iovec iov[], int n) = 0;
00160 
00161   /// Specific implementation processing of prepared packet header.
00162   virtual void prepare_header_i();
00163 
00164   /// Specific implementation processing of prepared packet.
00165   virtual void prepare_packet_i();
00166 
00167   TransportQueueElement* current_packet_first_element() const;
00168 
00169   /// The maximum size of a message allowed by this TransportImpl, or 0
00170   /// if there is no such limit.  This is expected to be a constant, for example
00171   /// UDP/IPv4 can send messages of up to 65466 bytes.
00172   /// The transport framework will use the returned value (if > 0) to
00173   /// fragment larger messages.  This fragmentation and
00174   /// reassembly will be transparent to the user.
00175   virtual size_t max_message_size() const;
00176 
00177   /// Put the maximum UDP payload size here so that it can be shared by all
00178   /// UDP-based transports.  This is the worst-case (conservative) value for
00179   /// UDP/IPv4.  If there are no IP options, or if IPv6 is used, it could
00180   /// actually be a little larger.
00181   static const size_t UDP_MAX_MESSAGE_SIZE = 65466;
00182 
00183   /// Set graceful disconnecting flag.
00184   void set_graceful_disconnecting(bool flag);
00185 
00186   virtual void add_delayed_notification(TransportQueueElement* element);
00187 
00188   /// If delayed notifications were queued up, issue those callbacks here.
00189   /// The default match is "match all", otherwise match can be used to specify
00190   /// either a certain individual packet or a publication id.
00191   /// Returns true if anything in the delayed notification list matched.
00192   bool send_delayed_notifications(const TransportQueueElement::MatchCriteria* match = 0);
00193 
00194 private:
00195 
00196   enum SendPacketOutcome {
00197     OUTCOME_COMPLETE_SEND,
00198     OUTCOME_PARTIAL_SEND,
00199     OUTCOME_BACKPRESSURE,
00200     OUTCOME_PEER_LOST,
00201     OUTCOME_SEND_ERROR
00202   };
00203 
00204   /// Called from send() when it is time to attempt to send our
00205   /// current packet to the socket while in MODE_DIRECT mode_.
00206   /// If backpressure occurs, our current packet will be adjusted
00207   /// to account for bytes that were sent, and the mode will be
00208   /// changed to MODE_QUEUE.
00209   /// If no backpressure occurs (i.e., the entire packet is sent), then
00210   /// our current packet will be "reset" to be an empty packet following
00211   /// the send.
00212   void direct_send(bool relink);
00213 
00214   /// This method is used while in MODE_QUEUE mode, and a new packet
00215   /// needs to be formulated using elements from the queue_.  This is
00216   /// the first step of formulating the new packet.  It will extract
00217   /// elements from the queue_ and insert those elements into the
00218   /// elems_ collection.
00219   ///
00220   /// After this step has been done, the prepare_packet() step can
00221   /// be performed, followed by the actual send_packet() call.
00222   void get_packet_elems_from_queue();
00223 
00224   /// This method is responsible for updating the packet header.
00225   /// Called exclusively by prepare_packet.
00226   void prepare_header();
00227 
00228   /// This method is responsible for actually "creating" the current
00229   /// send packet using the packet header and the collection of
00230   /// packet elements that are to make up the packet's contents.
00231   void prepare_packet();
00232 
00233   /// This is called to send the current packet.  The current packet
00234   /// will either be a "partially sent" packet, or a packet that has
00235   /// just been prepared via a call to prepare_packet().
00236   SendPacketOutcome send_packet();
00237 
00238   /// Form an IOV and call the send_bytes() template method.
00239   ssize_t do_send_packet(const ACE_Message_Block* packet, int& bp);
00240 
00241 #if defined(OPENDDS_SECURITY)
00242   /// Derived classes can override to transform the data right before it's
00243   /// sent.  If the returned value is non-NULL it will be sent instead of
00244   /// sending the parameter.
00245   virtual ACE_Message_Block* pre_send_packet(const ACE_Message_Block*)
00246   {
00247     return 0;
00248   }
00249 #endif
00250 
00251   /// This is called from the send_packet() method after it has
00252   /// sent at least one byte from the current packet.  This method
00253   /// will update the current packet appropriately, as well as deal
00254   /// with all of the release()'ing of fully sent ACE_Message_Blocks,
00255   /// and the data_delivered() calls on the fully sent elements.
00256   /// Returns 0 if the entire packet was sent, and returns 1 if
00257   /// the entire packet was not sent.
00258   int adjust_packet_after_send(ssize_t num_bytes_sent);
00259 
00260 
00261   /// How much space is available in the current packet before we reach one
00262   /// of the limits: max_message_size() [transport's inherent limitation]
00263   /// or max_size_ [user's configured limit].
00264   size_t space_available() const;
00265 
00266   typedef ACE_SYNCH_MUTEX     LockType;
00267   typedef ACE_Guard<LockType> GuardType;
00268 
00269 public:
00270   enum SendMode {
00271     // MODE_NOT_SET is used as the initial value of mode_before_suspend_ so
00272     // we can check if the resume_send is paired with suspend_send.
00273     MODE_NOT_SET,
00274     // Send out the sample with current packet.
00275     MODE_DIRECT,
00276     // The samples need to be queued because of backpressure or a partial send.
00277     MODE_QUEUE,
00278     // The samples need to be queued because the connection is lost and we are
00279     // trying to reconnect.
00280     MODE_SUSPEND,
00281     // The samples need to be dropped since we lost connection and could not
00282     // reconnect.
00283     MODE_TERMINATED
00284   };
00285 
00286   /// Clear queued messages and messages in current packet.
00287   // The API now has a defaulted mode (the default is the same as
00288   // the earlier hard-coded value).
00289   // Clear locks the local mutex. In certain situations it's
00290   // important to set the new mode in the clear itself.
00291   // Since the default is the earlier hard-coded value, this
00292   // shouldn't have any impact.
00293   void clear(SendMode mode = MODE_DIRECT);
00294 
00295   /// Access the current sending mode.
00296   SendMode mode() const;
00297 protected:
00298   /// Implement framework chain visitations to remove a sample.
00299   virtual RemoveResult do_remove_sample(const RepoId& pub_id,
00300     const TransportQueueElement::MatchCriteria& criteria,
00301     void* context);
00302 
00303 private:
00304 
00305   virtual bool marshal_transport_header(ACE_Message_Block* mb);
00306 
00307   /// Helper function for debugging.
00308   static const char* mode_as_str(SendMode mode);
00309 
00310   /// Configuration - max number of samples per transport packet
00311   size_t max_samples_;
00312 
00313   /// Configuration - optimum transport packet size (bytes)
00314   ACE_UINT32 optimum_size_;
00315 
00316   /// Configuration - max transport packet size (bytes)
00317   ACE_UINT32 max_size_;
00318 
00319   /// Used during backpressure situations to hold samples that have
00320   /// not yet been made to be part of a transport packet, and are
00321   /// completely unsent.
00322   /// Also used as a bucket for elements which still have to become
00323   /// part of a packet.
00324   QueueType queue_;
00325 
00326   /// Maximum marshalled size of the transport packet header.
00327   size_t max_header_size_;
00328 
00329   /// Current transport packet header, marshalled.
00330   ACE_Message_Block* header_block_;
00331 
00332   /// Current transport header sequence number.
00333   SequenceNumber header_sequence_;
00334 
00335   /// Current elements that have contributed blocks to the current
00336   /// transport packet.
00337   QueueType elems_;
00338 
00339   /// Current (head of chain) block containing unsent bytes for the
00340   /// current transport packet.
00341   ACE_Message_Block* pkt_chain_;
00342 
00343   /// Set to false when the packet header hasn't been fully sent.
00344   /// Set to true once the packet header has been fully sent.
00345   bool header_complete_;
00346 
00347   /// Counter that, when greater than zero, indicates that we still
00348   /// expect to receive a send_stop() event.
00349   /// Incremented once for each call to our send_start() method,
00350   /// and decremented once for each call to our send_stop() method.
00351   /// We only care about the transitions of the start_counter_
00352   /// value from 0 to 1, and from 1 to 0.  This accommodates the
00353   /// case where more than one TransportClient is sending to
00354   /// us at the same time.  We use this counter to enable a
00355   /// "composite" send_start() and send_stop().
00356   unsigned start_counter_;
00357 
00358   /// This mode determines how send() calls will be handled.
00359   SendMode mode_;
00360 
00361   /// This mode remembers the mode before send is suspended and is
00362   /// used after the send is resumed because the connection is
00363   /// re-established.
00364   SendMode mode_before_suspend_;
00365 
00366   /// Used for delayed notifications when performing work.
00367   typedef std::pair<TransportQueueElement*, SendMode> TQESendModePair;
00368   OPENDDS_VECTOR(TQESendModePair) delayed_delivered_notification_queue_;
00369 
00370   /// Allocator for header message block.
00371   unique_ptr<TransportMessageBlockAllocator> header_mb_allocator_;
00372 
00373   /// Allocator for header data block.
00374   unique_ptr<TransportDataBlockAllocator> header_db_allocator_;
00375 
00376   /// The thread synch object.
00377   unique_ptr<ThreadSynch> synch_;
00378 
00379   /// This lock will protect critical sections of code that play a
00380   /// role in the sending of data.
00381   LockType lock_;
00382 
00383   /// Cached allocators for TransportReplacedElement.
00384   MessageBlockAllocator replaced_element_mb_allocator_;
00385   DataBlockAllocator replaced_element_db_allocator_;
00386 
00387   TransportImpl& transport_;
00388 
00389   bool graceful_disconnecting_;
00390 
00391   bool link_released_;
00392 
00393   TransportSendBuffer* send_buffer_;
00394 
00395   // N.B. The behavior present in TransportSendBuffer should be
00396   // refactored into the TransportSendStrategy eventually; a good
00397   // amount of private state is shared between both classes.
00398   friend class TransportSendBuffer;
00399 
00400 protected:
00401   ThreadSynch* synch() const;
00402 
00403   /// Current transport packet header.
00404   TransportHeader header_;
00405 };
00406 
00407 } // namespace DCPS
00408 } // namespace OpenDDS
00409 
00410 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00411 
00412 #if defined (__ACE_INLINE__)
00413 #include "TransportSendStrategy.inl"
00414 #endif /* __ACE_INLINE__ */
00415 
00416 #endif /* OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1