00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H 00009 #define OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H 00010 00011 #include "dds/DCPS/dcps_export.h" 00012 #include "dds/DCPS/Definitions.h" 00013 #include "dds/DCPS/RcObject.h" 00014 #include "dds/DCPS/PoolAllocator.h" 00015 #include "ThreadSynchWorker.h" 00016 #include "TransportDefs.h" 00017 #include "BasicQueue_T.h" 00018 #include "TransportHeader.h" 00019 #include "TransportReplacedElement.h" 00020 #include "TransportRetainedElement.h" 00021 #include "ThreadSynchStrategy_rch.h" 00022 #include "ace/Synch_Traits.h" 00023 00024 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL 00025 00026 namespace OpenDDS { 00027 namespace DCPS { 00028 00029 class ThreadSynch; 00030 class ThreadSynchResource; 00031 class TransportQueueElement; 00032 class TransportSendElement; 00033 class TransportSendBuffer; 00034 class DataSampleElement; 00035 class QueueRemoveVisitor; 00036 class PacketRemoveVisitor; 00037 class TransportImpl; 00038 00039 /** 00040 * This class provides methods to fill packets with samples for sending 00041 * and handles backpressure. It maintains the list of samples in current 00042 * packets and also the list of samples queued during backpressure. A thread 00043 * per connection is created to handle the queued samples. 00044 * 00045 * Notes for the object ownership: 00046 * 1) Owns ThreadSynch object, list of samples in current packet and list 00047 * of samples in queue. 00048 */ 00049 class OpenDDS_Dcps_Export TransportSendStrategy 00050 : public ThreadSynchWorker { 00051 public: 00052 virtual ~TransportSendStrategy(); 00053 00054 /// Assigns an optional send buffer. 00055 void send_buffer(TransportSendBuffer* send_buffer); 00056 00057 /// Start the TransportSendStrategy. This happens once, when 00058 /// the DataLink that "owns" this strategy object has established 00059 /// a connection. 00060 int start(); 00061 00062 /// Stop the TransportSendStrategy. This happens once, when the 00063 /// DataLink that "owns" this strategy object is going away. 00064 void stop(); 00065 00066 /// Invoked prior to one or more send() invocations from a particular 00067 /// TransportClient. 00068 void send_start(); 00069 00070 /// Our DataLink has been requested by some particular 00071 /// TransportClient to send the element. 00072 void send(TransportQueueElement* element, bool relink = true); 00073 00074 /// Invoked after one or more send() invocations from a particular 00075 /// TransportClient. 00076 void send_stop(RepoId repoId); 00077 00078 /// Our DataLink has been requested by some particular 00079 /// TransportClient to remove the supplied sample 00080 /// (basically, an "unsend" attempt) from this strategy object. 00081 RemoveResult remove_sample(const DataSampleElement* sample, void* context); 00082 00083 void remove_all_msgs(RepoId pub_id); 00084 00085 /// Called by our ThreadSynch object when we should be able to 00086 /// start sending any partial packet bytes and/or compose a new 00087 /// packet using elements from the queue_. 00088 /// 00089 /// Returns 0 to indicate that the ThreadSynch object doesn't need 00090 /// to call perform_work() again since the queue (and any unsent 00091 /// packet bytes) has been drained, and the mode_ has been switched 00092 /// to MODE_DIRECT. 00093 /// 00094 /// Returns 1 to indicate that there is more work to do, and the 00095 /// ThreadSynch object should have this perform_work() method 00096 /// called again. 00097 virtual WorkOutcome perform_work(); 00098 00099 /// The subclass needs to provide the implementation 00100 /// for re-establishing the datalink. This is called 00101 /// when send returns an error. 00102 virtual void relink(bool do_suspend = true); 00103 00104 /// This is called when first time reconnect is attempted. The send mode 00105 /// is set to MODE_SUSPEND. Messages are queued at this state. 00106 void suspend_send(); 00107 00108 /// This is called when connection is lost and reconnect succeeds. 00109 /// The send mode is set to the mode before suspend which is either MODE_QUEUE 00110 /// or MODE_DIRECT. 00111 void resume_send(); 00112 00113 /// This is called whenver the connection is lost and reconnect fails. 00114 /// It removes all samples in the backpressure queue and packet queue. 00115 void terminate_send(bool graceful_disconnecting = false); 00116 00117 // Moved clear() declaration below as Enums can't be foward declared. 00118 00119 /// Let the subclass stop. 00120 virtual void stop_i() = 0; 00121 00122 /// Let the subclass start. 00123 virtual bool start_i() { return true; } 00124 00125 void link_released(bool flag); 00126 00127 bool isDirectMode(); 00128 00129 typedef BasicQueue<TransportQueueElement> QueueType; 00130 00131 /// Convert ACE_Message_Block chain into iovec[] entries for send(), 00132 /// returns number of iovec[] entries used (up to MAX_SEND_BLOCKS). 00133 /// Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater. 00134 static int mb_to_iov(const ACE_Message_Block& msg, iovec* iov); 00135 00136 // Subclasses which make use of acceptors should override 00137 // this method and return the peer handle. 00138 virtual ACE_HANDLE get_handle(); 00139 00140 void deliver_ack_request(TransportQueueElement* element); 00141 00142 protected: 00143 00144 TransportSendStrategy(std::size_t id, 00145 TransportImpl& transport, 00146 ThreadSynchResource* synch_resource, 00147 Priority priority, 00148 const ThreadSynchStrategy_rch& thread_sync_strategy); 00149 00150 00151 // Only our subclass knows how to do this. 00152 // Third arg is the "back-pressure" flag. If send_bytes() returns 00153 // -1 and the bp == 1, then it isn't really an error - it is 00154 // backpressure. 00155 virtual ssize_t send_bytes(const iovec iov[], int n, int& bp); 00156 00157 virtual ssize_t non_blocking_send(const iovec iov[], int n, int& bp); 00158 00159 virtual ssize_t send_bytes_i(const iovec iov[], int n) = 0; 00160 00161 /// Specific implementation processing of prepared packet header. 00162 virtual void prepare_header_i(); 00163 00164 /// Specific implementation processing of prepared packet. 00165 virtual void prepare_packet_i(); 00166 00167 TransportQueueElement* current_packet_first_element() const; 00168 00169 /// The maximum size of a message allowed by the this TransportImpl, or 0 00170 /// if there is no such limit. This is expected to be a constant, for example 00171 /// UDP/IPv4 can send messages of up to 65466 bytes. 00172 /// The transport framework will use the returned value (if > 0) to 00173 /// fragment larger messages. This fragmentation and 00174 /// reassembly will be transparent to the user. 00175 virtual size_t max_message_size() const; 00176 00177 /// Put the maximum UDP payload size here so that it can be shared by all 00178 /// UDP-based transports. This is the worst-case (conservative) value for 00179 /// UDP/IPv4. If there are no IP options, or if IPv6 is used, it could 00180 /// actually be a little larger. 00181 static const size_t UDP_MAX_MESSAGE_SIZE = 65466; 00182 00183 /// Set graceful disconnecting flag. 00184 void set_graceful_disconnecting(bool flag); 00185 00186 virtual void add_delayed_notification(TransportQueueElement* element); 00187 00188 /// If delayed notifications were queued up, issue those callbacks here. 00189 /// The default match is "match all", otherwise match can be used to specify 00190 /// either a certain individual packet or a publication id. 00191 /// Returns true if anything in the delayed notification list matched. 00192 bool send_delayed_notifications(const TransportQueueElement::MatchCriteria* match = 0); 00193 00194 private: 00195 00196 enum SendPacketOutcome { 00197 OUTCOME_COMPLETE_SEND, 00198 OUTCOME_PARTIAL_SEND, 00199 OUTCOME_BACKPRESSURE, 00200 OUTCOME_PEER_LOST, 00201 OUTCOME_SEND_ERROR 00202 }; 00203 00204 /// Called from send() when it is time to attempt to send our 00205 /// current packet to the socket while in MODE_DIRECT mode_. 00206 /// If backpressure occurs, our current packet will be adjusted 00207 /// to account for bytes that were sent, and the mode will be 00208 /// changed to MODE_QUEUE. 00209 /// If no backpressure occurs (ie, the entire packet is sent), then 00210 /// our current packet will be "reset" to be an empty packet following 00211 /// the send. 00212 void direct_send(bool relink); 00213 00214 /// This method is used while in MODE_QUEUE mode, and a new packet 00215 /// needs to be formulated using elements from the queue_. This is 00216 /// the first step of formulating the new packet. It will extract 00217 /// elements from the queue_ and insert those elements into the 00218 /// pkt_elems_ collection. 00219 /// 00220 /// After this step has been done, the prepare_packet() step can 00221 /// be performed, followed by the actual send_packet() call. 00222 void get_packet_elems_from_queue(); 00223 00224 /// This method is responsible for updating the packet header. 00225 /// Called exclusively by prepare_packet. 00226 void prepare_header(); 00227 00228 /// This method is responsible for actually "creating" the current 00229 /// send packet using the packet header and the collection of 00230 /// packet elements that are to make-up the packet's contents. 00231 void prepare_packet(); 00232 00233 /// This is called to send the current packet. The current packet 00234 /// will either be a "partially sent" packet, or a packet that has 00235 /// just been prepared via a call to prepare_packet(). 00236 SendPacketOutcome send_packet(); 00237 00238 /// Form an IOV and call the send_bytes() template method. 00239 ssize_t do_send_packet(const ACE_Message_Block* packet, int& bp); 00240 00241 #if defined(OPENDDS_SECURITY) 00242 /// Derived classes can override to transform the data right before it's 00243 /// sent. If the returned value is non-NULL it will be sent instead of 00244 /// sending the parameter. 00245 virtual ACE_Message_Block* pre_send_packet(const ACE_Message_Block*) 00246 { 00247 return 0; 00248 } 00249 #endif 00250 00251 /// This is called from the send_packet() method after it has 00252 /// sent at least one byte from the current packet. This method 00253 /// will update the current packet appropriately, as well as deal 00254 /// with all of the release()'ing of fully sent ACE_Message_Blocks, 00255 /// and the data_delivered() calls on the fully sent elements. 00256 /// Returns 0 if the entire packet was sent, and returns 1 if 00257 /// the entire packet was not sent. 00258 int adjust_packet_after_send(ssize_t num_bytes_sent); 00259 00260 00261 /// How much space is available in the current packet before we reach one 00262 /// of the limits: max_message_size() [transport's inherent limitation] 00263 /// or max_size_ [user's configured limit] 00264 size_t space_available() const; 00265 00266 typedef ACE_SYNCH_MUTEX LockType; 00267 typedef ACE_Guard<LockType> GuardType; 00268 00269 public: 00270 enum SendMode { 00271 // MODE_NOT_SET is used as the initial value of mode_before_suspend_ so 00272 // we can check if the resume_send is paired with suspend_send. 00273 MODE_NOT_SET, 00274 // Send out the sample with current packet. 00275 MODE_DIRECT, 00276 // The samples need be queued because of the backpressure or partial send. 00277 MODE_QUEUE, 00278 // The samples need be queued because the connection is lost and we are 00279 // trying to reconnect. 00280 MODE_SUSPEND, 00281 // The samples need be dropped since we lost connection and could not 00282 // reconnect. 00283 MODE_TERMINATED 00284 }; 00285 00286 /// Clear queued messages and messages in current packet. 00287 // The API now has a defaulted mode (the default is the same as 00288 // as the earlier hard-coded value). 00289 // Clear locks the local mutex. In certain situations its 00290 // important to set the new mode in the clear itself. 00291 // Since the default is the earlier hard-coded value, this 00292 // shouldn't have any impact. 00293 void clear(SendMode mode = MODE_DIRECT); 00294 00295 /// Access the current sending mode. 00296 SendMode mode() const; 00297 protected: 00298 /// Implement framework chain visitations to remove a sample. 00299 virtual RemoveResult do_remove_sample(const RepoId& pub_id, 00300 const TransportQueueElement::MatchCriteria& criteria, 00301 void* context); 00302 00303 private: 00304 00305 virtual bool marshal_transport_header(ACE_Message_Block* mb); 00306 00307 /// Helper function to debugging. 00308 static const char* mode_as_str(SendMode mode); 00309 00310 /// Configuration - max number of samples per transport packet 00311 size_t max_samples_; 00312 00313 /// Configuration - optimum transport packet size (bytes) 00314 ACE_UINT32 optimum_size_; 00315 00316 /// Configuration - max transport packet size (bytes) 00317 ACE_UINT32 max_size_; 00318 00319 /// Used during backpressure situations to hold samples that have 00320 /// not yet been made to be part of a transport packet, and are 00321 /// completely unsent. 00322 /// Also used as a bucket for packets which still have to become 00323 /// part of a packet. 00324 QueueType queue_; 00325 00326 /// Maximum marshalled size of the transport packet header. 00327 size_t max_header_size_; 00328 00329 /// Current transport packet header, marshalled. 00330 ACE_Message_Block* header_block_; 00331 00332 /// Current transport header sequence number. 00333 SequenceNumber header_sequence_; 00334 00335 /// Current elements that have contributed blocks to the current 00336 /// transport packet. 00337 QueueType elems_; 00338 00339 /// Current (head of chain) block containing unsent bytes for the 00340 /// current transport packet. 00341 ACE_Message_Block* pkt_chain_; 00342 00343 /// Set to false when the packet header hasn't been fully sent. 00344 /// Set to true once the packet header has been fully sent. 00345 bool header_complete_; 00346 00347 /// Counter that, when greater than zero, indicates that we still 00348 /// expect to receive a send_stop() event. 00349 /// Incremented once for each call to our send_start() method, 00350 /// and decremented once for each call to our send_stop() method. 00351 /// We only care about the transitions of the start_counter_ 00352 /// value from 0 to 1, and from 1 to 0. This accomodates the 00353 /// case where more than one TransportClient is sending to 00354 /// us at the same time. We use this counter to enable a 00355 /// "composite" send_start() and send_stop(). 00356 unsigned start_counter_; 00357 00358 /// This mode determines how send() calls will be handled. 00359 SendMode mode_; 00360 00361 /// This mode remembers the mode before send is suspended and is 00362 /// used after the send is resumed because the connection is 00363 /// re-established. 00364 SendMode mode_before_suspend_; 00365 00366 /// Used for delayed notifications when performing work. 00367 typedef std::pair<TransportQueueElement*, SendMode> TQESendModePair; 00368 OPENDDS_VECTOR(TQESendModePair) delayed_delivered_notification_queue_; 00369 00370 /// Allocator for header data block. 00371 unique_ptr<TransportMessageBlockAllocator> header_mb_allocator_; 00372 00373 /// Allocator for header message block. 00374 unique_ptr<TransportDataBlockAllocator> header_db_allocator_; 00375 00376 /// The thread synch object. 00377 unique_ptr<ThreadSynch> synch_; 00378 00379 /// This lock will protect critical sections of code that play a 00380 /// role in the sending of data. 00381 LockType lock_; 00382 00383 /// Cached allocator for TransportReplaceElement. 00384 MessageBlockAllocator replaced_element_mb_allocator_; 00385 DataBlockAllocator replaced_element_db_allocator_; 00386 00387 TransportImpl& transport_; 00388 00389 bool graceful_disconnecting_; 00390 00391 bool link_released_; 00392 00393 TransportSendBuffer* send_buffer_; 00394 00395 // N.B. The behavior present in TransortSendBuffer should be 00396 // refactored into the TransportSendStrategy eventually; a good 00397 // amount of private state is shared between both classes. 00398 friend class TransportSendBuffer; 00399 00400 protected: 00401 ThreadSynch* synch() const; 00402 00403 /// Current transport packet header. 00404 TransportHeader header_; 00405 }; 00406 00407 } // namespace DCPS 00408 } // namespace OpenDDS 00409 00410 OPENDDS_END_VERSIONED_NAMESPACE_DECL 00411 00412 #if defined (__ACE_INLINE__) 00413 #include "TransportSendStrategy.inl" 00414 #endif /* __ACE_INLINE__ */ 00415 00416 #endif /* OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H */