00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY 00009 #define OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY 00010 00011 #include "dds/DCPS/dcps_export.h" 00012 #include "ReceivedDataSample.h" 00013 #include "TransportStrategy.h" 00014 #include "TransportDefs.h" 00015 #include "TransportHeader.h" 00016 00017 #include "ace/INET_Addr.h" 00018 #include "ace/Lock_Adapter_T.h" 00019 #include "ace/Synch_Traits.h" 00020 00021 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 00022 00023 namespace OpenDDS { 00024 namespace DCPS { 00025 00026 /** 00027 * This class provides buffer for data received by transports, de-assemble 00028 * the data to individual samples and deliver them. 00029 */ 00030 template<typename TH = TransportHeader, typename DSH = DataSampleHeader> 00031 class TransportReceiveStrategy 00032 : public TransportStrategy { 00033 public: 00034 00035 virtual ~TransportReceiveStrategy(); 00036 00037 int start(); 00038 void stop(); 00039 00040 int handle_dds_input(ACE_HANDLE fd); 00041 00042 /// The subclass needs to provide the implementation 00043 /// for re-establishing the datalink. This is called 00044 /// when recv returns an error. 00045 virtual void relink(bool do_suspend = true); 00046 00047 /// Provides access to the received transport header 00048 /// for subclasses. 00049 const TH& received_header() const; 00050 TH& received_header(); 00051 00052 /// Provides access to the received sample header 00053 /// for subclasses. 00054 const DSH& received_sample_header() const; 00055 DSH& received_sample_header(); 00056 00057 protected: 00058 TransportReceiveStrategy(); 00059 00060 /// Only our subclass knows how to do this. 00061 virtual ssize_t receive_bytes(iovec iov[], 00062 int n, 00063 ACE_INET_Addr& remote_address, 00064 ACE_HANDLE fd) = 0; 00065 00066 /// Check the transport header for suitability. 00067 virtual bool check_header(const TH& header); 00068 00069 /// Check the data sample header for suitability. 00070 virtual bool check_header(const DSH& header); 00071 00072 /// Called when there is a ReceivedDataSample to be delivered. 00073 virtual void deliver_sample(ReceivedDataSample& sample, 00074 const ACE_INET_Addr& remote_address) = 0; 00075 00076 /// Let the subclass start. 00077 virtual int start_i() = 0; 00078 00079 /// Let the subclass stop. 00080 virtual void stop_i() = 0; 00081 00082 /// Ignore bad PDUs by skipping over them. 00083 int skip_bad_pdus(); 00084 00085 /// For datagram-based derived classes, reset() can be called to clear any 00086 /// state that may be remaining from parsing the previous datagram. 00087 void reset(); 00088 00089 size_t pdu_remaining() const { return this->pdu_remaining_; } 00090 00091 /// Flag indicates if the GRACEFUL_DISCONNECT message is received. 00092 bool gracefully_disconnected_; 00093 00094 private: 00095 00096 /// Manage an index into the receive buffer array. 00097 size_t successor_index(size_t index) const; 00098 00099 void update_buffer_index(bool& done); 00100 00101 virtual bool reassemble(ReceivedDataSample& data); 00102 00103 /// Bytes remaining in the current DataSample. 00104 size_t receive_sample_remaining_; 00105 00106 /// Current receive TransportHeader. 00107 TH receive_transport_header_; 00108 00109 // 00110 // The total available space in the receive buffers must have enough to hold 00111 // a max sized message. The max message is about 64K and the low water for 00112 // a buffer is 4096. Therefore, 16 receive buffers is appropriate. 00113 // 00114 enum { RECEIVE_BUFFERS = 16 }; 00115 enum { BUFFER_LOW_WATER = 4096 }; 00116 00117 // 00118 // Message Block Allocators are more plentiful since they hold samples 00119 // as well as data read from the handle(s). 00120 // 00121 enum { MESSAGE_BLOCKS = 1000 }; 00122 enum { DATA_BLOCKS = 100 }; 00123 00124 //MJM: We should probably bring the allocator typedefs down into this 00125 //MJM: class since they are limited to this scope. 00126 TransportMessageBlockAllocator mb_allocator_; 00127 TransportDataBlockAllocator db_allocator_; 00128 TransportDataAllocator data_allocator_; 00129 00130 /// Locking strategy for the allocators. 00131 ACE_Lock_Adapter<ACE_SYNCH_MUTEX> receive_lock_; 00132 00133 /// Set of receive buffers in use. 00134 ACE_Message_Block* receive_buffers_[RECEIVE_BUFFERS]; 00135 00136 /// Current receive buffer index in use. 00137 size_t buffer_index_; 00138 00139 /// Current data sample header. 00140 DSH data_sample_header_; 00141 00142 ACE_Message_Block* payload_; 00143 00144 /** Flag indicating that the currently resident PDU is a good one 00145 * (i.e. has not been received and processed previously). This is 00146 * included in case we receive PDUs that were resent for reliability 00147 * reasons and we receive one even if we have already processed it. 00148 * This is a use case from multicast transports. 00149 */ 00150 bool good_pdu_; 00151 00152 /// Amount of the current PDU that has not been processed yet. 00153 size_t pdu_remaining_; 00154 }; 00155 00156 } // namespace DCPS */ 00157 } // namespace OpenDDS */ 00158 00159 OPENDDS_END_VERSIONED_NAMESPACE_DECL 00160 00161 #if defined (__ACE_INLINE__) 00162 #include "TransportReceiveStrategy_T.inl" 00163 #endif /* __ACE_INLINE__ */ 00164 00165 #ifdef ACE_TEMPLATES_REQUIRE_SOURCE 00166 #include "TransportReceiveStrategy_T.cpp" 00167 #endif 00168 00169 #endif /* OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY */