TransportReceiveStrategy_T.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY
00009 #define OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "ReceivedDataSample.h"
00013 #include "TransportStrategy.h"
00014 #include "TransportDefs.h"
00015 #include "TransportHeader.h"
00016 
00017 #include "ace/INET_Addr.h"
00018 #include "ace/Synch.h"
00019 #include "ace/Lock_Adapter_T.h"
00020 
00021 namespace OpenDDS {
00022 namespace DCPS {
00023 
00024 /**
00025  * This class provides buffer for data received by transports, de-assemble
00026  * the data to individual samples and deliver them.
00027  */
00028 template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
00029 class TransportReceiveStrategy
00030   : public TransportStrategy {
00031 public:
00032 
00033   virtual ~TransportReceiveStrategy();
00034 
00035   int start();
00036   void stop();
00037 
00038   int handle_dds_input(ACE_HANDLE fd);
00039 
00040   /// The subclass needs to provide the implementation
00041   /// for re-establishing the datalink. This is called
00042   /// when recv returns an error.
00043   virtual void relink(bool do_suspend = true);
00044 
00045   /// Provides access to the received transport header
00046   /// for subclasses.
00047   const TH& received_header() const;
00048   TH& received_header();
00049 
00050   /// Provides access to the received sample header
00051   /// for subclasses.
00052   const DSH& received_sample_header() const;
00053   DSH& received_sample_header();
00054 
00055 protected:
00056   TransportReceiveStrategy();
00057 
00058   /// Only our subclass knows how to do this.
00059   virtual ssize_t receive_bytes(iovec          iov[],
00060                                 int            n,
00061                                 ACE_INET_Addr& remote_address,
00062                                 ACE_HANDLE     fd) = 0;
00063 
00064   /// Check the transport header for suitability.
00065   virtual bool check_header(const TH& header);
00066 
00067   /// Check the data sample header for suitability.
00068   virtual bool check_header(const DSH& header);
00069 
00070   /// Called when there is a ReceivedDataSample to be delivered.
00071   virtual void deliver_sample(ReceivedDataSample&  sample,
00072                               const ACE_INET_Addr& remote_address) = 0;
00073 
00074   /// Let the subclass start.
00075   virtual int start_i() = 0;
00076 
00077   /// Let the subclass stop.
00078   virtual void stop_i() = 0;
00079 
00080   /// Ignore bad PDUs by skipping over them.
00081   int skip_bad_pdus();
00082 
00083   /// For datagram-based derived classes, reset() can be called to clear any
00084   /// state that may be remaining from parsing the previous datagram.
00085   void reset();
00086 
00087   size_t pdu_remaining() const { return this->pdu_remaining_; }
00088 
00089   /// Flag indicates if the GRACEFUL_DISCONNECT message is received.
00090   bool gracefully_disconnected_;
00091 
00092 private:
00093 
00094   /// Manage an index into the receive buffer array.
00095   size_t successor_index(size_t index) const;
00096 
00097   void update_buffer_index(bool& done);
00098 
00099   virtual bool reassemble(ReceivedDataSample& data);
00100 
00101   /// Bytes remaining in the current DataSample.
00102   size_t receive_sample_remaining_;
00103 
00104   /// Current receive TransportHeader.
00105   TH receive_transport_header_;
00106 
00107   //
00108   // The total available space in the receive buffers must have enough to hold
00109   // a max sized message.  The max message is about 64K and the low water for
00110   // a buffer is 4096.  Therefore, 16 receive buffers is appropriate.
00111   //
00112   enum { RECEIVE_BUFFERS  =   16 };
00113   enum { BUFFER_LOW_WATER = 4096 };
00114 
00115   //
00116   // Message Block Allocators are more plentiful since they hold samples
00117   // as well as data read from the handle(s).
00118   //
00119   enum { MESSAGE_BLOCKS   = 1000 };
00120   enum { DATA_BLOCKS      =  100 };
00121 
00122 //MJM: We should probably bring the allocator typedefs down into this
00123 //MJM: class since they are limited to this scope.
00124   TransportMessageBlockAllocator mb_allocator_;
00125   TransportDataBlockAllocator    db_allocator_;
00126   TransportDataAllocator         data_allocator_;
00127 
00128   /// Locking strategy for the allocators.
00129   ACE_Lock_Adapter<ACE_SYNCH_MUTEX> receive_lock_;
00130 
00131   /// Set of receive buffers in use.
00132   ACE_Message_Block* receive_buffers_[RECEIVE_BUFFERS];
00133 
00134   /// Current receive buffer index in use.
00135   size_t buffer_index_;
00136 
00137   /// Current data sample header.
00138   DSH data_sample_header_;
00139 
00140   ACE_Message_Block* payload_;
00141 
00142   /** Flag indicating that the currently resident PDU is a good one
00143     * (i.e. has not been received and processed previously).  This is
00144     * included in case we receive PDUs that were resent for reliability
00145     * reasons and we receive one even if we have already processed it.
00146     * This is a use case from multicast transports.
00147     */
00148   bool good_pdu_;
00149 
00150   /// Amount of the current PDU that has not been processed yet.
00151   size_t pdu_remaining_;
00152 };
00153 
00154 } // namespace DCPS */
00155 } // namespace OpenDDS */
00156 
00157 #if defined (__ACE_INLINE__)
00158 #include "TransportReceiveStrategy_T.inl"
00159 #endif /* __ACE_INLINE__ */
00160 
00161 #ifdef ACE_TEMPLATES_REQUIRE_SOURCE
00162 #include "TransportReceiveStrategy_T.cpp"
00163 #endif
00164 
00165 #endif /* OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY */

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7