00001 /* 00002 * 00003 * 00004 * Distributed under the OpenDDS License. 00005 * See: http://www.opendds.org/license.html 00006 */ 00007 00008 #ifndef OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H 00009 #define OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H 00010 00011 #include "dds/DCPS/dcps_export.h" 00012 #include "dds/DCPS/Definitions.h" 00013 #include "dds/DCPS/RcObject_T.h" 00014 #include "dds/DCPS/PoolAllocator.h" 00015 #include "ThreadSynchWorker.h" 00016 #include "TransportDefs.h" 00017 #include "BasicQueue_T.h" 00018 #include "TransportHeader.h" 00019 #include "TransportReplacedElement.h" 00020 #include "TransportRetainedElement.h" 00021 #include "TransportInst_rch.h" 00022 #include "ThreadSynchStrategy_rch.h" 00023 00024 #include "ace/Synch.h" 00025 00026 namespace OpenDDS { 00027 namespace DCPS { 00028 00029 class ThreadSynch; 00030 class ThreadSynchResource; 00031 class TransportQueueElement; 00032 class TransportSendElement; 00033 class TransportSendBuffer; 00034 class DataSampleElement; 00035 class QueueRemoveVisitor; 00036 class PacketRemoveVisitor; 00037 00038 /** 00039 * This class provides methods to fill packets with samples for sending 00040 * and handles backpressure. It maintains the list of samples in current 00041 * packets and also the list of samples queued during backpressure. A thread 00042 * per connection is created to handle the queued samples. 00043 * 00044 * Notes for the object ownership: 00045 * 1) Owns ThreadSynch object, list of samples in current packet and list 00046 * of samples in queue. 00047 */ 00048 class OpenDDS_Dcps_Export TransportSendStrategy 00049 : public RcObject<ACE_SYNCH_MUTEX>, 00050 public ThreadSynchWorker { 00051 public: 00052 virtual ~TransportSendStrategy(); 00053 00054 /// Assigns an optional send buffer. 00055 void send_buffer(TransportSendBuffer* send_buffer); 00056 00057 /// Start the TransportSendStrategy. This happens once, when 00058 /// the DataLink that "owns" this strategy object has established 00059 /// a connection. 00060 int start(); 00061 00062 /// Stop the TransportSendStrategy. This happens once, when the 00063 /// DataLink that "owns" this strategy object is going away. 00064 void stop(); 00065 00066 /// Invoked prior to one or more send() invocations from a particular 00067 /// TransportClient. 00068 void send_start(); 00069 00070 /// Our DataLink has been requested by some particular 00071 /// TransportClient to send the element. 00072 void send(TransportQueueElement* element, bool relink = true); 00073 00074 /// Invoked after one or more send() invocations from a particular 00075 /// TransportClient. 00076 void send_stop(RepoId repoId); 00077 00078 /// Our DataLink has been requested by some particular 00079 /// TransportClient to remove the supplied sample 00080 /// (basically, an "unsend" attempt) from this strategy object. 00081 RemoveResult remove_sample(const DataSampleElement* sample); 00082 00083 void remove_all_msgs(RepoId pub_id); 00084 00085 /// Called by our ThreadSynch object when we should be able to 00086 /// start sending any partial packet bytes and/or compose a new 00087 /// packet using elements from the queue_. 00088 /// 00089 /// Returns 0 to indicate that the ThreadSynch object doesn't need 00090 /// to call perform_work() again since the queue (and any unsent 00091 /// packet bytes) has been drained, and the mode_ has been switched 00092 /// to MODE_DIRECT. 00093 /// 00094 /// Returns 1 to indicate that there is more work to do, and the 00095 /// ThreadSynch object should have this perform_work() method 00096 /// called again. 00097 virtual WorkOutcome perform_work(); 00098 00099 /// The subclass needs to provide the implementation 00100 /// for re-establishing the datalink. This is called 00101 /// when send returns an error. 00102 virtual void relink(bool do_suspend = true); 00103 00104 /// This is called when first time reconnect is attempted. The send mode 00105 /// is set to MODE_SUSPEND. Messages are queued at this state. 00106 void suspend_send(); 00107 00108 /// This is called when connection is lost and reconnect succeeds. 00109 /// The send mode is set to the mode before suspend which is either MODE_QUEUE 00110 /// or MODE_DIRECT. 00111 void resume_send(); 00112 00113 /// This is called whenver the connection is lost and reconnect fails. 00114 /// It removes all samples in the backpressure queue and packet queue. 00115 void terminate_send(bool graceful_disconnecting = false); 00116 00117 // Moved clear() declaration below as Enums can't be foward declared. 00118 00119 /// Let the subclass stop. 00120 virtual void stop_i() = 0; 00121 00122 /// Let the subclass start. 00123 virtual bool start_i() { return true; } 00124 00125 void link_released(bool flag); 00126 00127 bool isDirectMode(); 00128 00129 /// Informed transport shutdown so no more notifications to 00130 /// listener. 00131 void transport_shutdown(); 00132 00133 typedef BasicQueue<TransportQueueElement> QueueType; 00134 00135 /// Convert ACE_Message_Block chain into iovec[] entries for send(), 00136 /// returns number of iovec[] entries used (up to MAX_SEND_BLOCKS). 00137 /// Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater. 00138 static int mb_to_iov(const ACE_Message_Block& msg, iovec* iov); 00139 00140 // Subclasses which make use of acceptors should override 00141 // this method and return the peer handle. 00142 virtual ACE_HANDLE get_handle(); 00143 00144 protected: 00145 00146 TransportSendStrategy(std::size_t id, 00147 const TransportInst_rch& transport_inst, 00148 ThreadSynchResource* synch_resource, 00149 Priority priority, 00150 const ThreadSynchStrategy_rch& thread_sync_strategy); 00151 00152 // Only our subclass knows how to do this. 00153 // Third arg is the "back-pressure" flag. If send_bytes() returns 00154 // -1 and the bp == 1, then it isn't really an error - it is 00155 // backpressure. 00156 virtual ssize_t send_bytes(const iovec iov[], int n, int& bp); 00157 00158 virtual ssize_t non_blocking_send(const iovec iov[], int n, int& bp); 00159 00160 virtual ssize_t send_bytes_i(const iovec iov[], int n) = 0; 00161 00162 /// Specific implementation processing of prepared packet header. 00163 virtual void prepare_header_i(); 00164 00165 /// Specific implementation processing of prepared packet. 00166 virtual void prepare_packet_i(); 00167 00168 TransportQueueElement* current_packet_first_element() const; 00169 00170 /// The maximum size of a message allowed by the this TransportImpl, or 0 00171 /// if there is no such limit. This is expected to be a constant, for example 00172 /// UDP/IPv4 can send messages of up to 65466 bytes. 00173 /// The transport framework will use the returned value (if > 0) to 00174 /// fragment larger messages. This fragmentation and 00175 /// reassembly will be transparent to the user. 00176 virtual size_t max_message_size() const; 00177 00178 /// Put the maximum UDP payload size here so that it can be shared by all 00179 /// UDP-based transports. This is the worst-case (conservative) value for 00180 /// UDP/IPv4. If there are no IP options, or if IPv6 is used, it could 00181 /// actually be a little larger. 00182 static const size_t UDP_MAX_MESSAGE_SIZE = 65466; 00183 00184 /// Set graceful disconnecting flag. 00185 void set_graceful_disconnecting(bool flag); 00186 00187 virtual void add_delayed_notification(TransportQueueElement* element); 00188 00189 private: 00190 00191 enum SendPacketOutcome { 00192 OUTCOME_COMPLETE_SEND, 00193 OUTCOME_PARTIAL_SEND, 00194 OUTCOME_BACKPRESSURE, 00195 OUTCOME_PEER_LOST, 00196 OUTCOME_SEND_ERROR 00197 }; 00198 00199 /// Called from send() when it is time to attempt to send our 00200 /// current packet to the socket while in MODE_DIRECT mode_. 00201 /// If backpressure occurs, our current packet will be adjusted 00202 /// to account for bytes that were sent, and the mode will be 00203 /// changed to MODE_QUEUE. 00204 /// If no backpressure occurs (ie, the entire packet is sent), then 00205 /// our current packet will be "reset" to be an empty packet following 00206 /// the send. 00207 void direct_send(bool relink); 00208 00209 /// This method is used while in MODE_QUEUE mode, and a new packet 00210 /// needs to be formulated using elements from the queue_. This is 00211 /// the first step of formulating the new packet. It will extract 00212 /// elements from the queue_ and insert those elements into the 00213 /// pkt_elems_ collection. 00214 /// 00215 /// After this step has been done, the prepare_packet() step can 00216 /// be performed, followed by the actual send_packet() call. 00217 void get_packet_elems_from_queue(); 00218 00219 /// This method is responsible for updating the packet header. 00220 /// Called exclusively by prepare_packet. 00221 void prepare_header(); 00222 00223 /// This method is responsible for actually "creating" the current 00224 /// send packet using the packet header and the collection of 00225 /// packet elements that are to make-up the packet's contents. 00226 void prepare_packet(); 00227 00228 /// This is called to send the current packet. The current packet 00229 /// will either be a "partially sent" packet, or a packet that has 00230 /// just been prepared via a call to prepare_packet(). 00231 SendPacketOutcome send_packet(); 00232 00233 /// Form an IOV and call the send_bytes() template method. 00234 ssize_t do_send_packet(const ACE_Message_Block* packet, int& bp); 00235 00236 /// This is called from the send_packet() method after it has 00237 /// sent at least one byte from the current packet. This method 00238 /// will update the current packet appropriately, as well as deal 00239 /// with all of the release()'ing of fully sent ACE_Message_Blocks, 00240 /// and the data_delivered() calls on the fully sent elements. 00241 /// Returns 0 if the entire packet was sent, and returns 1 if 00242 /// the entire packet was not sent. 00243 int adjust_packet_after_send(ssize_t num_bytes_sent); 00244 00245 /// If delayed notifications were queued up, issue those callbacks here. 00246 /// The default match is "match all", otherwise match can be used to specify 00247 /// either a certain individual packet or a publication id. 00248 /// Returns true if anything in the delayed notification list matched. 00249 bool send_delayed_notifications(const TransportQueueElement::MatchCriteria* match = 0); 00250 00251 /// How much space is available in the current packet before we reach one 00252 /// of the limits: max_message_size() [transport's inherent limitation] 00253 /// or max_size_ [user's configured limit] 00254 size_t space_available() const; 00255 00256 typedef ACE_SYNCH_MUTEX LockType; 00257 typedef ACE_Guard<LockType> GuardType; 00258 00259 public: 00260 enum SendMode { 00261 // MODE_NOT_SET is used as the initial value of mode_before_suspend_ so 00262 // we can check if the resume_send is paired with suspend_send. 00263 MODE_NOT_SET, 00264 // Send out the sample with current packet. 00265 MODE_DIRECT, 00266 // The samples need be queued because of the backpressure or partial send. 00267 MODE_QUEUE, 00268 // The samples need be queued because the connection is lost and we are 00269 // trying to reconnect. 00270 MODE_SUSPEND, 00271 // The samples need be dropped since we lost connection and could not 00272 // reconnect. 00273 MODE_TERMINATED 00274 }; 00275 00276 /// Clear queued messages and messages in current packet. 00277 // The API now has a defaulted mode (the default is the same as 00278 // as the earlier hard-coded value). 00279 // Clear locks the local mutex. In certain situations its 00280 // important to set the new mode in the clear itself. 00281 // Since the default is the earlier hard-coded value, this 00282 // shouldn't have any impact. 00283 void clear(SendMode mode = MODE_DIRECT); 00284 00285 /// Access the current sending mode. 00286 SendMode mode() const; 00287 protected: 00288 /// Implement framework chain visitations to remove a sample. 00289 virtual RemoveResult do_remove_sample(const RepoId& pub_id, 00290 const TransportQueueElement::MatchCriteria& criteria); 00291 00292 private: 00293 00294 virtual void marshal_transport_header(ACE_Message_Block* mb); 00295 00296 /// Helper function to debugging. 00297 static const char* mode_as_str(SendMode mode); 00298 00299 /// Configuration - max number of samples per transport packet 00300 size_t max_samples_; 00301 00302 /// Configuration - optimum transport packet size (bytes) 00303 ACE_UINT32 optimum_size_; 00304 00305 /// Configuration - max transport packet size (bytes) 00306 ACE_UINT32 max_size_; 00307 00308 /// Used during backpressure situations to hold samples that have 00309 /// not yet been made to be part of a transport packet, and are 00310 /// completely unsent. 00311 /// Also used as a bucket for packets which still have to become 00312 /// part of a packet. 00313 QueueType* queue_; 00314 00315 /// Maximum marshalled size of the transport packet header. 00316 size_t max_header_size_; 00317 00318 /// Current transport packet header, marshalled. 00319 ACE_Message_Block* header_block_; 00320 00321 /// Current transport header sequence number. 00322 SequenceNumber header_sequence_; 00323 00324 /// Current elements that have contributed blocks to the current 00325 /// transport packet. 00326 QueueType* elems_; 00327 00328 /// Current (head of chain) block containing unsent bytes for the 00329 /// current transport packet. 00330 ACE_Message_Block* pkt_chain_; 00331 00332 /// Set to false when the packet header hasn't been fully sent. 00333 /// Set to true once the packet header has been fully sent. 00334 bool header_complete_; 00335 00336 /// Counter that, when greater than zero, indicates that we still 00337 /// expect to receive a send_stop() event. 00338 /// Incremented once for each call to our send_start() method, 00339 /// and decremented once for each call to our send_stop() method. 00340 /// We only care about the transitions of the start_counter_ 00341 /// value from 0 to 1, and from 1 to 0. This accomodates the 00342 /// case where more than one TransportClient is sending to 00343 /// us at the same time. We use this counter to enable a 00344 /// "composite" send_start() and send_stop(). 00345 unsigned start_counter_; 00346 00347 /// This mode determines how send() calls will be handled. 00348 SendMode mode_; 00349 00350 /// This mode remembers the mode before send is suspended and is 00351 /// used after the send is resumed because the connection is 00352 /// re-established. 00353 SendMode mode_before_suspend_; 00354 00355 /// Used for delayed notifications when performing work. 00356 typedef std::pair<TransportQueueElement*, SendMode> TQESendModePair; 00357 OPENDDS_VECTOR(TQESendModePair) delayed_delivered_notification_queue_; 00358 00359 /// Allocator for header data block. 00360 TransportMessageBlockAllocator* header_mb_allocator_; 00361 00362 /// Allocator for header message block. 00363 TransportDataBlockAllocator* header_db_allocator_; 00364 00365 /// The thread synch object. 00366 ThreadSynch* synch_; 00367 00368 /// This lock will protect critical sections of code that play a 00369 /// role in the sending of data. 00370 LockType lock_; 00371 00372 /// Cached allocator for TransportReplaceElement. 00373 TransportReplacedElementAllocator replaced_element_allocator_; 00374 MessageBlockAllocator replaced_element_mb_allocator_; 00375 DataBlockAllocator replaced_element_db_allocator_; 00376 00377 /// Cached allocator for TransportRetainedElements used by reliable 00378 /// datagram transports to retain PDUs after they have been sent. This 00379 /// is created in start if the transport needs it. 00380 TransportRetainedElementAllocator* retained_element_allocator_; 00381 00382 TransportInst_rch transport_inst_; 00383 00384 bool graceful_disconnecting_; 00385 00386 bool link_released_; 00387 00388 TransportSendBuffer* send_buffer_; 00389 00390 // N.B. The behavior present in TransortSendBuffer should be 00391 // refactored into the TransportSendStrategy eventually; a good 00392 // amount of private state is shared between both classes. 00393 friend class TransportSendBuffer; 00394 00395 bool transport_shutdown_; 00396 00397 protected: 00398 ThreadSynch* synch() const; 00399 00400 /// Current transport packet header. 00401 TransportHeader header_; 00402 }; 00403 00404 } // namespace DCPS 00405 } // namespace OpenDDS 00406 00407 #if defined (__ACE_INLINE__) 00408 #include "TransportSendStrategy.inl" 00409 #endif /* __ACE_INLINE__ */ 00410 00411 #endif /* OPENDDS_DCPS_TRANSPORTSENDSTRATEGY_H */