00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY 00009 #define OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY 00010 00011 #include "dds/DCPS/dcps_export.h" 00012 #include "ReceivedDataSample.h" 00013 #include "TransportStrategy.h" 00014 #include "TransportDefs.h" 00015 #include "TransportHeader.h" 00016 00017 #include "ace/INET_Addr.h" 00018 #include "ace/Synch.h" 00019 #include "ace/Lock_Adapter_T.h" 00020 00021 namespace OpenDDS { 00022 namespace DCPS { 00023 00024 /** 00025 * This class provides buffer for data received by transports, de-assemble 00026 * the data to individual samples and deliver them. 00027 */ 00028 template<typename TH = TransportHeader, typename DSH = DataSampleHeader> 00029 class TransportReceiveStrategy 00030 : public TransportStrategy { 00031 public: 00032 00033 virtual ~TransportReceiveStrategy(); 00034 00035 int start(); 00036 void stop(); 00037 00038 int handle_dds_input(ACE_HANDLE fd); 00039 00040 /// The subclass needs to provide the implementation 00041 /// for re-establishing the datalink. This is called 00042 /// when recv returns an error. 00043 virtual void relink(bool do_suspend = true); 00044 00045 /// Provides access to the received transport header 00046 /// for subclasses. 00047 const TH& received_header() const; 00048 TH& received_header(); 00049 00050 /// Provides access to the received sample header 00051 /// for subclasses. 00052 const DSH& received_sample_header() const; 00053 DSH& received_sample_header(); 00054 00055 protected: 00056 TransportReceiveStrategy(); 00057 00058 /// Only our subclass knows how to do this. 00059 virtual ssize_t receive_bytes(iovec iov[], 00060 int n, 00061 ACE_INET_Addr& remote_address, 00062 ACE_HANDLE fd) = 0; 00063 00064 /// Check the transport header for suitability. 00065 virtual bool check_header(const TH& header); 00066 00067 /// Check the data sample header for suitability. 00068 virtual bool check_header(const DSH& header); 00069 00070 /// Called when there is a ReceivedDataSample to be delivered. 00071 virtual void deliver_sample(ReceivedDataSample& sample, 00072 const ACE_INET_Addr& remote_address) = 0; 00073 00074 /// Let the subclass start. 00075 virtual int start_i() = 0; 00076 00077 /// Let the subclass stop. 00078 virtual void stop_i() = 0; 00079 00080 /// Ignore bad PDUs by skipping over them. 00081 int skip_bad_pdus(); 00082 00083 /// For datagram-based derived classes, reset() can be called to clear any 00084 /// state that may be remaining from parsing the previous datagram. 00085 void reset(); 00086 00087 size_t pdu_remaining() const { return this->pdu_remaining_; } 00088 00089 /// Flag indicates if the GRACEFUL_DISCONNECT message is received. 00090 bool gracefully_disconnected_; 00091 00092 private: 00093 00094 /// Manage an index into the receive buffer array. 00095 size_t successor_index(size_t index) const; 00096 00097 void update_buffer_index(bool& done); 00098 00099 virtual bool reassemble(ReceivedDataSample& data); 00100 00101 /// Bytes remaining in the current DataSample. 00102 size_t receive_sample_remaining_; 00103 00104 /// Current receive TransportHeader. 00105 TH receive_transport_header_; 00106 00107 // 00108 // The total available space in the receive buffers must have enough to hold 00109 // a max sized message. The max message is about 64K and the low water for 00110 // a buffer is 4096. Therefore, 16 receive buffers is appropriate. 00111 // 00112 enum { RECEIVE_BUFFERS = 16 }; 00113 enum { BUFFER_LOW_WATER = 4096 }; 00114 00115 // 00116 // Message Block Allocators are more plentiful since they hold samples 00117 // as well as data read from the handle(s). 00118 // 00119 enum { MESSAGE_BLOCKS = 1000 }; 00120 enum { DATA_BLOCKS = 100 }; 00121 00122 //MJM: We should probably bring the allocator typedefs down into this 00123 //MJM: class since they are limited to this scope. 00124 TransportMessageBlockAllocator mb_allocator_; 00125 TransportDataBlockAllocator db_allocator_; 00126 TransportDataAllocator data_allocator_; 00127 00128 /// Locking strategy for the allocators. 00129 ACE_Lock_Adapter<ACE_SYNCH_MUTEX> receive_lock_; 00130 00131 /// Set of receive buffers in use. 00132 ACE_Message_Block* receive_buffers_[RECEIVE_BUFFERS]; 00133 00134 /// Current receive buffer index in use. 00135 size_t buffer_index_; 00136 00137 /// Current data sample header. 00138 DSH data_sample_header_; 00139 00140 ACE_Message_Block* payload_; 00141 00142 /** Flag indicating that the currently resident PDU is a good one 00143 * (i.e. has not been received and processed previously). This is 00144 * included in case we receive PDUs that were resent for reliability 00145 * reasons and we receive one even if we have already processed it. 00146 * This is a use case from multicast transports. 00147 */ 00148 bool good_pdu_; 00149 00150 /// Amount of the current PDU that has not been processed yet. 00151 size_t pdu_remaining_; 00152 }; 00153 00154 } // namespace DCPS */ 00155 } // namespace OpenDDS */ 00156 00157 #if defined (__ACE_INLINE__) 00158 #include "TransportReceiveStrategy_T.inl" 00159 #endif /* __ACE_INLINE__ */ 00160 00161 #ifdef ACE_TEMPLATES_REQUIRE_SOURCE 00162 #include "TransportReceiveStrategy_T.cpp" 00163 #endif 00164 00165 #endif /* OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY */