Line data Source code
1 : /*
2 : *
3 : *
4 : * Distributed under the OpenDDS License.
5 : * See: http://www.opendds.org/license.html
6 : */
7 :
8 : #ifndef OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTSENDSTRATEGY_H
9 : #define OPENDDS_DCPS_TRANSPORT_FRAMEWORK_TRANSPORTSENDSTRATEGY_H
10 :
11 : #include "BasicQueue_T.h"
12 : #include "ThreadSynchStrategy_rch.h"
13 : #include "ThreadSynchWorker.h"
14 : #include "TransportDefs.h"
15 : #include "TransportImpl_rch.h"
16 : #include "TransportHeader.h"
17 : #include "TransportReplacedElement.h"
18 : #include "TransportRetainedElement.h"
19 :
20 : #include <dds/DCPS/dcps_export.h>
21 : #include "dds/DCPS/Atomic.h"
22 : #include <dds/DCPS/DataBlockLockPool.h>
23 : #include <dds/DCPS/Definitions.h>
24 : #include <dds/DCPS/Dynamic_Cached_Allocator_With_Overflow_T.h>
25 : #include <dds/DCPS/PoolAllocator.h>
26 : #include <dds/DCPS/RcObject.h>
27 :
28 : #if defined(OPENDDS_SECURITY)
29 : #include <dds/DdsSecurityCoreC.h>
30 : #include <dds/DCPS/security/framework/SecurityConfig.h>
31 : #include <dds/DCPS/security/framework/SecurityConfig_rch.h>
32 : #endif
33 :
34 : #include <ace/Synch_Traits.h>
35 :
36 : OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
37 :
38 : namespace OpenDDS {
39 : namespace DCPS {
40 :
41 : class ThreadSynch;
42 : class ThreadSynchResource;
43 : class TransportQueueElement;
44 : class TransportSendElement;
45 : class TransportSendBuffer;
46 : class DataSampleElement;
47 : class QueueRemoveVisitor;
48 : class PacketRemoveVisitor;
49 : class TransportImpl;
50 :
51 : /**
52 : * This class provides methods to fill packets with samples for sending
53 : * and handles backpressure. It maintains the list of samples in current
54 : * packets and also the list of samples queued during backpressure. A thread
55 : * per connection is created to handle the queued samples.
56 : *
57 : * Notes for the object ownership:
58 : * 1) Owns ThreadSynch object, list of samples in current packet and list
59 : * of samples in queue.
60 : */
61 : class OpenDDS_Dcps_Export TransportSendStrategy
62 : : public ThreadSynchWorker {
63 : public:
64 : virtual ~TransportSendStrategy();
65 :
66 : /// Assigns an optional send buffer.
67 : void send_buffer(TransportSendBuffer* send_buffer);
68 :
69 : /// Start the TransportSendStrategy. This happens once, when
70 : /// the DataLink that "owns" this strategy object has established
71 : /// a connection.
72 : int start();
73 :
74 : /// Stop the TransportSendStrategy. This happens once, when the
75 : /// DataLink that "owns" this strategy object is going away.
76 : void stop();
77 :
78 : /// Invoked prior to one or more send() invocations from a particular
79 : /// TransportClient.
80 : void send_start();
81 :
82 : /// Our DataLink has been requested by some particular
83 : /// TransportClient to send the element.
84 : void send(TransportQueueElement* element, bool relink = true);
85 :
86 : /// Invoked after one or more send() invocations from a particular
87 : /// TransportClient.
88 : void send_stop(GUID_t repoId);
89 :
90 : /// Our DataLink has been requested by some particular
91 : /// TransportClient to remove the supplied sample
92 : /// (basically, an "unsend" attempt) from this strategy object.
93 : RemoveResult remove_sample(const DataSampleElement* sample);
94 :
95 : void remove_all_msgs(const GUID_t& pub_id);
96 :
97 : /// Called by our ThreadSynch object when we should be able to
98 : /// start sending any partial packet bytes and/or compose a new
99 : /// packet using elements from the queue_.
100 : ///
101 : /// Returns 0 to indicate that the ThreadSynch object doesn't need
102 : /// to call perform_work() again since the queue (and any unsent
103 : /// packet bytes) has been drained, and the mode_ has been switched
104 : /// to MODE_DIRECT.
105 : ///
106 : /// Returns 1 to indicate that there is more work to do, and the
107 : /// ThreadSynch object should have this perform_work() method
108 : /// called again.
109 : virtual WorkOutcome perform_work();
110 :
111 : /// The subclass needs to provide the implementation
112 : /// for re-establishing the datalink. This is called
113 : /// when send returns an error.
114 : virtual void relink(bool do_suspend = true);
115 :
116 : /// This is called when first time reconnect is attempted. The send mode
117 : /// is set to MODE_SUSPEND. Messages are queued at this state.
118 : void suspend_send();
119 :
120 : /// This is called when connection is lost and reconnect succeeds.
121 : /// The send mode is set to the mode before suspend which is either MODE_QUEUE
122 : /// or MODE_DIRECT.
123 : void resume_send();
124 :
125 : /// This is called whenver the connection is lost and reconnect fails.
126 : /// It removes all samples in the backpressure queue and packet queue.
127 : void terminate_send(bool graceful_disconnecting = false);
128 : virtual void terminate_send_if_suspended();
129 :
130 : // Moved clear() declaration below as Enums can't be forward declared.
131 :
132 : /// Let the subclass stop.
133 : virtual void stop_i() = 0;
134 :
135 : /// Let the subclass start.
136 0 : virtual bool start_i() { return true; }
137 :
138 : void link_released(bool flag);
139 :
140 : bool isDirectMode();
141 :
142 : typedef BasicQueue<TransportQueueElement> QueueType;
143 :
144 : /// Convert ACE_Message_Block chain into iovec[] entries for send(),
145 : /// returns number of iovec[] entries used (up to MAX_SEND_BLOCKS).
146 : /// Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater.
147 : static int mb_to_iov(const ACE_Message_Block& msg, iovec* iov);
148 :
149 : // Subclasses which make use of acceptors should override
150 : // this method and return the peer handle.
151 : virtual ACE_HANDLE get_handle();
152 :
153 : void deliver_ack_request(TransportQueueElement* element);
154 :
155 : /// Put the maximum UDP payload size here so that it can be shared by all
156 : /// UDP-based transports. This is the worst-case (conservative) value for
157 : /// UDP/IPv4. If there are no IP options, or if IPv6 is used, it could
158 : /// actually be a little larger.
159 : static const size_t UDP_MAX_MESSAGE_SIZE = 65466;
160 :
161 : /// Alternative to TransportSendStrategy::send for fragmentation
162 : ///
163 : /// @param original_element data sample to send, may be larger than max msg size
164 : /// @param elements_to_send populated by this method with either original_element
165 : /// or fragments created from it. Elements need to be
166 : /// cleaned up by the caller using data_delivered or
167 : /// data_dropped.
168 : /// @return operation succeeded
169 : bool fragmentation_helper(
170 : TransportQueueElement* original_element, TqeVector& elements_to_send);
171 :
172 : protected:
173 :
174 : TransportSendStrategy(std::size_t id,
175 : const TransportImpl_rch& transport,
176 : ThreadSynchResource* synch_resource,
177 : Priority priority,
178 : const ThreadSynchStrategy_rch& thread_sync_strategy);
179 :
180 :
181 : // Only our subclass knows how to do this.
182 : // Third arg is the "back-pressure" flag. If send_bytes() returns
183 : // -1 and the bp == 1, then it isn't really an error - it is
184 : // backpressure.
185 : virtual ssize_t send_bytes(const iovec iov[], int n, int& bp);
186 :
187 : virtual ssize_t non_blocking_send(const iovec iov[], int n, int& bp);
188 :
189 : virtual ssize_t send_bytes_i(const iovec iov[], int n) = 0;
190 :
191 : /// Specific implementation processing of prepared packet header.
192 : virtual void prepare_header_i();
193 :
194 : /// Specific implementation processing of prepared packet.
195 : virtual void prepare_packet_i();
196 :
197 : TransportQueueElement* current_packet_first_element() const;
198 :
199 : /// The maximum size of a message allowed by the this TransportImpl, or 0
200 : /// if there is no such limit. This is expected to be a constant, for example
201 : /// UDP/IPv4 can send messages of up to 65466 bytes.
202 : /// The transport framework will use the returned value (if > 0) to
203 : /// fragment larger messages. This fragmentation and
204 : /// reassembly will be transparent to the user.
205 : virtual size_t max_message_size() const;
206 :
207 : /// Set graceful disconnecting flag.
208 : void set_graceful_disconnecting(bool flag);
209 :
210 : virtual void add_delayed_notification(TransportQueueElement* element);
211 :
212 : /// If delayed notifications were queued up, issue those callbacks here.
213 : /// The default match is "match all", otherwise match can be used to specify
214 : /// either a certain individual packet or a publication id.
215 : /// Returns true if anything in the delayed notification list matched.
216 : bool send_delayed_notifications(const TransportQueueElement::MatchCriteria* match = 0);
217 :
218 : #ifdef OPENDDS_SECURITY
219 0 : virtual Security::SecurityConfig_rch security_config() const { return Security::SecurityConfig_rch(); }
220 : #endif
221 :
222 : private:
223 :
224 : enum SendPacketOutcome {
225 : OUTCOME_COMPLETE_SEND,
226 : OUTCOME_PARTIAL_SEND,
227 : OUTCOME_BACKPRESSURE,
228 : OUTCOME_PEER_LOST,
229 : OUTCOME_SEND_ERROR
230 : };
231 :
232 : /// Called from send() when it is time to attempt to send our
233 : /// current packet to the socket while in MODE_DIRECT mode_.
234 : /// If backpressure occurs, our current packet will be adjusted
235 : /// to account for bytes that were sent, and the mode will be
236 : /// changed to MODE_QUEUE.
237 : /// If no backpressure occurs (ie, the entire packet is sent), then
238 : /// our current packet will be "reset" to be an empty packet following
239 : /// the send.
240 : void direct_send(bool relink);
241 :
242 : /// This method is used while in MODE_QUEUE mode, and a new packet
243 : /// needs to be formulated using elements from the queue_. This is
244 : /// the first step of formulating the new packet. It will extract
245 : /// elements from the queue_ and insert those elements into the
246 : /// pkt_elems_ collection.
247 : ///
248 : /// After this step has been done, the prepare_packet() step can
249 : /// be performed, followed by the actual send_packet() call.
250 : void get_packet_elems_from_queue();
251 :
252 : /// This method is responsible for updating the packet header.
253 : /// Called exclusively by prepare_packet.
254 : void prepare_header();
255 :
256 : /// This method is responsible for actually "creating" the current
257 : /// send packet using the packet header and the collection of
258 : /// packet elements that are to make-up the packet's contents.
259 : void prepare_packet();
260 :
261 : /// This is called to send the current packet. The current packet
262 : /// will either be a "partially sent" packet, or a packet that has
263 : /// just been prepared via a call to prepare_packet().
264 : SendPacketOutcome send_packet();
265 :
266 : /// Form an IOV and call the send_bytes() template method.
267 : ssize_t do_send_packet(const ACE_Message_Block* packet, int& bp);
268 :
269 : #ifdef OPENDDS_SECURITY
270 : /// Derived classes can override to transform the data right before it's
271 : /// sent. If the returned value is non-NULL it will be sent instead of
272 : /// sending the parameter. If the returned value is NULL the original
273 : /// message will be dropped.
274 0 : virtual ACE_Message_Block* pre_send_packet(const ACE_Message_Block* m)
275 : {
276 0 : return m->duplicate();
277 : }
278 : #endif
279 :
280 : /// This is called from the send_packet() method after it has
281 : /// sent at least one byte from the current packet. This method
282 : /// will update the current packet appropriately, as well as deal
283 : /// with all of the release()'ing of fully sent ACE_Message_Blocks,
284 : /// and the data_delivered() calls on the fully sent elements.
285 : /// Returns 0 if the entire packet was sent, and returns 1 if
286 : /// the entire packet was not sent.
287 : int adjust_packet_after_send(ssize_t num_bytes_sent);
288 :
289 : /**
290 : * How much space is available in packet with a given used space before we
291 : * reach one of the limits: max_message_size() [transport's inherent
292 : * limitation] or max_size_ [user's configured limit]
293 : */
294 : size_t space_available(size_t already_used = 0) const;
295 :
296 : /**
297 : * Like above, but use the current packet.
298 : */
299 : size_t current_space_available() const;
300 :
301 : typedef ACE_SYNCH_MUTEX LockType;
302 : typedef ACE_Guard<LockType> GuardType;
303 :
304 : public:
305 : enum SendMode {
306 : // MODE_NOT_SET is used as the initial value of mode_before_suspend_ so
307 : // we can check if the resume_send is paired with suspend_send.
308 : MODE_NOT_SET,
309 : // Send out the sample with current packet.
310 : MODE_DIRECT,
311 : // The samples need be queued because of the backpressure or partial send.
312 : MODE_QUEUE,
313 : // The samples need be queued because the connection is lost and we are
314 : // trying to reconnect.
315 : MODE_SUSPEND,
316 : // The samples need be dropped since we lost connection and could not
317 : // reconnect.
318 : MODE_TERMINATED
319 : };
320 :
321 : /// Clear queued messages and messages in current packet and set the
322 : /// current mode to new_mod if the current mode equals old_mode or
323 : /// old_mode is MODE_NOT_SET.
324 :
325 : void clear(SendMode new_mode, SendMode old_mode = MODE_NOT_SET);
326 :
327 : /// Access the current sending mode.
328 : SendMode mode() const;
329 : protected:
330 : /// Implement framework chain visitations to remove a sample.
331 : virtual RemoveResult do_remove_sample(const GUID_t& pub_id,
332 : const TransportQueueElement::MatchCriteria& criteria, bool remove_all = false);
333 :
334 : private:
335 :
336 : virtual bool marshal_transport_header(ACE_Message_Block* mb);
337 :
338 : /// Helper function to debugging.
339 : static const char* mode_as_str(SendMode mode);
340 :
341 : /// Configuration - max number of samples per transport packet
342 : size_t max_samples_;
343 :
344 : /// Configuration - optimum transport packet size (bytes)
345 : ACE_UINT32 optimum_size_;
346 :
347 : /// Configuration - max transport packet size (bytes)
348 : ACE_UINT32 max_size_;
349 :
350 : /// Used during backpressure situations to hold samples that have
351 : /// not yet been made to be part of a transport packet, and are
352 : /// completely unsent.
353 : /// Also used as a bucket for packets which still have to become
354 : /// part of a packet.
355 : QueueType queue_;
356 :
357 : /// Maximum marshalled size of the transport packet header.
358 : size_t max_header_size_;
359 :
360 : /// Current transport packet header, marshalled.
361 : ACE_Message_Block* header_block_;
362 :
363 : /// Current transport header sequence number.
364 : SequenceNumber header_sequence_;
365 :
366 : /// Current elements that have contributed blocks to the current
367 : /// transport packet.
368 : QueueType elems_;
369 :
370 : /// Current (head of chain) block containing unsent bytes for the
371 : /// current transport packet.
372 : ACE_Message_Block* pkt_chain_;
373 :
374 : /// Set to false when the packet header hasn't been fully sent.
375 : /// Set to true once the packet header has been fully sent.
376 : bool header_complete_;
377 :
378 : /// Counter that, when greater than zero, indicates that we still
379 : /// expect to receive a send_stop() event.
380 : /// Incremented once for each call to our send_start() method,
381 : /// and decremented once for each call to our send_stop() method.
382 : /// We only care about the transitions of the start_counter_
383 : /// value from 0 to 1, and from 1 to 0. This accommodates the
384 : /// case where more than one TransportClient is sending to
385 : /// us at the same time. We use this counter to enable a
386 : /// "composite" send_start() and send_stop().
387 : unsigned start_counter_;
388 :
389 : /// This mode determines how send() calls will be handled.
390 : Atomic<SendMode> mode_;
391 :
392 : /// This mode remembers the mode before send is suspended and is
393 : /// used after the send is resumed because the connection is
394 : /// re-established.
395 : SendMode mode_before_suspend_;
396 :
397 : /// Used for delayed notifications when performing work.
398 : typedef std::pair<TransportQueueElement*, SendMode> TQESendModePair;
399 : OPENDDS_VECTOR(TQESendModePair) delayed_delivered_notification_queue_;
400 :
401 : /// Allocator for header data block.
402 : unique_ptr<TransportMessageBlockAllocator> header_mb_allocator_;
403 :
404 : /// Allocator for header message block.
405 : unique_ptr<TransportDataBlockAllocator> header_db_allocator_;
406 :
407 : /// DataBlockLockPool
408 : unique_ptr<DataBlockLockPool> header_db_lock_pool_;
409 :
410 : /// Allocator for data buffers.
411 : typedef Dynamic_Cached_Allocator_With_Overflow<ACE_Thread_Mutex> DataAllocator;
412 : unique_ptr<DataAllocator> header_data_allocator_;
413 :
414 : /// The thread synch object.
415 : unique_ptr<ThreadSynch> synch_;
416 :
417 : /// This lock will protect critical sections of code that play a
418 : /// role in the sending of data.
419 : LockType lock_;
420 :
421 : /// Cached allocator for TransportReplaceElement.
422 : MessageBlockAllocator replaced_element_mb_allocator_;
423 : DataBlockAllocator replaced_element_db_allocator_;
424 :
425 : WeakRcHandle<TransportImpl> transport_;
426 :
427 : bool graceful_disconnecting_;
428 :
429 : bool link_released_;
430 :
431 : TransportSendBuffer* send_buffer_;
432 :
433 : // N.B. The behavior present in TransortSendBuffer should be
434 : // refactored into the TransportSendStrategy eventually; a good
435 : // amount of private state is shared between both classes.
436 : friend class TransportSendBuffer;
437 :
438 : /// Current transport packet header.
439 : TransportHeader header_;
440 :
441 : protected:
442 : ThreadSynch* synch() const;
443 :
444 : void set_header_source(ACE_INT64 source);
445 : };
446 :
447 : } // namespace DCPS
448 : } // namespace OpenDDS
449 :
450 : OPENDDS_END_VERSIONED_NAMESPACE_DECL
451 :
452 : #if defined (__ACE_INLINE__)
453 : #include "TransportSendStrategy.inl"
454 : #endif /* __ACE_INLINE__ */
455 :
456 : #endif /* OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H */
|