TransportReceiveStrategy_T.h

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #ifndef OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY
00009 #define OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY
00010 
00011 #include "dds/DCPS/dcps_export.h"
00012 #include "ReceivedDataSample.h"
00013 #include "TransportStrategy.h"
00014 #include "TransportDefs.h"
00015 #include "TransportHeader.h"
00016 
00017 #include "ace/INET_Addr.h"
00018 #include "ace/Lock_Adapter_T.h"
00019 #include "ace/Synch_Traits.h"
00020 
00021 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00022 
00023 namespace OpenDDS {
00024 namespace DCPS {
00025 
00026 /**
00027  * Buffers data received by a transport, splits it into individual samples,
00028  * and delivers them.  TH is the transport header type; DSH is the sample header type.
00029  */
00030 template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
00031 class TransportReceiveStrategy
00032   : public TransportStrategy {
00033 public:
00034 
00035   virtual ~TransportReceiveStrategy();
00036   // Start/stop entry points.  NOTE(review): presumably these wrap start_i()/stop_i() -- confirm in the .cpp.
00037   int start();
00038   void stop();
00039   // NOTE(review): the return-value convention of handle_dds_input() is set by the implementation, not visible here.
00040   int handle_dds_input(ACE_HANDLE fd);
00041 
00042   /// Subclasses override this to provide the implementation
00043   /// for re-establishing the datalink. This is called
00044   /// when recv returns an error.
00045   virtual void relink(bool do_suspend = true);
00046 
00047   /// Provides access to the received transport header
00048   /// for subclasses.
00049   const TH& received_header() const;
00050   TH& received_header();
00051 
00052   /// Provides access to the received sample header
00053   /// for subclasses.
00054   const DSH& received_sample_header() const;
00055   DSH& received_sample_header();
00056 
00057 protected:
00058   TransportReceiveStrategy();
00059 
00060   /// Pure virtual: only the subclass knows how to receive bytes.
00061   virtual ssize_t receive_bytes(iovec          iov[],
00062                                 int            n,
00063                                 ACE_INET_Addr& remote_address,
00064                                 ACE_HANDLE     fd) = 0;
00065 
00066   /// Check the transport header for suitability.
00067   virtual bool check_header(const TH& header);
00068 
00069   /// Check the data sample header for suitability.
00070   virtual bool check_header(const DSH& header);
00071 
00072   /// Called when there is a ReceivedDataSample to be delivered.
00073   virtual void deliver_sample(ReceivedDataSample&  sample,
00074                               const ACE_INET_Addr& remote_address) = 0;
00075 
00076   /// Let the subclass start.
00077   virtual int start_i() = 0;
00078 
00079   /// Let the subclass stop.
00080   virtual void stop_i() = 0;
00081 
00082   /// Ignore bad PDUs by skipping over them.
00083   int skip_bad_pdus();
00084 
00085   /// For datagram-based derived classes, reset() can be called to clear any
00086   /// state that may be remaining from parsing the previous datagram.
00087   void reset();
00088   /// Amount of the current PDU that has not been processed yet.
00089   size_t pdu_remaining() const { return this->pdu_remaining_; }
00090 
00091   /// Flag indicating whether the GRACEFUL_DISCONNECT message has been received.
00092   bool gracefully_disconnected_;
00093 
00094 private:
00095 
00096   /// Manage an index into the receive buffer array.
00097   size_t successor_index(size_t index) const;
00098 
00099   void update_buffer_index(bool& done);
00100 
00101   virtual bool reassemble(ReceivedDataSample& data);
00102 
00103   /// Bytes remaining in the current DataSample.
00104   size_t receive_sample_remaining_;
00105 
00106   /// Current receive TransportHeader.
00107   TH receive_transport_header_;
00108 
00109   //
00110   // The total available space in the receive buffers must be enough to hold
00111   // a max sized message.  The max message is about 64K and the low water for
00112   // a buffer is 4096.  Therefore, 16 receive buffers is appropriate.
00113   //
00114   enum { RECEIVE_BUFFERS  =   16 };
00115   enum { BUFFER_LOW_WATER = 4096 };
00116 
00117   //
00118   // Message Block Allocators are more plentiful since they hold samples
00119   // as well as data read from the handle(s).
00120   //
00121   enum { MESSAGE_BLOCKS   = 1000 };
00122   enum { DATA_BLOCKS      =  100 };
00123 
00124 //MJM: We should probably bring the allocator typedefs down into this
00125 //MJM: class since they are limited to this scope.
00126   TransportMessageBlockAllocator mb_allocator_;
00127   TransportDataBlockAllocator    db_allocator_;
00128   TransportDataAllocator         data_allocator_;
00129 
00130   /// Locking strategy for the allocators.
00131   ACE_Lock_Adapter<ACE_SYNCH_MUTEX> receive_lock_;
00132 
00133   /// Set of receive buffers in use.
00134   ACE_Message_Block* receive_buffers_[RECEIVE_BUFFERS];
00135 
00136   /// Current receive buffer index in use.
00137   size_t buffer_index_;
00138 
00139   /// Current data sample header.
00140   DSH data_sample_header_;
00141 
00142   ACE_Message_Block* payload_;
00143 
00144   /** Flag indicating that the currently resident PDU is a good one
00145     * (i.e. has not been received and processed previously).  This is
00146     * included because PDUs resent for reliability reasons may arrive
00147     * even if we have already processed them.
00148     * This is a use case from multicast transports.
00149     */
00150   bool good_pdu_;
00151 
00152   /// Amount of the current PDU that has not been processed yet.
00153   size_t pdu_remaining_;
00154 };
00155 
00156 } // namespace DCPS
00157 } // namespace OpenDDS
00158 
00159 OPENDDS_END_VERSIONED_NAMESPACE_DECL
00160 
00161 #if defined (__ACE_INLINE__)
00162 #include "TransportReceiveStrategy_T.inl"
00163 #endif /* __ACE_INLINE__ */
00164 
00165 #ifdef ACE_TEMPLATES_REQUIRE_SOURCE
00166 #include "TransportReceiveStrategy_T.cpp"
00167 #endif
00168 
00169 #endif /* OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY */
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1