OpenDDS  Snapshot(2023/04/28-20:55)
TransportReceiveStrategy_T.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTRECEIVESTRATEGY_T_H
9 #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTRECEIVESTRATEGY_T_H
10 
11 #include "dds/DCPS/dcps_export.h"
12 #include "ReceivedDataSample.h"
13 #include "TransportStrategy.h"
14 #include "TransportDefs.h"
15 #include "TransportHeader.h"
16 #include "TransportInst_rch.h"
17 
18 #include "ace/INET_Addr.h"
19 #include "ace/Lock_Adapter_T.h"
20 #include "ace/Synch_Traits.h"
21 
23 
24 namespace OpenDDS {
25 namespace DCPS {
26 
27 struct OpenDDS_Dcps_Export TransportReceiveConstants { // non-template base for constants only
28  //
29  // The total available space in the receive buffers must have enough to hold
30  // a max sized message. The max message is about 64K and the low water for
31  // a buffer is 4096. Therefore, 16 receive buffers is appropriate.
32  //
33  static const size_t RECEIVE_BUFFERS = DEFAULT_TRANSPORT_RECEIVE_BUFFERS;
34  static const size_t BUFFER_LOW_WATER = 4096;
35 
36  //
37  // Message Block Allocators are more plentiful since they hold samples
38  // as well as data read from the handle(s).
39  //
40  static const size_t MESSAGE_BLOCKS = 1000;
41  static const size_t DATA_BLOCKS = 100;
42 };
43 
44 
45 /**
46  * This class provides buffer for data received by transports, de-assemble
47  * the data to individual samples and deliver them.
48  */
49 template<typename TH = TransportHeader, typename DSH = DataSampleHeader>
52 public:
53 
54  virtual ~TransportReceiveStrategy();
55 
56  int start();
57  void stop();
58 
59  int handle_dds_input(ACE_HANDLE fd);
60 
61  /// The subclass needs to provide the implementation
62  /// for re-establishing the datalink. This is called
63  /// when recv returns an error.
64  virtual void relink(bool do_suspend = true);
65 
66  /// Provides access to the received transport header
67  /// for subclasses.
68  const TH& received_header() const;
69  TH& received_header();
70 
71  /// Provides access to the received sample header
72  /// for subclasses.
73  const DSH& received_sample_header() const;
74  DSH& received_sample_header();
75 
76  /// Use the receive strategy's Message Block Allocator to convert
77  /// the ReceivedDataSample's payload to an ACE_Message_Block chain
78  ACE_Message_Block* to_msgblock(const ReceivedDataSample& sample);
79 
80 protected:
81  explicit TransportReceiveStrategy(const TransportInst_rch& config,
82  size_t receive_buffers_count = RECEIVE_BUFFERS);
83 
84  /// Only our subclass knows how to do this.
85  virtual ssize_t receive_bytes(iovec iov[],
86  int n,
87  ACE_INET_Addr& remote_address,
88  ACE_HANDLE fd,
89  bool& stop) = 0;
90 
91  /// Check the transport header for suitability.
92  virtual bool check_header(const TH& header);
93 
94  /// Check the data sample header for suitability.
95  virtual bool check_header(const DSH& header);
96 
97  /// Begin Current Transport Header Processing
99 
100  /// End Current Transport Header Processing
102 
104  public:
105  explicit ScopedHeaderProcessing(TransportReceiveStrategy& trs) : trs_(trs) { trs_.begin_transport_header_processing(); }
106  ~ScopedHeaderProcessing() { trs_.end_transport_header_processing(); }
107  private:
109  };
110 
111  /// Called when there is a ReceivedDataSample to be delivered.
112  virtual void deliver_sample(ReceivedDataSample& sample,
113  const ACE_INET_Addr& remote_address) = 0;
114 
115  virtual void finish_message() {}
116 
117  /// Let the subclass start.
118  virtual int start_i() = 0;
119 
120  /// Let the subclass stop.
121  virtual void stop_i() = 0;
122 
123  /// Ignore bad PDUs by skipping over them.
124  int skip_bad_pdus();
125 
126  /// For datagram-based derived classes, reset() can be called to clear any
127  /// state that may be remaining from parsing the previous datagram.
128  void reset();
129 
130  size_t pdu_remaining() const { return this->pdu_remaining_; }
131 
132  /// Flag indicates if the GRACEFUL_DISCONNECT message is received.
134 
135  /// Manage an index into the receive buffer array.
136  size_t successor_index(size_t index) const;
137 
138  void update_buffer_index(bool& done);
139 
140  virtual bool reassemble(ReceivedDataSample& data);
141 
142  /// Bytes remaining in the current DataSample.
144 
145  /// Current receive TransportHeader.
147 
148 //MJM: We should probably bring the allocator typedefs down into this
149 //MJM: class since they are limited to this scope.
153 
154  /// Locking strategy for the allocators.
156 
157  /// Set of receive buffers in use.
158  OPENDDS_VECTOR(ACE_Message_Block*) receive_buffers_;
159 
160  /// Current receive buffer index in use.
162 
163  /// Current data sample header.
165 
167 
168  /** Flag indicating that the currently resident PDU is a good one
169  * (i.e. has not been received and processed previously). This is
170  * included in case we receive PDUs that were resent for reliability
171  * reasons and we receive one even if we have already processed it.
172  * This is a use case from multicast transports.
173  */
174  bool good_pdu_;
175 
176  /// Amount of the current PDU that has not been processed yet.
178 };
179 
180 } // namespace DCPS */
181 } // namespace OpenDDS */
182 
184 
185 #if defined (__ACE_INLINE__)
187 #endif /* __ACE_INLINE__ */
188 
189 #ifdef ACE_TEMPLATES_REQUIRE_SOURCE
191 #endif
192 
193 #endif /* OPENDDS_DCPS_TRANSPORTRECEIVESTRATEGY */
size_t pdu_remaining_
Amount of the current PDU that has not been processed yet.
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
int ssize_t
DSH data_sample_header_
Current data sample header.
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
Holds a data sample received by the transport.
size_t receive_sample_remaining_
Bytes remaining in the current DataSample.
ACE_Lock_Adapter< ACE_SYNCH_MUTEX > receive_lock_
Locking strategy for the allocators.
bool gracefully_disconnected_
Flag indicates if the GRACEFUL_DISCONNECT message is received.
virtual void end_transport_header_processing()
End Current Transport Header Processing.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual void begin_transport_header_processing()
Begin Current Transport Header Processing.
size_t buffer_index_
Current receive buffer index in use.
TH receive_transport_header_
Current receive TransportHeader.
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28