OpenDDS  Snapshot(2023/04/28-20:55)
TransportSendStrategy.h
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTSENDSTRATEGY_H
9 #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTSENDSTRATEGY_H
10 
11 #include "BasicQueue_T.h"
13 #include "ThreadSynchWorker.h"
14 #include "TransportDefs.h"
15 #include "TransportImpl_rch.h"
16 #include "TransportHeader.h"
19 
20 #include <dds/DCPS/dcps_export.h>
21 #include "dds/DCPS/Atomic.h"
23 #include <dds/DCPS/Definitions.h>
25 #include <dds/DCPS/PoolAllocator.h>
26 #include <dds/DCPS/RcObject.h>
27 
28 #if defined(OPENDDS_SECURITY)
29 #include <dds/DdsSecurityCoreC.h>
32 #endif
33 
34 #include <ace/Synch_Traits.h>
35 
37 
38 namespace OpenDDS {
39 namespace DCPS {
40 
41 class ThreadSynch;
42 class ThreadSynchResource;
43 class TransportQueueElement;
44 class TransportSendElement;
45 class TransportSendBuffer;
46 class DataSampleElement;
47 class QueueRemoveVisitor;
48 class PacketRemoveVisitor;
49 class TransportImpl;
50 
51 /**
52  * This class provides methods to fill packets with samples for sending
53  * and handles backpressure. It maintains the list of samples in current
54  * packets and also the list of samples queued during backpressure. A thread
55  * per connection is created to handle the queued samples.
56  *
57  * Notes for the object ownership:
58  * 1) Owns ThreadSynch object, list of samples in current packet and list
59  * of samples in queue.
60  */
62  : public ThreadSynchWorker {
63 public:
64  virtual ~TransportSendStrategy();
65 
66  /// Assigns an optional send buffer.
67  void send_buffer(TransportSendBuffer* send_buffer);
68 
69  /// Start the TransportSendStrategy. This happens once, when
70  /// the DataLink that "owns" this strategy object has established
71  /// a connection.
72  int start();
73 
74  /// Stop the TransportSendStrategy. This happens once, when the
75  /// DataLink that "owns" this strategy object is going away.
76  void stop();
77 
78  /// Invoked prior to one or more send() invocations from a particular
79  /// TransportClient.
80  void send_start();
81 
82  /// Our DataLink has been requested by some particular
83  /// TransportClient to send the element.
84  void send(TransportQueueElement* element, bool relink = true);
85 
86  /// Invoked after one or more send() invocations from a particular
87  /// TransportClient.
88  void send_stop(GUID_t repoId);
89 
90  /// Our DataLink has been requested by some particular
91  /// TransportClient to remove the supplied sample
92  /// (basically, an "unsend" attempt) from this strategy object.
93  RemoveResult remove_sample(const DataSampleElement* sample);
94 
95  void remove_all_msgs(const GUID_t& pub_id);
96 
97  /// Called by our ThreadSynch object when we should be able to
98  /// start sending any partial packet bytes and/or compose a new
99  /// packet using elements from the queue_.
100  ///
101  /// Returns 0 to indicate that the ThreadSynch object doesn't need
102  /// to call perform_work() again since the queue (and any unsent
103  /// packet bytes) has been drained, and the mode_ has been switched
104  /// to MODE_DIRECT.
105  ///
106  /// Returns 1 to indicate that there is more work to do, and the
107  /// ThreadSynch object should have this perform_work() method
108  /// called again.
109  virtual WorkOutcome perform_work();
110 
111  /// The subclass needs to provide the implementation
112  /// for re-establishing the datalink. This is called
113  /// when send returns an error.
114  virtual void relink(bool do_suspend = true);
115 
116  /// This is called when first time reconnect is attempted. The send mode
117  /// is set to MODE_SUSPEND. Messages are queued at this state.
118  void suspend_send();
119 
120  /// This is called when connection is lost and reconnect succeeds.
121  /// The send mode is set to the mode before suspend which is either MODE_QUEUE
122  /// or MODE_DIRECT.
123  void resume_send();
124 
125  /// This is called whenver the connection is lost and reconnect fails.
126  /// It removes all samples in the backpressure queue and packet queue.
127  void terminate_send(bool graceful_disconnecting = false);
128  virtual void terminate_send_if_suspended();
129 
130  // Moved clear() declaration below as Enums can't be forward declared.
131 
132  /// Let the subclass stop.
133  virtual void stop_i() = 0;
134 
135  /// Let the subclass start.
136  virtual bool start_i() { return true; }
137 
138  void link_released(bool flag);
139 
140  bool isDirectMode();
141 
143 
144  /// Convert ACE_Message_Block chain into iovec[] entries for send(),
145  /// returns number of iovec[] entries used (up to MAX_SEND_BLOCKS).
146  /// Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater.
147  static int mb_to_iov(const ACE_Message_Block& msg, iovec* iov);
148 
149  // Subclasses which make use of acceptors should override
150  // this method and return the peer handle.
151  virtual ACE_HANDLE get_handle();
152 
153  void deliver_ack_request(TransportQueueElement* element);
154 
155  /// Put the maximum UDP payload size here so that it can be shared by all
156  /// UDP-based transports. This is the worst-case (conservative) value for
157  /// UDP/IPv4. If there are no IP options, or if IPv6 is used, it could
158  /// actually be a little larger.
159  static const size_t UDP_MAX_MESSAGE_SIZE = 65466;
160 
161  /// Alternative to TransportSendStrategy::send for fragmentation
162  ///
163  /// @param original_element data sample to send, may be larger than max msg size
164  /// @param elements_to_send populated by this method with either original_element
165  /// or fragments created from it. Elements need to be
166  /// cleaned up by the caller using data_delivered or
167  /// data_dropped.
168  /// @return operation succeeded
169  bool fragmentation_helper(
170  TransportQueueElement* original_element, TqeVector& elements_to_send);
171 
172 protected:
173 
174  TransportSendStrategy(std::size_t id,
175  const TransportImpl_rch& transport,
176  ThreadSynchResource* synch_resource,
177  Priority priority,
178  const ThreadSynchStrategy_rch& thread_sync_strategy);
179 
180 
181  // Only our subclass knows how to do this.
182  // Third arg is the "back-pressure" flag. If send_bytes() returns
183  // -1 and the bp == 1, then it isn't really an error - it is
184  // backpressure.
185  virtual ssize_t send_bytes(const iovec iov[], int n, int& bp);
186 
187  virtual ssize_t non_blocking_send(const iovec iov[], int n, int& bp);
188 
189  virtual ssize_t send_bytes_i(const iovec iov[], int n) = 0;
190 
191  /// Specific implementation processing of prepared packet header.
192  virtual void prepare_header_i();
193 
194  /// Specific implementation processing of prepared packet.
195  virtual void prepare_packet_i();
196 
197  TransportQueueElement* current_packet_first_element() const;
198 
199  /// The maximum size of a message allowed by the this TransportImpl, or 0
200  /// if there is no such limit. This is expected to be a constant, for example
201  /// UDP/IPv4 can send messages of up to 65466 bytes.
202  /// The transport framework will use the returned value (if > 0) to
203  /// fragment larger messages. This fragmentation and
204  /// reassembly will be transparent to the user.
205  virtual size_t max_message_size() const;
206 
207  /// Set graceful disconnecting flag.
208  void set_graceful_disconnecting(bool flag);
209 
210  virtual void add_delayed_notification(TransportQueueElement* element);
211 
212  /// If delayed notifications were queued up, issue those callbacks here.
213  /// The default match is "match all", otherwise match can be used to specify
214  /// either a certain individual packet or a publication id.
215  /// Returns true if anything in the delayed notification list matched.
216  bool send_delayed_notifications(const TransportQueueElement::MatchCriteria* match = 0);
217 
218 #ifdef OPENDDS_SECURITY
220 #endif
221 
222 private:
223 
229  OUTCOME_SEND_ERROR
230  };
231 
232  /// Called from send() when it is time to attempt to send our
233  /// current packet to the socket while in MODE_DIRECT mode_.
234  /// If backpressure occurs, our current packet will be adjusted
235  /// to account for bytes that were sent, and the mode will be
236  /// changed to MODE_QUEUE.
237  /// If no backpressure occurs (ie, the entire packet is sent), then
238  /// our current packet will be "reset" to be an empty packet following
239  /// the send.
240  void direct_send(bool relink);
241 
242  /// This method is used while in MODE_QUEUE mode, and a new packet
243  /// needs to be formulated using elements from the queue_. This is
244  /// the first step of formulating the new packet. It will extract
245  /// elements from the queue_ and insert those elements into the
246  /// pkt_elems_ collection.
247  ///
248  /// After this step has been done, the prepare_packet() step can
249  /// be performed, followed by the actual send_packet() call.
250  void get_packet_elems_from_queue();
251 
252  /// This method is responsible for updating the packet header.
253  /// Called exclusively by prepare_packet.
254  void prepare_header();
255 
256  /// This method is responsible for actually "creating" the current
257  /// send packet using the packet header and the collection of
258  /// packet elements that are to make-up the packet's contents.
259  void prepare_packet();
260 
261  /// This is called to send the current packet. The current packet
262  /// will either be a "partially sent" packet, or a packet that has
263  /// just been prepared via a call to prepare_packet().
264  SendPacketOutcome send_packet();
265 
266  /// Form an IOV and call the send_bytes() template method.
267  ssize_t do_send_packet(const ACE_Message_Block* packet, int& bp);
268 
269 #ifdef OPENDDS_SECURITY
270  /// Derived classes can override to transform the data right before it's
271  /// sent. If the returned value is non-NULL it will be sent instead of
272  /// sending the parameter. If the returned value is NULL the original
273  /// message will be dropped.
275  {
276  return m->duplicate();
277  }
278 #endif
279 
280  /// This is called from the send_packet() method after it has
281  /// sent at least one byte from the current packet. This method
282  /// will update the current packet appropriately, as well as deal
283  /// with all of the release()'ing of fully sent ACE_Message_Blocks,
284  /// and the data_delivered() calls on the fully sent elements.
285  /// Returns 0 if the entire packet was sent, and returns 1 if
286  /// the entire packet was not sent.
287  int adjust_packet_after_send(ssize_t num_bytes_sent);
288 
289  /**
290  * How much space is available in packet with a given used space before we
291  * reach one of the limits: max_message_size() [transport's inherent
292  * limitation] or max_size_ [user's configured limit]
293  */
294  size_t space_available(size_t already_used = 0) const;
295 
296  /**
297  * Like above, but use the current packet.
298  */
299  size_t current_space_available() const;
300 
303 
304 public:
305  enum SendMode {
306  // MODE_NOT_SET is used as the initial value of mode_before_suspend_ so
307  // we can check if the resume_send is paired with suspend_send.
309  // Send out the sample with current packet.
311  // The samples need be queued because of the backpressure or partial send.
313  // The samples need be queued because the connection is lost and we are
314  // trying to reconnect.
316  // The samples need be dropped since we lost connection and could not
317  // reconnect.
318  MODE_TERMINATED
319  };
320 
321  /// Clear queued messages and messages in current packet and set the
322  /// current mode to new_mod if the current mode equals old_mode or
323  /// old_mode is MODE_NOT_SET.
324 
325  void clear(SendMode new_mode, SendMode old_mode = MODE_NOT_SET);
326 
327  /// Access the current sending mode.
328  SendMode mode() const;
329 protected:
330  /// Implement framework chain visitations to remove a sample.
331  virtual RemoveResult do_remove_sample(const GUID_t& pub_id,
332  const TransportQueueElement::MatchCriteria& criteria, bool remove_all = false);
333 
334 private:
335 
336  virtual bool marshal_transport_header(ACE_Message_Block* mb);
337 
338  /// Helper function to debugging.
339  static const char* mode_as_str(SendMode mode);
340 
341  /// Configuration - max number of samples per transport packet
342  size_t max_samples_;
343 
344  /// Configuration - optimum transport packet size (bytes)
345  ACE_UINT32 optimum_size_;
346 
347  /// Configuration - max transport packet size (bytes)
348  ACE_UINT32 max_size_;
349 
350  /// Used during backpressure situations to hold samples that have
351  /// not yet been made to be part of a transport packet, and are
352  /// completely unsent.
353  /// Also used as a bucket for packets which still have to become
354  /// part of a packet.
355  QueueType queue_;
356 
357  /// Maximum marshalled size of the transport packet header.
359 
360  /// Current transport packet header, marshalled.
362 
363  /// Current transport header sequence number.
365 
366  /// Current elements that have contributed blocks to the current
367  /// transport packet.
368  QueueType elems_;
369 
370  /// Current (head of chain) block containing unsent bytes for the
371  /// current transport packet.
373 
374  /// Set to false when the packet header hasn't been fully sent.
375  /// Set to true once the packet header has been fully sent.
377 
378  /// Counter that, when greater than zero, indicates that we still
379  /// expect to receive a send_stop() event.
380  /// Incremented once for each call to our send_start() method,
381  /// and decremented once for each call to our send_stop() method.
382  /// We only care about the transitions of the start_counter_
383  /// value from 0 to 1, and from 1 to 0. This accommodates the
384  /// case where more than one TransportClient is sending to
385  /// us at the same time. We use this counter to enable a
386  /// "composite" send_start() and send_stop().
387  unsigned start_counter_;
388 
389  /// This mode determines how send() calls will be handled.
391 
392  /// This mode remembers the mode before send is suspended and is
393  /// used after the send is resumed because the connection is
394  /// re-established.
396 
397  /// Used for delayed notifications when performing work.
398  typedef std::pair<TransportQueueElement*, SendMode> TQESendModePair;
399  OPENDDS_VECTOR(TQESendModePair) delayed_delivered_notification_queue_;
400 
401  /// Allocator for header data block.
403 
404  /// Allocator for header message block.
406 
407  /// DataBlockLockPool
409 
410  /// Allocator for data buffers.
413 
414  /// The thread synch object.
416 
417  /// This lock will protect critical sections of code that play a
418  /// role in the sending of data.
419  LockType lock_;
420 
421  /// Cached allocator for TransportReplaceElement.
424 
426 
428 
430 
432 
433  // N.B. The behavior present in TransortSendBuffer should be
434  // refactored into the TransportSendStrategy eventually; a good
435  // amount of private state is shared between both classes.
436  friend class TransportSendBuffer;
437 
438  /// Current transport packet header.
440 
441 protected:
442  ThreadSynch* synch() const;
443 
444  void set_header_source(ACE_INT64 source);
445 };
446 
447 } // namespace DCPS
448 } // namespace OpenDDS
449 
451 
452 #if defined (__ACE_INLINE__)
453 #include "TransportSendStrategy.inl"
454 #endif /* __ACE_INLINE__ */
455 
456 #endif /* OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H */
#define ACE_SYNCH_MUTEX
WeakRcHandle< TransportImpl > transport_
std::pair< TransportQueueElement *, SendMode > TQESendModePair
Used for delayed notifications when performing work.
ACE_Message_Block * header_block_
Current transport packet header, marshalled.
SequenceNumber header_sequence_
Current transport header sequence number.
virtual ACE_Message_Block * pre_send_packet(const ACE_Message_Block *m)
#define OpenDDS_Dcps_Export
Definition: dcps_export.h:24
ssize_t send(ACE_HANDLE handle, const void *buf, size_t len, int flags, const ACE_Time_Value *timeout=0)
unique_ptr< TransportMessageBlockAllocator > header_mb_allocator_
Allocator for header data block.
int ssize_t
Atomic< SendMode > mode_
This mode determines how send() calls will be handled.
TransportHeader header_
Current transport packet header.
unique_ptr< DataAllocator > header_data_allocator_
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
size_t max_samples_
Configuration - max number of samples per transport packet.
ACE_UINT32 optimum_size_
Configuration - optimum transport packet size (bytes)
BasicQueue< TransportQueueElement > QueueType
virtual ACE_Message_Block * duplicate(void) const
Defines class that represents a transport packet header.
virtual bool start_i()
Let the subclass start.
ACE_HANDLE get_handle(void)
unique_ptr< ThreadSynch > synch_
The thread synch object.
virtual Security::SecurityConfig_rch security_config() const
ACE_CDR::Long Priority
size_t max_header_size_
Maximum marshalled size of the transport packet header.
unique_ptr< DataBlockLockPool > header_db_lock_pool_
DataBlockLockPool.
unique_ptr< TransportDataBlockAllocator > header_db_allocator_
Allocator for header message block.
Sequence number abstraction. Only allows positive 64 bit values.
ACE_UINT32 max_size_
Configuration - max transport packet size (bytes)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
long long ACE_INT64
typedef OPENDDS_VECTOR(ActionConnectionRecord) ConnectionRecords
DCPS::RcHandle< SecurityConfig > SecurityConfig_rch
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > DataAllocator
Allocator for data buffers.
MessageBlockAllocator replaced_element_mb_allocator_
Cached allocator for TransportReplaceElement.
Base wrapper class around a data/control sample to be sent.