#include <TransportSendStrategy.h>
Public Types | |
enum | SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED } |
typedef BasicQueue < TransportQueueElement > | QueueType |
Public Member Functions | |
virtual | ~TransportSendStrategy () |
void | send_buffer (TransportSendBuffer *send_buffer) |
Assigns an optional send buffer. | |
int | start () |
void | stop () |
void | send_start () |
void | send (TransportQueueElement *element, bool relink=true) |
void | send_stop (RepoId repoId) |
RemoveResult | remove_sample (const DataSampleElement *sample, void *context) |
void | remove_all_msgs (RepoId pub_id) |
virtual WorkOutcome | perform_work () |
virtual void | relink (bool do_suspend=true) |
void | suspend_send () |
void | resume_send () |
void | terminate_send (bool graceful_disconnecting=false) |
Remove all samples in the backpressure queue and packet queue. | |
virtual void | stop_i ()=0 |
Let the subclass stop. | |
virtual bool | start_i () |
Let the subclass start. | |
void | link_released (bool flag) |
bool | isDirectMode () |
virtual ACE_HANDLE | get_handle () |
void | deliver_ack_request (TransportQueueElement *element) |
void | clear (SendMode mode=MODE_DIRECT) |
Clear queued messages and messages in current packet. | |
SendMode | mode () const |
Access the current sending mode. | |
Static Public Member Functions | |
static int | mb_to_iov (const ACE_Message_Block &msg, iovec *iov) |
Protected Member Functions | |
TransportSendStrategy (std::size_t id, TransportImpl &transport, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy) | |
virtual ssize_t | send_bytes (const iovec iov[], int n, int &bp) |
virtual ssize_t | non_blocking_send (const iovec iov[], int n, int &bp) |
virtual ssize_t | send_bytes_i (const iovec iov[], int n)=0 |
virtual void | prepare_header_i () |
Specific implementation processing of prepared packet header. | |
virtual void | prepare_packet_i () |
Specific implementation processing of prepared packet. | |
TransportQueueElement * | current_packet_first_element () const |
virtual size_t | max_message_size () const |
void | set_graceful_disconnecting (bool flag) |
Set graceful disconnecting flag. | |
virtual void | add_delayed_notification (TransportQueueElement *element) |
bool | send_delayed_notifications (const TransportQueueElement::MatchCriteria *match=0) |
virtual RemoveResult | do_remove_sample (const RepoId &pub_id, const TransportQueueElement::MatchCriteria &criteria, void *context) |
Implement framework chain visitations to remove a sample. | |
ThreadSynch * | synch () const |
Protected Attributes | |
TransportHeader | header_ |
Current transport packet header. | |
Static Protected Attributes | |
static const size_t | UDP_MAX_MESSAGE_SIZE = 65466 |
Private Types | |
enum | SendPacketOutcome { OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_BACKPRESSURE, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR } |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef std::pair < TransportQueueElement *, SendMode > | TQESendModePair |
Used for delayed notifications when performing work. | |
Private Member Functions | |
void | direct_send (bool relink) |
void | get_packet_elems_from_queue () |
void | prepare_header () |
void | prepare_packet () |
SendPacketOutcome | send_packet () |
ssize_t | do_send_packet (const ACE_Message_Block *packet, int &bp) |
Form an IOV and call the send_bytes() template method. | |
int | adjust_packet_after_send (ssize_t num_bytes_sent) |
size_t | space_available () const |
virtual bool | marshal_transport_header (ACE_Message_Block *mb) |
OPENDDS_VECTOR (TQESendModePair) delayed_delivered_notification_queue_ | |
Static Private Member Functions | |
static const char * | mode_as_str (SendMode mode) |
Helper function for debugging. | |
Private Attributes | |
size_t | max_samples_ |
Configuration - max number of samples per transport packet. | |
ACE_UINT32 | optimum_size_ |
Configuration - optimum transport packet size (bytes). | |
ACE_UINT32 | max_size_ |
Configuration - max transport packet size (bytes). | |
QueueType | queue_ |
size_t | max_header_size_ |
Maximum marshalled size of the transport packet header. | |
ACE_Message_Block * | header_block_ |
Current transport packet header, marshalled. | |
SequenceNumber | header_sequence_ |
Current transport header sequence number. | |
QueueType | elems_ |
ACE_Message_Block * | pkt_chain_ |
bool | header_complete_ |
unsigned | start_counter_ |
SendMode | mode_ |
This mode determines how send() calls will be handled. | |
SendMode | mode_before_suspend_ |
unique_ptr < TransportMessageBlockAllocator > | header_mb_allocator_ |
Allocator for header message block. | |
unique_ptr < TransportDataBlockAllocator > | header_db_allocator_ |
Allocator for header data block. | |
unique_ptr< ThreadSynch > | synch_ |
The thread synch object. | |
LockType | lock_ |
MessageBlockAllocator | replaced_element_mb_allocator_ |
Cached allocator for TransportReplacedElement. | |
DataBlockAllocator | replaced_element_db_allocator_ |
TransportImpl & | transport_ |
bool | graceful_disconnecting_ |
bool | link_released_ |
TransportSendBuffer * | send_buffer_ |
Friends | |
class | TransportSendBuffer |
This class provides methods to fill packets with samples for sending, and it handles backpressure. It maintains the list of samples in the current packet as well as the list of samples queued during backpressure. One thread per connection is created to handle the queued samples.
Notes on object ownership: 1) This class owns the ThreadSynch object, the list of samples in the current packet, and the list of samples in the queue.
Definition at line 49 of file TransportSendStrategy.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::TransportSendStrategy::GuardType [private] |
Definition at line 267 of file TransportSendStrategy.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TransportSendStrategy::LockType [private] |
Definition at line 266 of file TransportSendStrategy.h.
typedef BasicQueue<TransportQueueElement> OpenDDS::DCPS::TransportSendStrategy::QueueType |
Definition at line 129 of file TransportSendStrategy.h.
typedef std::pair<TransportQueueElement*, SendMode> OpenDDS::DCPS::TransportSendStrategy::TQESendModePair [private] |
Used for delayed notifications when performing work.
Definition at line 367 of file TransportSendStrategy.h.
enum OpenDDS::DCPS::TransportSendStrategy::SendMode |
Definition at line 270 of file TransportSendStrategy.h.
    {
      // MODE_NOT_SET is used as the initial value of mode_before_suspend_ so
      // we can check if the resume_send is paired with suspend_send.
      MODE_NOT_SET,
      // Send out the sample with current packet.
      MODE_DIRECT,
      // The samples need be queued because of the backpressure or partial send.
      MODE_QUEUE,
      // The samples need be queued because the connection is lost and we are
      // trying to reconnect.
      MODE_SUSPEND,
      // The samples need be dropped since we lost connection and could not
      // reconnect.
      MODE_TERMINATED
    };
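The enum comments say that MODE_NOT_SET exists so that a resume can be checked against a preceding suspend. The following is a minimal sketch of that pairing idea only. ModeTracker and its suspend() and resume() members are hypothetical names for illustration and are not the OpenDDS implementation of suspend_send() or resume_send().

```cpp
// Mirrors the SendMode enumerators listed above.
enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };

// Hypothetical stand-in for the mode bookkeeping; not the OpenDDS class.
class ModeTracker {
public:
  SendMode mode() const { return mode_; }

  // Remember the active mode, then switch to MODE_SUSPEND.
  // Assumption: a suspended or terminated tracker ignores the request.
  void suspend() {
    if (mode_ != MODE_SUSPEND && mode_ != MODE_TERMINATED) {
      mode_before_suspend_ = mode_;
      mode_ = MODE_SUSPEND;
    }
  }

  // Restore the remembered mode. Returns false when no suspend() preceded
  // this call, which is what the MODE_NOT_SET sentinel makes detectable.
  bool resume() {
    if (mode_before_suspend_ == MODE_NOT_SET) {
      return false;
    }
    mode_ = mode_before_suspend_;
    mode_before_suspend_ = MODE_NOT_SET;
    return true;
  }

private:
  SendMode mode_ = MODE_DIRECT;
  SendMode mode_before_suspend_ = MODE_NOT_SET;
};
```

Using a dedicated sentinel instead of a separate boolean keeps the "was a suspend recorded" state and the saved mode in one variable.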
enum OpenDDS::DCPS::TransportSendStrategy::SendPacketOutcome [private] |
OUTCOME_COMPLETE_SEND | |
OUTCOME_PARTIAL_SEND | |
OUTCOME_BACKPRESSURE | |
OUTCOME_PEER_LOST | |
OUTCOME_SEND_ERROR |
Definition at line 196 of file TransportSendStrategy.h.
    {
      OUTCOME_COMPLETE_SEND,
      OUTCOME_PARTIAL_SEND,
      OUTCOME_BACKPRESSURE,
      OUTCOME_PEER_LOST,
      OUTCOME_SEND_ERROR
    };
OpenDDS::DCPS::TransportSendStrategy::~TransportSendStrategy | ( | ) | [virtual] |
Definition at line 103 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL.
    {
      DBG_ENTRY_LVL("TransportSendStrategy","~TransportSendStrategy",6);

      this->delayed_delivered_notification_queue_.clear();
    }
OpenDDS::DCPS::TransportSendStrategy::TransportSendStrategy | ( | std::size_t | id, | |
TransportImpl & | transport, | |||
ThreadSynchResource * | synch_resource, | |||
Priority | priority, | |||
const ThreadSynchStrategy_rch & | thread_sync_strategy | |||
) | [protected] |
Definition at line 58 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, max_header_size_, OpenDDS::DCPS::TransportHeader::max_marshaled_size(), max_samples_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), synch_, TheServiceParticipant, and OpenDDS::DCPS::DirectPriorityMapper::thread_priority().
      : ThreadSynchWorker(id),
        max_samples_(transport.config().max_samples_per_packet_),
        optimum_size_(transport.config().optimum_packet_size_),
        max_size_(transport.config().max_packet_size_),
        max_header_size_(0),
        header_block_(0),
        pkt_chain_(0),
        header_complete_(false),
        start_counter_(0),
        mode_(MODE_DIRECT),
        mode_before_suspend_(MODE_NOT_SET),
        lock_(),
        replaced_element_mb_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
        replaced_element_db_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2),
        transport_(transport),
        graceful_disconnecting_(false),
        link_released_(true),
        send_buffer_(0)
    {
      DBG_ENTRY_LVL("TransportSendStrategy","TransportSendStrategy",6);

      // Create a ThreadSynch object just for us.
      DirectPriorityMapper mapper(priority);
      this->synch_.reset(thread_sync_strategy->create_synch_object(
                           synch_resource,
    #ifdef ACE_WIN32
                           ACE_DEFAULT_THREAD_PRIORITY,
    #else
                           mapper.thread_priority(),
    #endif
                           TheServiceParticipant->scheduler()));

      // We cache this value in data member since it doesn't change, and we
      // don't want to keep asking for it over and over.
      this->max_header_size_ = TransportHeader::max_marshaled_size();

      delayed_delivered_notification_queue_.reserve(this->max_samples_);
    }
void OpenDDS::DCPS::TransportSendStrategy::add_delayed_notification | ( | TransportQueueElement * | element | ) | [protected, virtual] |
Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy, and OpenDDS::DCPS::TcpSendStrategy.
Definition at line 1857 of file TransportSendStrategy.cpp.
References LM_DEBUG, max_samples_, mode_, size, and OpenDDS::DCPS::Transport_debug_level.
Referenced by adjust_packet_after_send(), and send().
    {
      if (Transport_debug_level) {
        size_t size = this->delayed_delivered_notification_queue_.size();
        if ((size > 0) && (size % this->max_samples_ == 0)) {
          ACE_DEBUG((LM_DEBUG,
                     "(%P|%t) Transport send strategy notification queue threshold, size=%d\n",
                     size));
        }
      }

      this->delayed_delivered_notification_queue_.push_back(std::make_pair(element, this->mode_));
    }
int OpenDDS::DCPS::TransportSendStrategy::adjust_packet_after_send | ( | ssize_t | num_bytes_sent | ) | [private] |
This is called from the send_packet() method after it has sent at least one byte of the current packet. It updates the current packet to reflect the bytes that were sent, releases every fully sent ACE_Message_Block, and queues a delayed delivery notification (see add_delayed_notification()) for every fully sent element. Returns 0 if the entire packet was sent and 1 otherwise.
Definition at line 353 of file TransportSendStrategy.cpp.
References add_delayed_notification(), ACE_Message_Block::base(), ACE_Message_Block::cont(), DBG_ENTRY_LVL, elems_, OpenDDS::DCPS::BasicQueue< T >::get(), header_, header_complete_, ACE_Message_Block::length(), OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, LM_INFO, OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::BasicQueue< T >::peek(), pkt_chain_, ACE_Message_Block::rd_ptr(), ACE_Message_Block::release(), and VDBG.
Referenced by clear(), and send_packet().
    {
      DBG_ENTRY_LVL("TransportSendStrategy", "adjust_packet_after_send", 6);

      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "Adjusting the current packet because %d bytes of the packet "
            "have been sent.\n", num_bytes_sent));

      ssize_t num_bytes_left = num_bytes_sent;
      ssize_t num_non_header_bytes_sent = 0;

      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "Set num_bytes_left to %d.\n", num_bytes_left));
      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "Set num_non_header_bytes_sent to %d.\n",
            num_non_header_bytes_sent));

      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "Peek at the element at the front of the packet elems_.\n"));

      // This is the element currently at the front of elems_.
      TransportQueueElement* element = this->elems_.peek();

      if(!element){
        ACE_DEBUG((LM_INFO, "(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n"));
      } else {
        VDBG((LM_DEBUG, "(%P|%t) DBG: "
              "Use the element's msg() to find the last block in "
              "the msg() chain.\n"));

        // Get a pointer to the last message block in the element.
        const ACE_Message_Block* elem_tail_block = element->msg();

        VDBG((LM_DEBUG, "(%P|%t) DBG: "
              "Start with tail block == element->msg().\n"));

        while (elem_tail_block->cont() != 0) {
          VDBG((LM_DEBUG, "(%P|%t) DBG: "
                "Set tail block to its cont() block (next in chain).\n"));
          elem_tail_block = elem_tail_block->cont();
        }

        VDBG((LM_DEBUG, "(%P|%t) DBG: "
              "Tail block now set (because tail block's cont() is 0).\n"));

        VDBG((LM_DEBUG, "(%P|%t) DBG: "
              "Start the 'while (num_bytes_left > 0)' loop.\n"));

        while (num_bytes_left > 0) {
          VDBG((LM_DEBUG, "(%P|%t) DBG: "
                "At top of 'num bytes left' loop. num_bytes_left == [%d].\n",
                num_bytes_left));

          const int block_length = static_cast<int>(this->pkt_chain_->length());

          VDBG((LM_DEBUG, "(%P|%t) DBG: "
                "Length of block at front of pkt_chain_ is [%d].\n",
                block_length));

          if (block_length <= num_bytes_left) {
            VDBG((LM_DEBUG, "(%P|%t) DBG: "
                  "The whole block at the front of pkt_chain_ was sent.\n"));

            // The entire message block at the front of the chain has been sent.
            // Detach the head message block from the chain and adjust
            // the pkt_chain_ to point to the next block (if any) in
            // the chain.
            VDBG((LM_DEBUG, "(%P|%t) DBG: "
                  "Extract the fully sent block from the pkt_chain_.\n"));

            ACE_Message_Block* fully_sent_block = this->pkt_chain_;

            VDBG((LM_DEBUG, "(%P|%t) DBG: "
                  "Set pkt_chain_ to pkt_chain_->cont().\n"));

            this->pkt_chain_ = this->pkt_chain_->cont();

            VDBG((LM_DEBUG, "(%P|%t) DBG: "
                  "Set the fully sent block's cont() to 0.\n"));

            fully_sent_block->cont(0);

            // Update the num_bytes_left to indicate that we have
            // processed the entire length of the block.
            num_bytes_left -= block_length;

            VDBG((LM_DEBUG, "(%P|%t) DBG: "
                  "Updated num_bytes_left to account for fully sent "
                  "block (block_length == [%d]).\n", block_length));
            VDBG((LM_DEBUG, "(%P|%t) DBG: "
                  "Now, num_bytes_left == [%d].\n", num_bytes_left));

            if (!this->header_complete_) {
              VDBG((LM_DEBUG, "(%P|%t) DBG: "
                    "Since the header_complete_ flag is false, it means "
                    "that the packet header block was still in the "
                    "pkt_chain_.\n"));

              VDBG((LM_DEBUG, "(%P|%t) DBG: "
                    "Not anymore... Set the header_complete_ flag "
                    "to true.\n"));

              // That was the packet header block. And now we know that it
              // has been completely sent.
              this->header_complete_ = true;

              VDBG((LM_DEBUG, "(%P|%t) DBG: "
                    "Release the fully sent block.\n"));

              // Release the fully_sent_block
              fully_sent_block->release();

            } else {
              VDBG((LM_DEBUG, "(%P|%t) DBG: "
                    "Since the header_complete_ flag is true, it means "
                    "that the packet header block was not in the "
                    "pkt_chain_.\n"));
              VDBG((LM_DEBUG, "(%P|%t) DBG: "
                    "So, the fully sent block was part of an element.\n"));

              // That wasn't the packet header block. It was from the
              // element currently at the front of the elems_
              // collection. If it was the last block from the
              // element, then we need to extract the element from the
              // elems_ collection and invoke data_delivered() on it.
              num_non_header_bytes_sent += block_length;

              VDBG((LM_DEBUG, "(%P|%t) DBG: "
                    "Updated num_non_header_bytes_sent to account for "
                    "fully sent block (block_length == [%d]).\n",
                    block_length));

              VDBG((LM_DEBUG, "(%P|%t) DBG: "
                    "Now, num_non_header_bytes_sent == [%d].\n",
                    num_non_header_bytes_sent));

              if (fully_sent_block->base() == elem_tail_block->base()) {
                VDBG((LM_DEBUG, "(%P|%t) DBG: "
                      "Ok. The fully sent block was a duplicate of "
                      "the tail block of the element that is at the "
                      "front of the packet elems_.\n"));

                VDBG((LM_DEBUG, "(%P|%t) DBG: "
                      "This means that we have completely sent the "
                      "element at the front of the packet elems_.\n"));

                // This means that we have completely sent the element
                // that is currently at the front of the elems_ collection.

                VDBG((LM_DEBUG, "(%P|%t) DBG: "
                      "We can release the fully sent block now.\n"));

                // Release the fully_sent_block
                fully_sent_block->release();

                VDBG((LM_DEBUG, "(%P|%t) DBG: "
                      "We can extract the element from the front of "
                      "the packet elems_ (we were just peeking).\n"));

                // Extract the element from the elems_ collection
                element = this->elems_.get();

                VDBG((LM_DEBUG, "(%P|%t) DBG: "
                      "Tell the element that a decision has been made "
                      "regarding its fate - data_delivered().\n"));

                // Inform the element that the data has been delivered.
                this->add_delayed_notification(element);

                VDBG((LM_DEBUG, "(%P|%t) DBG: "
                      "Peek at the next element in the packet "
                      "elems_.\n"));

                // Set up for the next element in elems_ by peek()'ing.
                element = this->elems_.peek();

                if (element != 0) {
                  VDBG((LM_DEBUG, "(%P|%t) DBG: "
                        "The is an element still in the packet "
                        "elems_ (we are peeking at it now).\n"));

                  VDBG((LM_DEBUG, "(%P|%t) DBG: "
                        "We are going to find the tail block for the "
                        "current element (we are peeking at).\n"));

                  // There was a "next element". Determine the
                  // elem_tail_block for it.
                  elem_tail_block = element->msg();

                  VDBG((LM_DEBUG, "(%P|%t) DBG: "
                        "Start w/tail block == element->msg().\n"));

                  while (elem_tail_block->cont() != 0) {
                    VDBG((LM_DEBUG, "(%P|%t) DBG: "
                          "Set tail block to next in chain.\n"));
                    elem_tail_block = elem_tail_block->cont();
                  }

                  VDBG((LM_DEBUG, "(%P|%t) DBG: "
                        "Done finding tail block.\n"));
                }

              } else {
                VDBG((LM_DEBUG, "(%P|%t) DBG: "
                      "Ok. The fully sent block is *not* a "
                      "duplicate of the tail block of the element "
                      "at the front of the packet elems_.\n"));

                VDBG((LM_DEBUG, "(%P|%t) DBG: "
                      "Thus, we have not completely sent the "
                      "element yet.\n"));

                // We didn't completely send the element - it has more
                // message blocks that haven't been sent (that we know of).

                VDBG((LM_DEBUG, "(%P|%t) DBG: "
                      "We can release the fully_sent_block now.\n"));

                // Release the fully_sent_block
                fully_sent_block->release();
              }
            }

          } else {
            VDBG((LM_DEBUG, "(%P|%t) DBG: "
                  "Only part of the block at the front of pkt_chain_ "
                  "was sent.\n"));

            VDBG((LM_DEBUG, "(%P|%t) DBG: "
                  "Advance the rd_ptr() of the front block (of pkt_chain_) "
                  "by the num_bytes_left (%d).\n", num_bytes_left));

            // Only part of the current block was sent.
            this->pkt_chain_->rd_ptr(num_bytes_left);

            if (this->header_complete_) {
              VDBG((LM_DEBUG, "(%P|%t) DBG: "
                    "And since the packet header block has already been "
                    "completely sent, add num_bytes_left to the "
                    "num_non_header_bytes_sent.\n"));

              VDBG((LM_DEBUG, "(%P|%t) DBG: "
                    "Before, num_non_header_bytes_sent == %d.\n",
                    num_non_header_bytes_sent));

              // We know that the current block isn't the packet header
              // block because the packet header block has already been
              // completely sent. We need to count these bytes in the
              // num_non_header_bytes_sent.
              num_non_header_bytes_sent += num_bytes_left;

              VDBG((LM_DEBUG, "(%P|%t) DBG: "
                    "After, num_non_header_bytes_sent == %d.\n",
                    num_non_header_bytes_sent));
            }

            VDBG((LM_DEBUG, "(%P|%t) DBG: "
                  "Set the num_bytes_left to 0 now.\n"));

            num_bytes_left = 0;
          }
        }
      }

      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "The 'num_bytes_left' loop has completed.\n"));

      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "Adjust the header_.length_ to account for the "
            "num_non_header_bytes_sent.\n"));
      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "Before, header_.length_ == %d.\n",
            this->header_.length_));

      // Adjust the packet header_.length_ to indicate how many non header
      // bytes are left to send.
      this->header_.length_ -= static_cast<ACE_UINT32>(num_non_header_bytes_sent);

      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "After, header_.length_ == %d.\n",
            this->header_.length_));

      // Returns 0 if the entire packet was sent, and returns 1 otherwise.
      int rc = (this->header_.length_ == 0) ? 0 : 1;

      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "Adjustments all done. Returning [%d]. 0 means entire packet "
            "has been sent. 1 means otherwise.\n",
            rc));

      return rc;
    }
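Stripped of logging, header tracking, and element notifications, the core of the listing above is a loop that walks a chain of blocks and consumes a byte count from the front. The sketch below shows only that loop. Block and consume_sent_bytes() are hypothetical names; a std::deque stands in for the ACE_Message_Block chain, and the 0/1 return value follows the convention described for adjust_packet_after_send().

```cpp
#include <cstddef>
#include <deque>

// Hypothetical stand-in for one message block in the packet chain.
struct Block {
  std::size_t length;  // unsent bytes remaining in this block
};

// Consume bytes_sent bytes from the front of the chain.
// Returns 0 when nothing is left to send, 1 otherwise.
int consume_sent_bytes(std::deque<Block>& chain, std::size_t bytes_sent) {
  while (bytes_sent > 0 && !chain.empty()) {
    Block& front = chain.front();
    if (front.length <= bytes_sent) {
      // The whole front block was sent: drop it and keep going.
      bytes_sent -= front.length;
      chain.pop_front();
    } else {
      // Only part of the front block was sent: shrink it, similar in
      // spirit to advancing a read pointer, and stop.
      front.length -= bytes_sent;
      bytes_sent = 0;
    }
  }
  return chain.empty() ? 0 : 1;
}
```

For example, a chain with block lengths 4, 10, and 6 that has 9 bytes sent ends up with two blocks of lengths 5 and 6, and the function reports 1 because data remains.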
void OpenDDS::DCPS::TransportSendStrategy::clear | ( | SendMode | mode = MODE_DIRECT |
) |
Clear queued messages and messages in current packet.
Definition at line 759 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), adjust_packet_after_send(), DBG_ENTRY_LVL, elems_, header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, lock_, mode_, mode_before_suspend_, MODE_NOT_SET, pkt_chain_, queue_, send_delayed_notifications(), start_counter_, OpenDDS::DCPS::BasicQueue< T >::swap(), ACE_Message_Block::total_length(), and VDBG.
Referenced by OpenDDS::DCPS::TcpSendStrategy::reset(), and terminate_send().
    {
      DBG_ENTRY_LVL("TransportSendStrategy","clear",6);

      send_delayed_notifications();
      QueueType elems;
      QueueType queue;
      {
        GuardType guard(this->lock_);

        if (this->header_.length_ > 0) {
          // Clear the messages in the pkt_chain_ that is partially sent.
          // We just reuse these functions for normal partial send except actual sending.
          int num_bytes_left = static_cast<int>(this->pkt_chain_->total_length());
          int result = this->adjust_packet_after_send(num_bytes_left);

          if (result == 0) {
            VDBG((LM_DEBUG, "(%P|%t) DBG: "
                  "The adjustment logic says that the packet is cleared.\n"));

          } else {
            VDBG((LM_DEBUG, "(%P|%t) DBG: "
                  "The adjustment returned partial sent.\n"));
          }
        }

        elems.swap(this->elems_);
        queue.swap(this->queue_);

        this->header_.length_ = 0;
        this->pkt_chain_ = 0;
        this->header_complete_ = false;
        this->start_counter_ = 0;
        this->mode_ = mode;
        this->mode_before_suspend_ = MODE_NOT_SET;
      }

      // We need remove the queued elements outside the lock,
      // otherwise we have a deadlock situation when remove vistor
      // calls the data_droped on each dropped elements.

      // Clear all samples in queue.
      RemoveAllVisitor remove_all_visitor;

      elems.accept_remove_visitor(remove_all_visitor);
      queue.accept_remove_visitor(remove_all_visitor);
    }
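The comment in clear() explains why the queues are swapped into locals under the lock and visited only after the lock is released: the per-element callback could otherwise deadlock. The sketch below illustrates that pattern in isolation. PendingQueue and its members are hypothetical, and std::mutex is used here as an assumption in place of the ACE lock types.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <vector>

// Hypothetical queue that notifies a callback for each dropped item.
class PendingQueue {
public:
  void push(int sample) {
    std::lock_guard<std::mutex> guard(lock_);
    items_.push_back(sample);
  }

  std::size_t size() const {
    std::lock_guard<std::mutex> guard(lock_);
    return items_.size();
  }

  // Drain everything, then invoke on_dropped for each drained item
  // without holding lock_. Returns the number of items dropped.
  std::size_t clear(const std::function<void(int)>& on_dropped) {
    std::vector<int> drained;
    {
      std::lock_guard<std::mutex> guard(lock_);
      drained.swap(items_);  // cheap: only internal pointers are exchanged
    }
    for (int sample : drained) {
      on_dropped(sample);  // may safely call push() or size()
    }
    return drained.size();
  }

private:
  mutable std::mutex lock_;
  std::vector<int> items_;
};
```

Because the callback runs after the guard has gone out of scope, a callback that re-enters the queue (for instance by calling push()) does not try to acquire a mutex that the same thread already holds.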
ACE_INLINE OpenDDS::DCPS::TransportQueueElement * OpenDDS::DCPS::TransportSendStrategy::current_packet_first_element | ( | ) | const [protected] |
Definition at line 143 of file TransportSendStrategy.inl.
References elems_, and OpenDDS::DCPS::BasicQueue< T >::peek().
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper().
void OpenDDS::DCPS::TransportSendStrategy::deliver_ack_request | ( | TransportQueueElement * | element | ) |
Definition at line 1872 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::TransportQueueElement::data_delivered(), and lock_.
void OpenDDS::DCPS::TransportSendStrategy::direct_send | ( | bool | relink | ) | [private] |
Called from send() when it is time to attempt to send the current packet to the socket while mode_ is MODE_DIRECT. If backpressure occurs, the current packet is adjusted to account for the bytes that were sent, and the mode changes to MODE_QUEUE. If no backpressure occurs (i.e., the entire packet is sent), the current packet is "reset" to an empty packet following the send.
Definition at line 1414 of file TransportSendStrategy.cpp.
References ACE_TEXT(), OpenDDS::DCPS::TransportImpl::config(), DBG_ENTRY_LVL, OpenDDS::DCPS::TransportInst::dump(), LM_DEBUG, LM_WARNING, mode_, mode_before_suspend_, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OUTCOME_BACKPRESSURE, OUTCOME_PARTIAL_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, prepare_packet(), relink(), send_packet(), transport_, OpenDDS::DCPS::Transport_debug_level, VDBG, and VDBG_LVL.
Referenced by send(), and send_stop().
    {
      DBG_ENTRY_LVL("TransportSendStrategy", "direct_send", 6);

      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "Prepare the current packet for a direct send attempt.\n"));

      // Prepare the packet for sending.
      this->prepare_packet();

      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "Now attempt to send the packet.\n"));

      // We will try resend the packet if the send() fails and then connection
      // is re-established. Only loops if the "continue" line is hit.
      while (true) {
        // Attempt to send the packet
        const SendPacketOutcome outcome = this->send_packet();

        VDBG((LM_DEBUG, "(%P|%t) DBG: "
              "The outcome of the send_packet() was %d.\n", outcome));

        if ((outcome == OUTCOME_BACKPRESSURE) ||
            (outcome == OUTCOME_PARTIAL_SEND)) {
          VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
                    "The outcome of the send_packet() was either "
                    "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5);

          VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
                    "Flip into the MODE_QUEUE mode_.\n"), 5);

          // We encountered backpressure, or only sent part of the packet.
          this->mode_ = MODE_QUEUE;

        } else if ((outcome == OUTCOME_PEER_LOST) ||
                   (outcome == OUTCOME_SEND_ERROR)) {
          if (outcome == OUTCOME_SEND_ERROR) {
            ACE_ERROR((LM_WARNING,
                       ACE_TEXT("(%P|%t) WARNING: Problem detected in ")
                       ACE_TEXT("send buffer management: %p.\n"),
                       ACE_TEXT("send_bytes")));

            if (Transport_debug_level > 0) {
              this->transport_.config().dump();
            }
          } else {
            VDBG((LM_DEBUG, "(%P|%t) DBG: "
                  "The outcome of the send_packet() was "
                  "OUTCOME_PEER_LOST.\n"));
          }

          VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
                    "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);

          if (this->mode_ != MODE_SUSPEND) {
            this->mode_before_suspend_ = this->mode_;
            this->mode_ = MODE_SUSPEND;
          }

          if (relink) {
            bool do_suspend = false;
            this->relink(do_suspend);

            if (this->mode_ == MODE_SUSPEND) {
              VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
                        "The reconnect has not done yet and we are "
                        "still in MODE_SUSPEND.\n"), 5);

            } else if (this->mode_ == MODE_TERMINATED) {
              VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
                        "Reconnect failed, we are in MODE_TERMINATED\n"), 5);

            } else {
              VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: "
                        "Try send the packet again since the connection "
                        "is re-established.\n"), 5);

              // Try send the packet again since the connection is re-established.
              continue;
            }
          }

        } else {
          VDBG((LM_DEBUG, "(%P|%t) DBG: "
                "The outcome of the send_packet() must have been "
                "OUTCOME_COMPLETE_SEND.\n"));
          VDBG((LM_DEBUG, "(%P|%t) DBG: "
                "So, we will just stay in MODE_DIRECT.\n"));
        }

        break;
      }

      // We stay in MODE_DIRECT mode if we didn't encounter any backpressure.
    }
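The branching in direct_send() can be read as a mapping from the send outcome to the next sending mode. The sketch below captures only that first-level mapping as a pure function. next_mode() is a hypothetical helper, and it deliberately leaves out the relink() retry loop, the mode_before_suspend_ bookkeeping, and logging.

```cpp
// Mirror the enumerators documented for this class.
enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };
enum SendPacketOutcome {
  OUTCOME_COMPLETE_SEND,
  OUTCOME_PARTIAL_SEND,
  OUTCOME_BACKPRESSURE,
  OUTCOME_PEER_LOST,
  OUTCOME_SEND_ERROR
};

// Hypothetical helper: which mode follows a send attempt, before any
// reconnect logic has had a chance to change it again.
SendMode next_mode(SendMode current, SendPacketOutcome outcome) {
  switch (outcome) {
  case OUTCOME_BACKPRESSURE:
  case OUTCOME_PARTIAL_SEND:
    return MODE_QUEUE;    // unsent bytes remain, so later samples are queued
  case OUTCOME_PEER_LOST:
  case OUTCOME_SEND_ERROR:
    return MODE_SUSPEND;  // hold samples while a reconnect is attempted
  case OUTCOME_COMPLETE_SEND:
    return current;       // nothing changes after a complete send
  }
  return current;
}
```

Keeping the mapping separate from the I/O makes the transition table easy to test without a socket.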
RemoveResult OpenDDS::DCPS::TransportSendStrategy::do_remove_sample | ( | const RepoId & | pub_id, | |
const TransportQueueElement::MatchCriteria & | criteria, | |||
void * | context | |||
) | [protected, virtual] |
Implement framework chain visitations to remove a sample.
Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.
Definition at line 1311 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), OpenDDS::DCPS::BasicQueue< T >::accept_replace_visitor(), DBG_ENTRY_LVL, elems_, header_, header_block_, OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, mode_, MODE_DIRECT, pkt_chain_, queue_, OpenDDS::DCPS::REMOVE_ERROR, OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, OpenDDS::DCPS::QueueRemoveVisitor::removed_bytes(), replaced_element_db_allocator_, replaced_element_mb_allocator_, OpenDDS::DCPS::BasicQueue< T >::size(), OpenDDS::DCPS::PacketRemoveVisitor::status(), OpenDDS::DCPS::QueueRemoveVisitor::status(), status, and VDBG.
Referenced by remove_all_msgs(), and remove_sample().
    {
      DBG_ENTRY_LVL("TransportSendStrategy", "do_remove_sample", 6);

      //ciju: Tim had the idea that we could do the following check
      // if ((this->mode_ == MODE_DIRECT) ||
      // ((this->pkt_chain_ == 0) && (queue_ == empty)))
      // then we can assume that the sample can be safely removed (no need for
      // replacement) from the elems_ queue.
      if ((this->mode_ == MODE_DIRECT)
          || ((this->pkt_chain_ == 0) && (this->queue_.size() == 0))) {
        //ciju: I believe this is the only mode where a safe
        // assumption can be made that the samples
        // in the elems_ queue aren't part of a packet.
        VDBG((LM_DEBUG, "(%P|%t) DBG: "
              "The mode is MODE_DIRECT, or the queue is empty and no "
              "transport packet is in progress.\n"));

        QueueRemoveVisitor simple_rem_vis(criteria);
        this->elems_.accept_remove_visitor(simple_rem_vis);

        const RemoveResult status = simple_rem_vis.status();

        if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
          this->header_.length_ -= simple_rem_vis.removed_bytes();

        } else if (status == REMOVE_NOT_FOUND) {
          VDBG((LM_DEBUG, "(%P|%t) DBG: "
                "Failed to find the sample to remove.\n"));
        }

        return status;
      }

      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "Visit the queue_ with the RemoveElementVisitor.\n"));

      QueueRemoveVisitor simple_rem_vis(criteria);
      this->queue_.accept_remove_visitor(simple_rem_vis);

      RemoveResult status = simple_rem_vis.status();

      if (status == REMOVE_RELEASED || status == REMOVE_FOUND) {
        VDBG((LM_DEBUG, "(%P|%t) DBG: "
              "The sample was removed from the queue_.\n"));
        // This means that the visitor did not encounter any fatal error
        // along the way, *AND* the sample was found in the queue_,
        // and has now been removed. We are done.
        return status;
      }

      if (status == REMOVE_ERROR) {
        VDBG((LM_DEBUG, "(%P|%t) DBG: "
              "The RemoveElementVisitor encountered a fatal error in queue_.\n"));
        // This means that the visitor encountered some fatal error along
        // the way (and it already reported something to the log).
        // Return our failure code.
        return status;
      }

      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "The RemoveElementVisitor did not find the sample in queue_.\n"));

      // We get here if the visitor did not encounter any fatal error, but it
      // also didn't find the sample - and hence it didn't perform any
      // "remove sample" logic.

      // Now we need to turn our attention to the current transport packet,
      // since the packet is likely in a "partially sent" state, and the
      // sample may still be contributing unsent bytes in the pkt_chain_.

      VDBG((LM_DEBUG, "(%P|%t) DBG: "
            "Visit our elems_ with the PacketRemoveVisitor.\n"));

      PacketRemoveVisitor pac_rem_vis(criteria,
                                      this->pkt_chain_,
                                      this->header_block_,
                                      this->replaced_element_mb_allocator_,
                                      this->replaced_element_db_allocator_);

      this->elems_.accept_replace_visitor(pac_rem_vis);

      status = pac_rem_vis.status();

      if (status == REMOVE_ERROR) {
        VDBG((LM_DEBUG, "(%P|%t) DBG: "
              "The PacketRemoveVisitor encountered a fatal error.\n"));

      } else if (status == REMOVE_NOT_FOUND) {
        VDBG((LM_DEBUG, "(%P|%t) DBG: "
              "The PacketRemoveVisitor didn't find the sample.\n"));

      } else {
        VDBG((LM_DEBUG, "(%P|%t) DBG: "
              "The PacketRemoveVisitor found the sample and removed it.\n"));
      }

      return status;
    }
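The listing above searches in two stages: the backpressure queue first, where a sample can simply be removed, and then the in-progress packet, where the comments suggest the sample is replaced instead because it may still be contributing unsent bytes. The sketch below illustrates only that ordering. Element, Where, and remove_sample() are hypothetical, and marking an element as replaced is an assumption standing in for whatever PacketRemoveVisitor actually does.

```cpp
#include <algorithm>
#include <vector>

// Hypothetical element: an id plus a flag marking it as replaced in place.
struct Element {
  int id;
  bool replaced;
};

// Hypothetical result codes for this sketch only.
enum Where { NOT_FOUND, REMOVED_FROM_QUEUE, REPLACED_IN_PACKET };

Where remove_sample(std::vector<Element>& queue,
                    std::vector<Element>& packet,
                    int id) {
  // Stage 1: queued elements are not part of a packet yet, so erase them.
  auto it = std::find_if(queue.begin(), queue.end(),
                         [id](const Element& e) { return e.id == id; });
  if (it != queue.end()) {
    queue.erase(it);
    return REMOVED_FROM_QUEUE;
  }

  // Stage 2: elements already in the packet keep their slot, so the
  // packet layout stays intact; they are only marked as replaced.
  for (Element& e : packet) {
    if (e.id == id && !e.replaced) {
      e.replaced = true;
      return REPLACED_IN_PACKET;
    }
  }
  return NOT_FOUND;
}
```

Searching the queue first means the cheaper removal is tried before the packet has to be touched at all.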
ssize_t OpenDDS::DCPS::TransportSendStrategy::do_send_packet | ( | const ACE_Message_Block * | packet, | |
int & | bp | |||
) | [private] |
Form an IOV and call the send_bytes() template method.
Definition at line 1684 of file TransportSendStrategy.cpp.
References ACE_TEXT(), DBG_ENTRY_LVL, LM_DEBUG, OpenDDS::DCPS::MAX_SEND_BLOCKS, mb_to_iov(), send_bytes(), OpenDDS::DCPS::Transport_debug_level, and VDBG_LVL.
Referenced by OpenDDS::DCPS::TransportSendBuffer::resend_one(), and send_packet().
01685 { 01686 if (Transport_debug_level > 9) { 01687 ACE_DEBUG((LM_DEBUG, 01688 ACE_TEXT("(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ") 01689 ACE_TEXT("sending data at 0x%x.\n"), 01690 id(), packet)); 01691 } 01692 DBG_ENTRY_LVL("TransportSendStrategy", "do_send_packet", 6); 01693 01694 #if defined(OPENDDS_SECURITY) 01695 // pre_send_packet may provide different data that takes the place of the 01696 // original "packet" (used for security encryption/authentication) 01697 Message_Block_Ptr substitute(pre_send_packet(packet)); 01698 #endif 01699 01700 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01701 "Populate the iovec array using the packet.\n"), 5); 01702 01703 iovec iov[MAX_SEND_BLOCKS]; 01704 01705 #if defined(OPENDDS_SECURITY) 01706 const int num_blocks = mb_to_iov(substitute ? *substitute : *packet, iov); 01707 #else 01708 const int num_blocks = mb_to_iov(*packet, iov); 01709 #endif 01710 01711 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01712 "There are [%d] number of entries in the iovec array.\n", 01713 num_blocks), 5); 01714 01715 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01716 "Attempt to send_bytes() now.\n"), 5); 01717 01718 const ssize_t num_bytes_sent = this->send_bytes(iov, num_blocks, bp); 01719 01720 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01721 "The send_bytes() said that num_bytes_sent == [%d].\n", 01722 num_bytes_sent), 5); 01723 01724 #if defined(OPENDDS_SECURITY) 01725 if (substitute && num_bytes_sent > 0) { 01726 // Although the "substitute" data took the place of "packet", the rest 01727 // of the framework needs to account for the bytes in "packet" being taken 01728 // care of, as if they were actually sent. 01729 // Since this is done with datagram sockets, partial sends aren't possible. 01730 return packet->total_length(); 01731 } 01732 #endif 01733 01734 return num_bytes_sent; 01735 }
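The byte accounting at the end of the listing can be sketched in isolation. This is a minimal illustration, not the OpenDDS code: `bytes_to_report` is a hypothetical helper, and plain `long` and `bool` stand in for `ssize_t` and the `substitute` pointer test.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper (not part of OpenDDS): choose the byte count that the
// rest of the framework should see after a send attempt.
long bytes_to_report(long bytes_sent, bool substituted, std::size_t original_length)
{
  assert(bytes_sent >= -1);  // -1 signals an error, as with send_bytes()

  if (substituted && bytes_sent > 0) {
    // Substitute data was sent in place of the original packet, so report
    // the original packet's length as if it had gone out itself.
    return static_cast<long>(original_length);
  }
  // No substitution, or nothing was sent: pass the result through unchanged.
  return bytes_sent;
}
```

With a 120-byte packet whose substitute is 180 bytes, a successful send is reported as 120 bytes, while errors and zero-byte results are passed through as they are.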
ACE_INLINE ACE_HANDLE OpenDDS::DCPS::TransportSendStrategy::get_handle | ( | void | ) | [virtual] |
Implements OpenDDS::DCPS::ThreadSynchWorker.
Reimplemented in OpenDDS::DCPS::TcpSendStrategy.
Definition at line 130 of file TransportSendStrategy.inl.
Referenced by OpenDDS::DCPS::ScheduleOutputHandler::handle_exception(), and non_blocking_send().
void OpenDDS::DCPS::TransportSendStrategy::get_packet_elems_from_queue | ( | ) | [private] |
This method is used in MODE_QUEUE when a new packet needs to be formulated using elements from the queue_. It is the first step of formulating the new packet: it extracts elements from the queue_ and inserts them into the elems_ collection.
After this step has been done, the prepare_packet() step can be performed, followed by the actual send_packet() call.
Definition at line 1511 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, elems_, OpenDDS::DCPS::TransportHeader::first_fragment_, OpenDDS::DCPS::BasicQueue< T >::get(), header_, OpenDDS::DCPS::TransportHeader::last_fragment_, OpenDDS::DCPS::TransportHeader::length_, LM_TRACE, max_message_size(), max_samples_, optimum_size_, OpenDDS::DCPS::BasicQueue< T >::peek(), OpenDDS::DCPS::BasicQueue< T >::put(), queue_, OpenDDS::DCPS::BasicQueue< T >::replace_head(), OpenDDS::DCPS::BasicQueue< T >::size(), space_available(), and VDBG_LVL.
Referenced by perform_work().
01512 { 01513 DBG_ENTRY_LVL("TransportSendStrategy", "get_packet_elems_from_queue", 6); 01514 01515 for (TransportQueueElement* element = this->queue_.peek(); element != 0; 01516 element = this->queue_.peek()) { 01517 01518 // Total number of bytes in the current element's message block chain. 01519 size_t element_length = element->msg()->total_length(); 01520 01521 // Flag used to determine if the element requires a packet all to itself. 01522 const bool exclusive_packet = element->requires_exclusive_packet(); 01523 01524 const size_t avail = this->space_available(); 01525 01526 bool frag = false; 01527 if (element_length > avail) { 01528 // The current element won't fit into the current packet 01529 if (this->max_message_size()) { // fragmentation enabled 01530 this->header_.first_fragment_ = !element->is_fragment(); 01531 VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting from queue\n"), 0); 01532 ElementPair ep = element->fragment(avail); 01533 element = ep.first; 01534 element_length = element->msg()->total_length(); 01535 this->queue_.replace_head(ep.second); 01536 frag = true; // queue_ is already taken care of, don't get() later 01537 } else { 01538 break; 01539 } 01540 } 01541 01542 // If exclusive and the current packet is empty, we won't violate the 01543 // exclusive_packet requirement by put()'ing the element 01544 // into the elems_ collection. 01545 if ((exclusive_packet && this->elems_.size() == 0) 01546 || !exclusive_packet) { 01547 // At this point, we have passed all of the pre-conditions and we can 01548 // now extract the current element from the queue_, put it into the 01549 // packet elems_, and adjust the packet header_.length_. 01550 this->elems_.put(frag ? 
element : this->queue_.get()); 01551 if (this->header_.length_ == 0) { 01552 this->header_.last_fragment_ = !frag && element->is_fragment(); 01553 } 01554 this->header_.length_ += static_cast<ACE_UINT32>(element_length); 01555 VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Packetizing from queue\n"), 0); 01556 } 01557 01558 // With exclusive and (elems_.size() != 0), we don't use the current 01559 // element as part of the packet. We know that there is already 01560 // at least one element in the packet, and the current element 01561 // is going to need its own (exclusive) packet. We will just 01562 // use the packet elems_ as it is now. Always break once 01563 // we've encountered and dealt with the exclusive_packet case. 01564 // Also break if fragmentation was required. 01565 if (exclusive_packet || frag 01566 // If the current number of packet elems_ has reached the maximum 01567 // number of samples per packet, then we are done. 01568 || this->elems_.size() == this->max_samples_ 01569 // If the current value of the header_.length_ exceeds (or equals) 01570 // the optimum_size_ for a packet, then we are done. 01571 || this->header_.length_ >= this->optimum_size_) { 01572 break; 01573 } 01574 } 01575 }
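The stop conditions of the loop above can be sketched with standard containers. This is a simplified illustration under stated assumptions: `Element`, `Limits` and `fill_packet` are hypothetical names, fragmentation is left out entirely, and `space_available` is treated as a fixed packet capacity rather than a value recomputed by the strategy.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <vector>

struct Element {              // hypothetical stand-in for TransportQueueElement
  std::size_t length;
  bool exclusive;             // needs a packet all to itself
};

struct Limits {               // hypothetical grouping of the strategy's limits
  std::size_t space_available;
  std::size_t optimum_size;
  std::size_t max_samples;
};

// Move elements from the queue into a packet until a stop condition holds.
std::vector<Element> fill_packet(std::deque<Element>& queue, const Limits& lim)
{
  assert(lim.max_samples > 0);
  std::vector<Element> packet;
  std::size_t packet_length = 0;

  while (!queue.empty()) {
    const Element e = queue.front();

    // Without fragmentation, an element that does not fit ends the packet.
    if (e.length > lim.space_available - packet_length) break;

    // An exclusive element may only enter an empty packet.
    if (!e.exclusive || packet.empty()) {
      packet.push_back(e);
      packet_length += e.length;
      queue.pop_front();
    }

    // Stop after any exclusive element, at the sample limit, or once the
    // packet has reached its optimum size.
    if (e.exclusive || packet.size() == lim.max_samples
        || packet_length >= lim.optimum_size) {
      break;
    }
  }
  return packet;
}
```

An exclusive element that meets a non-empty packet stays at the head of the queue, so the next call places it alone in a fresh packet.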
ACE_INLINE bool OpenDDS::DCPS::TransportSendStrategy::isDirectMode | ( | ) |
Definition at line 116 of file TransportSendStrategy.inl.
References mode_, and MODE_DIRECT.
00117 { 00118 return this->mode_ == MODE_DIRECT; 00119 }
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::link_released | ( | bool | flag | ) |
Definition at line 42 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL, link_released_, and lock_.
Referenced by OpenDDS::DCPS::MulticastSendStrategy::MulticastSendStrategy().
00043 { 00044 DBG_ENTRY_LVL("TransportSendStrategy","link_released",6); 00045 00046 GuardType guard(this->lock_); 00047 this->link_released_ = flag; 00048 }
bool OpenDDS::DCPS::TransportSendStrategy::marshal_transport_header | ( | ACE_Message_Block * | mb | ) | [private, virtual] |
Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.
Definition at line 1664 of file TransportSendStrategy.cpp.
References header_.
Referenced by prepare_packet().
01665 { 01666 return *mb << this->header_; 01667 }
ACE_INLINE size_t OpenDDS::DCPS::TransportSendStrategy::max_message_size | ( | void | ) | const [protected, virtual] |
The maximum size of a message allowed by this TransportImpl, or 0 if there is no such limit. This is expected to be a constant, for example UDP/IPv4 can send messages of up to 65466 bytes. The transport framework will use the returned value (if > 0) to fragment larger messages. This fragmentation and reassembly will be transparent to the user.
Reimplemented in OpenDDS::DCPS::MulticastSendStrategy, OpenDDS::DCPS::RtpsUdpSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.
Definition at line 137 of file TransportSendStrategy.inl.
Referenced by get_packet_elems_from_queue(), send(), and space_available().
int OpenDDS::DCPS::TransportSendStrategy::mb_to_iov | ( | const ACE_Message_Block & | msg, | |
iovec * | iov | |||
) | [static] |
Converts an ACE_Message_Block chain into iovec[] entries for send() and returns the number of iovec[] entries used (up to MAX_SEND_BLOCKS). Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater.
Definition at line 1890 of file TransportSendStrategy.cpp.
References iovec::iov_base, iovec::iov_len, and OpenDDS::DCPS::MAX_SEND_BLOCKS.
Referenced by do_send_packet(), OpenDDS::DCPS::UdpDataLink::open(), and OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control().
01891 { 01892 int num_blocks = 0; 01893 #ifdef _MSC_VER 01894 #pragma warning(push) 01895 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here 01896 // since on other platforms iov_len is 64-bit 01897 #pragma warning(disable : 4267) 01898 #endif 01899 for (const ACE_Message_Block* block = &msg; 01900 block && num_blocks < MAX_SEND_BLOCKS; 01901 block = block->cont()) { 01902 iov[num_blocks].iov_len = block->length(); 01903 iov[num_blocks++].iov_base = block->rd_ptr(); 01904 } 01905 #ifdef _MSC_VER 01906 #pragma warning(pop) 01907 #endif 01908 return num_blocks; 01909 }
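The chain walk in the listing can be tried without ACE. The sketch below assumes two hypothetical stand-in types, `Block` for ACE_Message_Block and `IoVec` for the platform iovec, and an illustrative limit `kMaxSendBlocks` in place of MAX_SEND_BLOCKS.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for ACE_Message_Block: a singly linked chain of buffers.
struct Block {
  const char* data;
  std::size_t length;
  const Block* next;
};

// Hypothetical stand-in for the platform iovec.
struct IoVec {
  const void* base;
  std::size_t len;
};

const int kMaxSendBlocks = 4;  // illustrative value only

// Fill iov[] from the chain, stopping at the end of the chain or at the limit.
// Returns the number of entries written.
int chain_to_iov(const Block& head, IoVec* iov)
{
  assert(iov != nullptr);  // caller supplies at least kMaxSendBlocks entries
  int used = 0;
  for (const Block* b = &head; b && used < kMaxSendBlocks; b = b->next) {
    iov[used].base = b->data;
    iov[used].len = b->length;
    ++used;
  }
  return used;
}
```

As in the listing, a chain longer than the limit is cut off at the limit, and the return value alone does not tell the caller whether that happened.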
ACE_INLINE OpenDDS::DCPS::TransportSendStrategy::SendMode OpenDDS::DCPS::TransportSendStrategy::mode | ( | void | ) | const |
Access the current sending mode.
Definition at line 14 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL, and mode_.
Referenced by OpenDDS::DCPS::ScheduleOutputHandler::handle_exception(), OpenDDS::DCPS::TcpSendStrategy::schedule_output(), OpenDDS::DCPS::ScheduleOutputHandler::schedule_output(), and send_delayed_notifications().
00015 { 00016 DBG_ENTRY_LVL("TransportSendStrategy","mode",6); 00017 00018 return mode_; 00019 }
ACE_INLINE const char * OpenDDS::DCPS::TransportSendStrategy::mode_as_str | ( | SendMode | mode | ) | [static, private] |
Helper function for debugging.
Definition at line 102 of file TransportSendStrategy.inl.
Referenced by perform_work(), send(), and send_stop().
00103 { 00104 static const char* SendModeStr[] = { "MODE_NOT_SET", 00105 "MODE_DIRECT", 00106 "MODE_QUEUE", 00107 "MODE_SUSPEND", 00108 "MODE_TERMINATED", 00109 "UNKNOWN" 00110 }; 00111 00112 return SendModeStr [mode]; 00113 }
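The listing indexes the string table directly with the mode value, so the "UNKNOWN" entry is only reached when the value happens to be 5. A bounds-checked variant might look like the sketch below; `mode_name` is a hypothetical name, and the enum is repeated here only to keep the example self-contained.

```cpp
#include <cassert>
#include <cstring>

enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };

// Hypothetical bounds-checked lookup: any value outside the table maps to
// "UNKNOWN" instead of reading past the end of the array.
const char* mode_name(int mode)
{
  static const char* const names[] = {
    "MODE_NOT_SET", "MODE_DIRECT", "MODE_QUEUE", "MODE_SUSPEND", "MODE_TERMINATED"
  };
  const int count = static_cast<int>(sizeof(names) / sizeof(names[0]));
  assert(count == MODE_TERMINATED + 1);  // table must cover every enumerator

  return (mode >= 0 && mode < count) ? names[mode] : "UNKNOWN";
}
```

Taking an `int` lets the function accept values that were cast from corrupted or uninitialized state, which is the case a debugging helper is most likely to meet.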
ssize_t OpenDDS::DCPS::TransportSendStrategy::non_blocking_send | ( | const iovec | iov[], | |
int | n, | |||
int & | bp | |||
) | [protected, virtual] |
Definition at line 1810 of file TransportSendStrategy.cpp.
References ACE_TEXT(), get_handle(), LM_DEBUG, LM_ERROR, ACE::record_and_set_non_blocking_mode(), ACE::restore_non_blocking_mode(), send_bytes_i(), VDBG, and VDBG_LVL.
Referenced by OpenDDS::DCPS::TcpSendStrategy::send_bytes().
01811 { 01812 int val = 0; 01813 ACE_HANDLE handle = this->get_handle(); 01814 01815 if (handle == ACE_INVALID_HANDLE) 01816 return -1; 01817 01818 ACE::record_and_set_non_blocking_mode(handle, val); 01819 01820 // Set the back-pressure flag to false. 01821 bp = 0; 01822 01823 // Clear errno 01824 errno = 0; 01825 01826 ssize_t result = this->send_bytes_i(iov, n); 01827 01828 if (result == -1) { 01829 if ((errno == EWOULDBLOCK) || (errno == ENOBUFS)) { 01830 VDBG((LM_DEBUG,"(%P|%t) DBG: " 01831 "Backpressure encountered.\n")); 01832 // Set the back-pressure flag to true 01833 bp = 1; 01834 01835 } else { 01836 VDBG_LVL((LM_ERROR, "(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n", 01837 ACE_TEXT("sendv"), n),1); 01838 01839 // try to get the application to core when "Bad Address" is returned 01840 // by looking at the iovec 01841 for (int ii = 0; ii < n; ii++) { 01842 ACE_DEBUG((LM_DEBUG, "(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n", 01843 ii, iov[ii].iov_len, iov[ii].iov_base)); 01844 } 01845 } 01846 } 01847 01848 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 01849 "The sendv() returned [%d].\n", result), 5); 01850 01851 ACE::restore_non_blocking_mode(handle, val); 01852 01853 return result; 01854 }
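The back-pressure test in the listing comes down to classifying errno after a failed send. The sketch below isolates that step; `send_once` is a hypothetical function, the send itself is injected as a callable so the example runs without a socket, and `long` stands in for `ssize_t`.

```cpp
#include <cassert>
#include <cerrno>
#include <functional>

// Hypothetical sketch: call send_fn once and report back-pressure through bp.
// send_fn is expected to return the number of bytes sent, or -1 with errno set.
long send_once(const std::function<long()>& send_fn, int& bp)
{
  assert(send_fn);  // a callable must be supplied

  bp = 0;           // assume no back-pressure until the send says otherwise
  errno = 0;        // clear stale error state before the call

  const long result = send_fn();

  if (result == -1 && (errno == EWOULDBLOCK || errno == ENOBUFS)) {
    // The socket could not accept data right now: back-pressure, not failure.
    bp = 1;
  }
  return result;
}
```

Any other errno leaves `bp` at 0, so the caller can tell a genuine send error apart from a temporarily full socket.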
OpenDDS::DCPS::TransportSendStrategy::OPENDDS_VECTOR | ( | TQESendModePair | ) | [private] |
ThreadSynchWorker::WorkOutcome OpenDDS::DCPS::TransportSendStrategy::perform_work | ( | void | ) | [virtual] |
Called by our ThreadSynch object when we should be able to start sending any partial packet bytes and/or compose a new packet using elements from the queue_.
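Returns WORK_OUTCOME_NO_MORE_TO_DO to indicate that the ThreadSynch object doesn't need to call perform_work() again, since the queue (and any unsent packet bytes) has been drained and the mode_ has been switched to MODE_DIRECT.
Returns WORK_OUTCOME_MORE_TO_DO to indicate that there is more work to do, and that the ThreadSynch object should call this perform_work() method again.
The listing below also returns WORK_OUTCOME_CLOGGED_RESOURCE when the send meets backpressure, and WORK_OUTCOME_BROKEN_RESOURCE when the connection is lost and cannot be re-established.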
Implements OpenDDS::DCPS::ThreadSynchWorker.
Definition at line 122 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, get_packet_elems_from_queue(), header_, OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, lock_, mode_, mode_as_str(), MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OUTCOME_BACKPRESSURE, OUTCOME_COMPLETE_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, prepare_packet(), queue_, relink(), send_delayed_notifications(), send_packet(), OpenDDS::DCPS::BasicQueue< T >::size(), synch_, VDBG_LVL, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO, and OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO.
00123 { 00124 DBG_ENTRY_LVL("TransportSendStrategy","perform_work",6); 00125 00126 SendPacketOutcome outcome; 00127 bool no_more_work = false; 00128 00129 { // scope for the guard(this->lock_); 00130 GuardType guard(this->lock_); 00131 00132 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: perform_work mode: %C\n", mode_as_str(this->mode_)), 5); 00133 00134 if (this->mode_ == MODE_TERMINATED) { 00135 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00136 "Entered perform_work() and mode_ is MODE_TERMINATED - " 00137 "we lost connection and could not reconnect, just return " 00138 "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5); 00139 return WORK_OUTCOME_BROKEN_RESOURCE; 00140 } 00141 00142 // The perform_work() is called by our synch_ object using 00143 // a thread designated to call this method when it thinks 00144 // we need to be called in order to "service" the queue_ and/or 00145 // deal with a partially sent current packet. 00146 // 00147 // We will return a 0 if we don't see a need to have our perform_work() 00148 // called again, and we will return a 1 if we do see the need to have our 00149 // perform_work() method called again. 00150 00151 // First, make sure that the mode_ indicates that we are, indeed, in 00152 // the MODE_QUEUE mode. If we are not in MODE_QUEUE mode (meaning we are 00153 // in MODE_DIRECT), then it means we didn't need to have this perform_work() 00154 // method called - in this case, do nothing other than return 00155 // WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we really don't 00156 // see a need for it to call our perform_work() again (at least not 00157 // right now). 00158 if (this->mode_ != MODE_QUEUE && this->mode_ != MODE_SUSPEND) { 00159 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00160 "Entered perform_work() and mode_ is %C - just return " 00161 "WORK_OUTCOME_NO_MORE_TO_DO.\n", mode_as_str(this->mode_)), 5); 00162 return WORK_OUTCOME_NO_MORE_TO_DO; 00163 } 00164 00165 // Check the "state" of the current packet. 
We will either find that the 00166 // current packet is in a state of being "partially sent", or we will find 00167 // it in a state of being "empty". When the current packet is "empty", it 00168 // means that it is time to build up the current packet using elements 00169 // extracted from the queue_, and then we will attempt to send the 00170 // packet. When we find the current packet in the "partially sent" state, 00171 // we will not touch the queue_ - we will just try to send the unsent 00172 // bytes in the current (partially sent) packet. 00173 const size_t header_length = this->header_.length_; 00174 00175 if (header_length == 0) { 00176 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00177 "The current packet doesn't have any unsent bytes - we " 00178 "need to 'populate' the current packet with elems from " 00179 "the queue.\n"), 5); 00180 00181 // The current packet is "empty". Build up the current packet using 00182 // elements from the queue_, and prepare the current packet to be sent. 00183 00184 // Before we build the packet from the queue_, let's make sure that 00185 // there is actually something on the queue_ to build from. 00186 if (this->queue_.size() == 0) { 00187 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00188 "But the queue is empty. We have cleared the " 00189 "backpressure situation.\n"),5); 00190 // We are here because the queue_ is empty, and there isn't 00191 // any "partial packet" bytes left to send. We have overcome 00192 // the backpressure situation and don't have anything to do 00193 // right now. 00194 00195 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00196 "Flip mode to MODE_DIRECT, and return " 00197 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5); 00198 00199 // Flip the mode back to MODE_DIRECT. 00200 this->mode_ = MODE_DIRECT; 00201 00202 // And return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that 00203 // perform_work() doesn't need to be called again (at this time). 
00204 return WORK_OUTCOME_NO_MORE_TO_DO; 00205 } 00206 00207 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00208 "There is at least one elem in the queue - get the packet " 00209 "elems from the queue.\n"), 5); 00210 00211 // There is stuff in the queue_ if we get to this point in the logic. 00212 // Build-up the current packet using element(s) from the queue_. 00213 this->get_packet_elems_from_queue(); 00214 00215 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00216 "Prepare the packet from the packet elems_.\n"), 5); 00217 00218 // Now we can prepare the new packet to be sent. 00219 this->prepare_packet(); 00220 00221 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00222 "Packet has been prepared from packet elems_.\n"), 5); 00223 00224 } else { 00225 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00226 "We have a current packet that still has unsent bytes.\n"), 5); 00227 } 00228 00229 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00230 "Attempt to send the current packet.\n"), 5); 00231 00232 // Now we can attempt to send the current packet - whether it is 00233 // a "partially sent" packet or one that we just built-up using elements 00234 // from the queue_ (and subsequently prepared for sending) - it doesn't 00235 // matter. Just attempt to send as many of the "unsent" bytes in the 00236 // packet as possible. 00237 outcome = this->send_packet(); 00238 00239 // If we sent the whole packet (eg, partial_send is false), and the queue_ 00240 // is now empty, then we've cleared the backpressure situation. 00241 if ((outcome == OUTCOME_COMPLETE_SEND) && (this->queue_.size() == 0)) { 00242 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00243 "Flip the mode to MODE_DIRECT, and then return " 00244 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5); 00245 00246 // Revert back to MODE_DIRECT mode. 
00247 this->mode_ = MODE_DIRECT; 00248 no_more_work = true; 00249 } 00250 } // End of scope for guard(this->lock_); 00251 00252 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00253 "The outcome of the send_packet() was %d.\n", outcome), 5); 00254 00255 send_delayed_notifications(); 00256 00257 // If we sent the whole packet (eg, partial_send is false), and the queue_ 00258 // is now empty, then we've cleared the backpressure situation. 00259 if (no_more_work) { 00260 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00261 "We sent the whole packet, and there is nothing left on " 00262 "the queue now.\n"), 5); 00263 00264 // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we 00265 // don't desire another call to this perform_work() method. 00266 return WORK_OUTCOME_NO_MORE_TO_DO; 00267 } 00268 00269 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00270 "We still have unsent bytes in the current packet AND/OR there " 00271 "are still elements in the queue.\n"), 5); 00272 00273 if ((outcome == OUTCOME_PEER_LOST) || (outcome == OUTCOME_SEND_ERROR)) { 00274 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00275 "We lost our connection, or had some fatal connection " 00276 "error. Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5); 00277 00278 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00279 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5); 00280 00281 bool do_suspend = true; 00282 this->relink(do_suspend); 00283 00284 if (this->mode_ == MODE_SUSPEND) { 00285 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00286 "The reconnect has not done yet and we are still in MODE_SUSPEND. " 00287 "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5); 00288 // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we 00289 // don't desire another call to this perform_work() method. 
00290 return WORK_OUTCOME_NO_MORE_TO_DO; 00291 00292 } else if (this->mode_ == MODE_TERMINATED) { 00293 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00294 "Reconnect failed, now we are in MODE_TERMINATED\n"), 5); 00295 return WORK_OUTCOME_BROKEN_RESOURCE; 00296 00297 } else { 00298 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00299 "Reconnect succeeded, Notify synch thread of work " 00300 "availability.\n"), 5); 00301 // If the datalink is re-established then notify the synch 00302 // thread to perform work. We do not hold the object lock at 00303 // this point. 00304 this->synch_->work_available(); 00305 } 00306 } 00307 00308 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00309 "We still have an 'unbroken' connection.\n"), 5); 00310 00311 if (outcome == OUTCOME_BACKPRESSURE) { 00312 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00313 "We experienced backpressure on our attempt to send the " 00314 "packet. Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5); 00315 // We have a "clogged resource". 00316 return WORK_OUTCOME_CLOGGED_RESOURCE; 00317 } 00318 00319 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00320 "We may have sent the whole current packet, but still have " 00321 "elements on the queue.\n"), 5); 00322 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00323 "Or, we may have only partially sent the current packet.\n"), 5); 00324 00325 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00326 "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5); 00327 00328 // We may have had an OUTCOME_COMPLETE_SEND, but there is still stuff 00329 // in the queue_ to be sent. *OR* we have have had an OUTCOME_PARTIAL_SEND, 00330 // which equates to the same thing - we still have work to do. 00331 00332 // We are still in MODE_QUEUE mode, thus there is still work to be 00333 // done to service the queue_ and/or a partially sent current packet. 00334 // Return WORK_OUTCOME_MORE_TO_DO so that our caller knows that we still 00335 // want it to call this perform_work() method. 00336 return WORK_OUTCOME_MORE_TO_DO; 00337 }
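The decisions made after send_packet() returns can be condensed into one function. This is a sketch under assumptions, not the OpenDDS implementation: `after_send` is a hypothetical name, the enums are repeated locally (their numeric values are not taken from OpenDDS), and the result of relink() is passed in as `mode_after_relink` rather than produced by a real reconnect.

```cpp
#include <cassert>
#include <cstddef>

enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };
enum SendOutcome { OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_BACKPRESSURE,
                   OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR };
enum WorkOutcome { WORK_OUTCOME_MORE_TO_DO, WORK_OUTCOME_NO_MORE_TO_DO,
                   WORK_OUTCOME_CLOGGED_RESOURCE, WORK_OUTCOME_BROKEN_RESOURCE };

// Hypothetical condensation of the logic that follows send_packet().
WorkOutcome after_send(SendMode& mode, SendOutcome outcome, std::size_t queued,
                       SendMode mode_after_relink)
{
  // The entry checks have already rejected every other mode.
  assert(mode == MODE_QUEUE || mode == MODE_SUSPEND);

  if (outcome == OUTCOME_COMPLETE_SEND && queued == 0) {
    mode = MODE_DIRECT;  // backpressure cleared
    return WORK_OUTCOME_NO_MORE_TO_DO;
  }

  if (outcome == OUTCOME_PEER_LOST || outcome == OUTCOME_SEND_ERROR) {
    mode = mode_after_relink;  // stands in for the effect of relink()
    if (mode == MODE_SUSPEND) return WORK_OUTCOME_NO_MORE_TO_DO;
    if (mode == MODE_TERMINATED) return WORK_OUTCOME_BROKEN_RESOURCE;
    // Reconnect succeeded: fall through and ask to be called again.
  }

  if (outcome == OUTCOME_BACKPRESSURE) return WORK_OUTCOME_CLOGGED_RESOURCE;

  return WORK_OUTCOME_MORE_TO_DO;
}
```

The suspended case returns WORK_OUTCOME_NO_MORE_TO_DO because that is what the listed code returns, even though its debug message mentions WORK_OUTCOME_CLOGGED_RESOURCE.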
void OpenDDS::DCPS::TransportSendStrategy::prepare_header | ( | ) | [private] |
This method is responsible for updating the packet header. Called exclusively by prepare_packet.
Definition at line 1578 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, header_, header_sequence_, prepare_header_i(), and OpenDDS::DCPS::TransportHeader::sequence_.
Referenced by prepare_packet().
01579 { 01580 DBG_ENTRY_LVL("TransportSendStrategy", "prepare_header", 6); 01581 01582 // Increment header sequence for packet: 01583 this->header_.sequence_ = ++this->header_sequence_; 01584 01585 // Allow the specific implementation the opportunity to set 01586 // values in the packet header. 01587 this->prepare_header_i(); 01588 }
void OpenDDS::DCPS::TransportSendStrategy::prepare_header_i | ( | ) | [protected, virtual] |
Specific implementation processing of prepared packet header.
Reimplemented in OpenDDS::DCPS::MulticastSendStrategy.
Definition at line 1591 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL.
Referenced by prepare_header().
01592 { 01593 DBG_ENTRY_LVL("TransportSendStrategy","prepare_header_i",6); 01594 01595 // Default implementation does nothing. 01596 }
void OpenDDS::DCPS::TransportSendStrategy::prepare_packet | ( | ) | [private] |
This method is responsible for actually "creating" the current send packet using the packet header and the collection of packet elements that are to make up the packet's contents.
Definition at line 1599 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), OpenDDS::DCPS::BuildChainVisitor::chain(), ACE_Message_Block::cont(), DBG_ENTRY_LVL, ACE_Message_Block::duplicate(), elems_, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), header_block_, header_complete_, header_db_allocator_, header_mb_allocator_, LM_DEBUG, marshal_transport_header(), max_header_size_, ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, pkt_chain_, prepare_header(), prepare_packet_i(), ACE_Message_Block::release(), VDBG, and ACE_Time_Value::zero.
Referenced by direct_send(), and perform_work().
01600 { 01601 DBG_ENTRY_LVL("TransportSendStrategy", "prepare_packet", 6); 01602 01603 // Prepare the header for sending. 01604 this->prepare_header(); 01605 01606 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01607 "Marshal the packet header.\n")); 01608 01609 if (this->header_block_ != 0) { 01610 this->header_block_->release(); 01611 } 01612 01613 ACE_NEW_MALLOC(this->header_block_, 01614 static_cast<ACE_Message_Block*>(this->header_mb_allocator_->malloc()), 01615 ACE_Message_Block(this->max_header_size_, 01616 ACE_Message_Block::MB_DATA, 01617 0, 01618 0, 01619 0, 01620 0, 01621 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 01622 ACE_Time_Value::zero, 01623 ACE_Time_Value::max_time, 01624 this->header_db_allocator_.get(), 01625 this->header_mb_allocator_.get())); 01626 01627 marshal_transport_header(this->header_block_); 01628 01629 this->pkt_chain_ = this->header_block_->duplicate(); 01630 01631 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01632 "Use a BuildChainVisitor to visit the packet elems_.\n")); 01633 01634 // Build up a chain of blocks by duplicating the message block chain 01635 // held by each element (in elems_), and then chaining the new duplicate 01636 // blocks together to form one long chain. 01637 BuildChainVisitor visitor; 01638 this->elems_.accept_visitor(visitor); 01639 01640 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01641 "Attach the visitor's chain of blocks to the lone (packet " 01642 "header) block currently in the pkt_chain_.\n")); 01643 01644 // Attach the visitor's chain of blocks to the packet header block. 01645 this->pkt_chain_->cont(visitor.chain()); 01646 01647 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01648 "Increment header sequence for next packet.\n")); 01649 01650 // Allow the specific implementation the opportunity to process the 01651 // newly prepared packet. 
01652 this->prepare_packet_i(); 01653 01654 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01655 "Set the header_complete_ flag to false.\n")); 01656 01657 // Set the header_complete_ to false to indicate 01658 // that the first block in the pkt_chain_ is the packet header block 01659 // (actually a duplicate() of the packet header_block_). 01660 this->header_complete_ = false; 01661 }
void OpenDDS::DCPS::TransportSendStrategy::prepare_packet_i | ( | ) | [protected, virtual] |
Specific implementation processing of prepared packet.
Definition at line 1670 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL.
Referenced by prepare_packet().
01671 { 01672 DBG_ENTRY_LVL("TransportSendStrategy","prepare_packet_i",6); 01673 01674 // Default implementation does nothing. 01675 }
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::relink | ( | bool | do_suspend = true |
) | [virtual] |
The subclass needs to provide the implementation for re-establishing the datalink. This is called when send returns an error.
Reimplemented in OpenDDS::DCPS::TcpSendStrategy.
Definition at line 51 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL.
Referenced by direct_send(), and perform_work().
00052 { 00053 DBG_ENTRY_LVL("TransportSendStrategy","relink",6); 00054 // The subclass needs to implement this function for re-establishing 00055 // the link upon send failure. 00056 }
void OpenDDS::DCPS::TransportSendStrategy::remove_all_msgs | ( | RepoId | pub_id | ) |
Definition at line 1264 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, do_remove_sample(), lock_, OpenDDS::DCPS::TransportSendBuffer::retain_all(), send_buffer_, and send_delayed_notifications().
01265 { 01266 DBG_ENTRY_LVL("TransportSendStrategy","remove_all_msgs",6); 01267 01268 const TransportQueueElement::MatchOnPubId match(pub_id); 01269 send_delayed_notifications(&match); 01270 01271 GuardType guard(this->lock_); 01272 01273 if (this->send_buffer_ != 0) { 01274 // If a secondary send buffer is bound, removed samples must 01275 // be retained in order to properly maintain the buffer: 01276 this->send_buffer_->retain_all(pub_id); 01277 } 01278 01279 do_remove_sample(pub_id, match, 0); 01280 }
RemoveResult OpenDDS::DCPS::TransportSendStrategy::remove_sample | ( | const DataSampleElement * | sample, | |
void * | context | |||
) |
Our DataLink has been requested by some particular TransportClient to remove the supplied sample (basically, an "unsend" attempt) from this strategy object.
Definition at line 1283 of file TransportSendStrategy.cpp.
References ACE_Message_Block::cont(), DBG_ENTRY_LVL, do_remove_sample(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_sample(), LM_DEBUG, lock_, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::REMOVE_RELEASED, send_delayed_notifications(), and VDBG_LVL.
01284 { 01285 DBG_ENTRY_LVL("TransportSendStrategy", "remove_sample", 6); 01286 01287 VDBG_LVL((LM_DEBUG, "(%P|%t) Removing sample: %@\n", sample->get_sample()), 5); 01288 01289 // The sample to remove is either in temporary delayed notification list or 01290 // internal list (elems_ or queue_). If it's going to be removed from temporary delayed 01291 // notification list by transport thread, it needs acquire WriterDataContainer lock for 01292 // data_dropped/data_delivered callback, then it needs wait for this remove_sample() call 01293 // complete as this call already hold the WriterContainer's lock. So this call is safe to 01294 // access the sample to remove. If it's going to be removed by this remove_sample() calling 01295 // thread, it will be removed either from delayed notification list or from internal list 01296 // in which case the element carry the info if the sample is released so the datalinkset 01297 // can stop calling rest datalinks to remove this sample if it's already released.. 01298 01299 const char* const payload = sample->get_sample()->cont()->rd_ptr(); 01300 RepoId pub_id = sample->get_pub_id(); 01301 const TransportQueueElement::MatchOnDataPayload modp(payload); 01302 if (send_delayed_notifications(&modp)) { 01303 return REMOVE_RELEASED; 01304 } 01305 01306 GuardType guard(this->lock_); 01307 return do_remove_sample(pub_id, modp, context); 01308 }
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::resume_send | ( | ) |
This is called when the connection has been lost and the reconnect succeeds. The send mode is restored to the mode in effect before the suspend, which is either MODE_QUEUE or MODE_DIRECT.
Definition at line 71 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL, header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, LM_ERROR, lock_, mode_, mode_before_suspend_, MODE_DIRECT, MODE_NOT_SET, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, pkt_chain_, queue_, OpenDDS::DCPS::BasicQueue< T >::size(), start_counter_, and synch_.
00072 { 00073 DBG_ENTRY_LVL("TransportSendStrategy","resume_send",6); 00074 GuardType guard(this->lock_); 00075 00076 // If this send strategy is reused when the connection is reestablished, then 00077 // we need re-initialize the mode_ and mode_before_suspend_. 00078 if (this->mode_ == MODE_TERMINATED) { 00079 this->header_.length_ = 0; 00080 this->pkt_chain_ = 0; 00081 this->header_complete_ = false; 00082 this->start_counter_ = 0; 00083 this->mode_ = MODE_DIRECT; 00084 this->mode_before_suspend_ = MODE_NOT_SET; 00085 this->delayed_delivered_notification_queue_.clear(); 00086 00087 } else if (this->mode_ == MODE_SUSPEND) { 00088 this->mode_ = this->mode_before_suspend_; 00089 this->mode_before_suspend_ = MODE_NOT_SET; 00090 if (this->queue_.size() > 0) { 00091 this->mode_ = MODE_QUEUE; 00092 this->synch_->work_available(); 00093 } 00094 00095 } else { 00096 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::resume_send The suspend or terminate" 00097 " is not called previously.\n")); 00098 } 00099 }
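The mode transitions in the listing can be sketched as a small state function. This is an illustration only: `State`, `ResumeResult` and `resume` are hypothetical names, locking and the packet or header resets are omitted, and the call to work_available() is represented by a return value.

```cpp
#include <cassert>
#include <cstddef>

enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };

struct State {                 // hypothetical subset of the strategy's members
  SendMode mode;
  SendMode mode_before_suspend;
  std::size_t queued;          // number of elements waiting in the queue
};

enum ResumeResult { RESUMED, RESUMED_WORK_PENDING, NOT_SUSPENDED };

// Hypothetical sketch of the transitions performed on resume.
ResumeResult resume(State& s)
{
  if (s.mode == MODE_TERMINATED) {
    // A reused strategy starts over in direct mode.
    s.mode = MODE_DIRECT;
    s.mode_before_suspend = MODE_NOT_SET;
    return RESUMED;
  }

  if (s.mode == MODE_SUSPEND) {
    // The saved mode is documented to be MODE_QUEUE or MODE_DIRECT.
    assert(s.mode_before_suspend == MODE_DIRECT || s.mode_before_suspend == MODE_QUEUE);
    s.mode = s.mode_before_suspend;
    s.mode_before_suspend = MODE_NOT_SET;
    if (s.queued > 0) {
      s.mode = MODE_QUEUE;          // queued data forces queue mode
      return RESUMED_WORK_PENDING;  // caller would notify the synch thread
    }
    return RESUMED;
  }

  // Neither suspended nor terminated: the listing logs an error here.
  return NOT_SUSPENDED;
}
```

A strategy suspended in MODE_DIRECT with elements queued in the meantime comes back in MODE_QUEUE, which matches the queue-size check in the listing.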
void OpenDDS::DCPS::TransportSendStrategy::send | ( | TransportQueueElement * | element, | |
bool | relink = true | |||
) |
Our DataLink has been requested by some particular TransportClient to send the element.
Definition at line 890 of file TransportSendStrategy.cpp.
References ACE_TEXT(), add_delayed_notification(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, direct_send(), elems_, OpenDDS::DCPS::TransportHeader::first_fragment_, OpenDDS::DCPS::TransportQueueElement::fragment(), graceful_disconnecting_, header_, OpenDDS::DCPS::TransportHeader::last_fragment_, OpenDDS::DCPS::TransportHeader::length_, link_released_, LM_DEBUG, LM_ERROR, LM_TRACE, lock_, max_header_size_, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), max_message_size(), max_samples_, max_size_, mode_, mode_as_str(), MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OpenDDS::DCPS::TransportQueueElement::msg(), optimum_size_, OpenDDS::DCPS::BasicQueue< T >::put(), queue_, OpenDDS::DCPS::TransportQueueElement::requires_exclusive_packet(), send_delayed_notifications(), OpenDDS::DCPS::BasicQueue< T >::size(), space_available(), synch_, ACE_Message_Block::total_length(), OpenDDS::DCPS::Transport_debug_level, VDBG, and VDBG_LVL.
00891 { 00892 if (Transport_debug_level > 9) { 00893 ACE_DEBUG((LM_DEBUG, 00894 ACE_TEXT("(%P|%t) TransportSendStrategy::send() [%d] - ") 00895 ACE_TEXT("sending data at 0x%x.\n"), 00896 id(), element)); 00897 } 00898 00899 DBG_ENTRY_LVL("TransportSendStrategy", "send", 6); 00900 00901 { 00902 GuardType guard(this->lock_); 00903 00904 if (this->link_released_) { 00905 this->add_delayed_notification(element); 00906 00907 } else { 00908 if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) { 00909 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00910 "TransportSendStrategy::send: mode is MODE_TERMINATED and not in " 00911 "graceful disconnecting, so discard message.\n")); 00912 element->data_dropped(true); 00913 return; 00914 } 00915 00916 size_t element_length = element->msg()->total_length(); 00917 00918 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00919 "Send element msg() has total_length() == [%d].\n", 00920 element_length)); 00921 00922 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00923 "this->max_header_size_ == [%d].\n", 00924 this->max_header_size_)); 00925 00926 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00927 "this->max_size_ == [%d].\n", 00928 this->max_size_)); 00929 00930 const size_t max_message_size = this->max_message_size(); 00931 00932 // Really an assert. We can't accept any element that wouldn't fit into 00933 // a transport packet by itself (ie, it would be the only element in the 00934 // packet). This max_size_ is the user-configurable maximum, not based 00935 // on the transport's inherent maximum message size. If max_message_size 00936 // is non-zero, we will fragment so max_size_ doesn't apply per-element. 00937 if (max_message_size == 0 && 00938 this->max_header_size_ + element_length > this->max_size_) { 00939 ACE_ERROR((LM_ERROR, 00940 "(%P|%t) ERROR: Element too large (%Q) " 00941 "- won't fit into packet.\n", ACE_UINT64(element_length))); 00942 return; 00943 } 00944 00945 // Check the mode_ to see if we simply put the element on the queue. 
00946 if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) { 00947 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00948 "this->mode_ == %C, so queue elem and leave.\n", 00949 mode_as_str(this->mode_)), 5); 00950 00951 this->queue_.put(element); 00952 00953 if (this->mode_ != MODE_SUSPEND) { 00954 this->synch_->work_available(); 00955 } 00956 00957 return; 00958 } 00959 00960 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00961 "this->mode_ == MODE_DIRECT.\n")); 00962 00963 // We are in the MODE_DIRECT send mode. When in this mode, the send() 00964 // calls will "build up" the transport packet to be sent directly when it 00965 // reaches the optimal size, contains the maximum number of samples, etc. 00966 00967 // We need to check if the current element (the arg passed-in to this 00968 // send() method) should be appended to the transport packet, or if the 00969 // transport packet should be sent (directly) first, dealing with the 00970 // current element afterwards. 00971 00972 // We will decide to send the packet as it is now, under two circumstances: 00973 // 00974 // Either: 00975 // 00976 // (1) The current element won't fit into the current packet since it 00977 // would violate the max_packet_size_. 00978 // 00979 // -OR- 00980 // 00981 // (2) There is at least one element already in the current packet, 00982 // and the current element says that it must be sent in an 00983 // exclusive packet (ie, in a packet all by itself). 00984 // 00985 const bool exclusive = element->requires_exclusive_packet(); 00986 00987 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00988 "The element %C require an exclusive packet.\n", 00989 (exclusive ? "DOES" : "does NOT") 00990 )); 00991 00992 const size_t space_needed = 00993 (max_message_size > 0) 00994 ? 
/* fragmenting */ DataSampleHeader::max_marshaled_size() + MIN_FRAG 00995 : /* not fragmenting */ element_length; 00996 00997 if ((exclusive && (this->elems_.size() != 0)) 00998 || (this->space_available() < space_needed)) { 00999 01000 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01001 "Element won't fit in current packet or requires exclusive" 01002 " - send current packet (directly) now.\n")); 01003 01004 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01005 "max_header_size_: %d, header_.length_: %d, element_length: %d\n" 01006 , this->max_header_size_, this->header_.length_, element_length)); 01007 01008 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01009 "Tot possible length: %d, max_len: %d\n" 01010 , this->max_header_size_ + this->header_.length_ + element_length 01011 , this->max_size_)); 01012 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01013 "current elem size: %d\n" 01014 , this->elems_.size())); 01015 01016 // Send the current packet, and deal with the current element 01017 // afterwards. 01018 // The invocation's relink status should dictate the direct_send's 01019 // relink. We don't want a (relink == false) invocation to end up 01020 // doing a relink. Think of (relink == false) as a non-blocking call. 01021 this->direct_send(relink); 01022 01023 // Now check to see if we flipped into MODE_QUEUE, which would mean 01024 // that the direct_send() experienced backpressure, and the 01025 // packet was only partially sent. If this has happened, we deal with 01026 // the current element by placing it on the queue (and then we are done). 01027 // 01028 // Otherwise, if the mode_ is still MODE_DIRECT, we can just 01029 // "drop" through to the next step in the logic where we append the 01030 // current element to the current packet. 01031 if (this->mode_ == MODE_QUEUE) { 01032 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01033 "We experienced backpressure on that direct send, as " 01034 "the mode_ is now MODE_QUEUE or MODE_SUSPEND. 
" 01035 "Queue elem and leave.\n"), 5); 01036 this->queue_.put(element); 01037 this->synch_->work_available(); 01038 01039 return; 01040 } 01041 } 01042 01043 // Loop for sending 'element', in fragments if needed 01044 bool first_pkt = true; // enter the loop 1st time through unconditionally 01045 for (TransportQueueElement* next_fragment = 0; 01046 (first_pkt || next_fragment) 01047 && (this->mode_ == MODE_DIRECT || this->mode_ == MODE_TERMINATED);) { 01048 // We do need to send in MODE_TERMINATED (GRACEFUL_DISCONNECT msg) 01049 01050 if (next_fragment) { 01051 element = next_fragment; 01052 element_length = next_fragment->msg()->total_length(); 01053 this->header_.first_fragment_ = false; 01054 } 01055 01056 this->header_.last_fragment_ = false; 01057 if (max_message_size) { // fragmentation enabled 01058 const size_t avail = this->space_available(); 01059 if (element_length > avail) { 01060 VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting\n"), 0); 01061 ElementPair ep = element->fragment(avail); 01062 element = ep.first; 01063 element_length = element->msg()->total_length(); 01064 next_fragment = ep.second; 01065 this->header_.first_fragment_ = first_pkt; 01066 } else if (next_fragment) { 01067 // We are sending the "tail" element of a previous fragment() 01068 // operation, and this element didn't itself require fragmentation 01069 this->header_.last_fragment_ = true; 01070 next_fragment = 0; 01071 } 01072 } 01073 first_pkt = false; 01074 01075 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01076 "Start the 'append elem' to current packet logic.\n")); 01077 01078 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01079 "Put element into current packet elems_.\n")); 01080 01081 // Now that we know the current element should go into the current 01082 // packet, we can just go ahead and "append" the current element to 01083 // the current packet. 01084 01085 // Add the current element to the collection of packet elements. 
01086 this->elems_.put(element); 01087 01088 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01089 "Before, the header_.length_ == [%d].\n", 01090 this->header_.length_)); 01091 01092 // Adjust the header_.length_ to account for the length of the element. 01093 this->header_.length_ += static_cast<ACE_UINT32>(element_length); 01094 const size_t message_length = this->header_.length_; 01095 01096 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01097 "After adding element's length, the header_.length_ == [%d].\n", 01098 message_length)); 01099 01100 // The current packet now contains the current element. We need to 01101 // check to see if the conditions are such that we should go ahead and 01102 // attempt to send the packet "directly" now, or if we can just leave 01103 // and send the current packet later (in another send() call or in a 01104 // send_stop() call). 01105 01106 // There a few conditions that will cause us to attempt to send the 01107 // packet (directly) right now: 01108 // - Fragmentation was needed 01109 // - The current packet has the maximum number of samples per packet. 01110 // - The current packet's total length exceeds the optimum packet size. 01111 // - The current element (currently part of the packet elems_) 01112 // requires an exclusive packet. 
01113 // 01114 if (next_fragment || (this->elems_.size() >= this->max_samples_) 01115 || (this->max_header_size_ + message_length > this->optimum_size_) 01116 || exclusive) { 01117 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01118 "Now the current packet looks full - send it (directly).\n")); 01119 01120 this->direct_send(relink); 01121 01122 if (next_fragment && this->mode_ != MODE_DIRECT) { 01123 if (this->mode_ == MODE_QUEUE) { 01124 this->queue_.put(next_fragment); 01125 this->synch_->work_available(); 01126 01127 } else { 01128 next_fragment->data_dropped(true /* dropped by transport */); 01129 } 01130 } else if (mode_ == MODE_QUEUE) { 01131 // Background thread handles packets in progress 01132 this->synch_->work_available(); 01133 } 01134 01135 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01136 "Back from the direct_send() attempt.\n")); 01137 01138 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01139 "And we %C as a result of the direct_send() call.\n", 01140 ((this->mode_ == MODE_QUEUE) ? "flipped into MODE_QUEUE" 01141 : "stayed in MODE_DIRECT"))); 01142 01143 } else { 01144 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01145 "Packet not sent. Send conditions weren't satisfied.\n")); 01146 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01147 "elems_.size(): %d, max_samples_: %d\n", 01148 int(this->elems_.size()), int(this->max_samples_))); 01149 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01150 "header_size_: %d, optimum_size_: %d\n", 01151 int(this->max_header_size_ + message_length), 01152 int(this->optimum_size_))); 01153 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01154 "element_requires_exclusive_packet: %d\n", int(exclusive))); 01155 01156 if (this->mode_ == MODE_QUEUE) { 01157 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01158 "We flipped into MODE_QUEUE.\n")); 01159 01160 } else { 01161 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01162 "We stayed in MODE_DIRECT.\n")); 01163 } 01164 } 01165 } 01166 } 01167 } 01168 01169 send_delayed_notifications(); 01170 }
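The two decisions that send() makes in MODE_DIRECT (flush the current packet before appending, and send the packet right after appending) can be written as pure predicates. The sketch below is an illustration under assumptions: PacketState, must_flush_first and should_send_now are hypothetical names, and min_fragment_space stands in for DataSampleHeader::max_marshaled_size() + MIN_FRAG from the listing.

```cpp
#include <cstddef>

// Hypothetical snapshot of the values send() consults.
struct PacketState {
  std::size_t elems_in_packet;  // elems_.size()
  std::size_t space_available;  // space_available()
  std::size_t header_length;    // header_.length_ (payload bytes so far)
};

// True when the current packet should be sent before the new element is
// appended: the element needs a packet of its own and the packet is not
// empty, or there is not enough room left for it.
bool must_flush_first(const PacketState& pkt, bool exclusive, bool fragmenting,
                      std::size_t element_length, std::size_t min_fragment_space) {
  const std::size_t space_needed = fragmenting ? min_fragment_space : element_length;
  return (exclusive && pkt.elems_in_packet != 0) || pkt.space_available < space_needed;
}

// True when the packet should be sent immediately after appending: a
// fragment is pending, the sample count limit is reached, the optimum size
// is exceeded, or the element requires an exclusive packet.
bool should_send_now(const PacketState& pkt, bool has_next_fragment, bool exclusive,
                     std::size_t max_samples, std::size_t max_header_size,
                     std::size_t optimum_size) {
  return has_next_fragment
      || pkt.elems_in_packet >= max_samples
      || max_header_size + pkt.header_length > optimum_size
      || exclusive;
}
```

Keeping the predicates free of side effects makes the thresholds easy to unit test; the real method interleaves them with direct_send() calls and mode checks.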
void OpenDDS::DCPS::TransportSendStrategy::send_buffer | ( | TransportSendBuffer * | send_buffer | ) |
Assigns an optional send buffer.
Definition at line 112 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::TransportSendBuffer::bind(), and send_buffer_.
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::open().
00113 {
00114   this->send_buffer_ = send_buffer;
00115
00116   if (this->send_buffer_ != 0) {
00117     this->send_buffer_->bind(this);
00118   }
00119 }
ACE_INLINE ssize_t OpenDDS::DCPS::TransportSendStrategy::send_bytes | ( | const iovec | iov[], | |
int | n, | |||
int & | bp | |||
) | [protected, virtual] |
Reimplemented in OpenDDS::DCPS::TcpSendStrategy.
Definition at line 122 of file TransportSendStrategy.inl.
References send_bytes_i().
Referenced by do_send_packet().
00125 {
00126   return send_bytes_i(iov, n);
00127 }
virtual ssize_t OpenDDS::DCPS::TransportSendStrategy::send_bytes_i | ( | const iovec | iov[], | |
int | n | |||
) | [protected, pure virtual] |
Implemented in OpenDDS::DCPS::MulticastSendStrategy, OpenDDS::DCPS::RtpsUdpSendStrategy, OpenDDS::DCPS::ShmemSendStrategy, OpenDDS::DCPS::TcpSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.
Referenced by non_blocking_send(), and send_bytes().
bool OpenDDS::DCPS::TransportSendStrategy::send_delayed_notifications | ( | const TransportQueueElement::MatchCriteria * | match = 0 |
) | [protected] |
If delayed notifications were queued up, issue those callbacks here. A null match (the default) matches every queued notification; otherwise match can be used to select either a certain individual packet or a publication id. Returns true if anything in the delayed notification list matched.
Definition at line 647 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, OpenDDS::DCPS::TransportImpl::is_shut_down(), lock_, OpenDDS::DCPS::TransportQueueElement::MatchCriteria::matches(), mode(), MODE_NOT_SET, MODE_TERMINATED, OPENDDS_VECTOR(), OpenDDS::DCPS::TransportQueueElement::owned_by_transport(), and transport_.
Referenced by clear(), perform_work(), remove_all_msgs(), remove_sample(), send(), and send_stop().
00648 { 00649 DBG_ENTRY_LVL("TransportSendStrategy","send_delayed_notifications",6); 00650 TransportQueueElement* sample = 0; 00651 SendMode mode = MODE_NOT_SET; 00652 00653 OPENDDS_VECTOR(TQESendModePair) samples; 00654 00655 size_t num_delayed_notifications = 0; 00656 bool found_element = false; 00657 00658 { 00659 GuardType guard(lock_); 00660 00661 num_delayed_notifications = delayed_delivered_notification_queue_.size(); 00662 00663 if (num_delayed_notifications == 0) { 00664 return false; 00665 00666 } else if (num_delayed_notifications == 1) { 00667 // Optimization for the most common case (doesn't need vectors) 00668 00669 if (!match || match->matches(*delayed_delivered_notification_queue_[0].first)) { 00670 found_element = true; 00671 sample = delayed_delivered_notification_queue_[0].first; 00672 mode = delayed_delivered_notification_queue_[0].second; 00673 00674 delayed_delivered_notification_queue_.clear(); 00675 } 00676 00677 } else { 00678 OPENDDS_VECTOR(TQESendModePair)::iterator iter; 00679 for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) { 00680 sample = iter->first; 00681 mode = iter->second; 00682 if (!match || match->matches(*sample)) { 00683 found_element = true; 00684 samples.push_back(*iter); 00685 iter = delayed_delivered_notification_queue_.erase(iter); 00686 } else { 00687 ++iter; 00688 } 00689 } 00690 } 00691 } 00692 00693 if (!found_element) 00694 return false; 00695 00696 bool transport_shutdown = this->transport_.is_shut_down(); 00697 00698 if (num_delayed_notifications == 1) { 00699 // optimization for the common case 00700 if (mode == MODE_TERMINATED) { 00701 if (!transport_shutdown || sample->owned_by_transport()) { 00702 sample->data_dropped(true); 00703 } 00704 } else { 00705 if (!transport_shutdown || sample->owned_by_transport()) { 00706 sample->data_delivered(); 00707 } 00708 } 00709 00710 } else { 00711 for (size_t i = 0; i < samples.size(); ++i) { 00712 if 
(samples[i].second == MODE_TERMINATED) { 00713 if (!transport_shutdown || samples[i].first->owned_by_transport()) { 00714 samples[i].first->data_dropped(true); 00715 } 00716 } else { 00717 if (!transport_shutdown || samples[i].first->owned_by_transport()) { 00718 samples[i].first->data_delivered(); 00719 } 00720 } 00721 } 00722 } 00723 return true; 00724 }
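The listing above follows a common pattern: matching entries are removed from the shared list while the lock is held, and the callbacks run only after the lock has been released. A minimal sketch of that pattern follows. DelayedNotifications, Notification and the integer sample ids are hypothetical stand-ins, std::mutex replaces the ACE guard, and the transport shutdown and ownership checks are left out.

```cpp
#include <cstddef>
#include <functional>
#include <mutex>
#include <vector>

enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };

// Hypothetical stand-in for a (TransportQueueElement*, SendMode) pair.
struct Notification {
  int sample_id;
  SendMode mode;
};

class DelayedNotifications {
public:
  void add(int sample_id, SendMode mode) {
    std::lock_guard<std::mutex> guard(lock_);
    pending_.push_back(Notification{sample_id, mode});
  }

  // An empty 'match' selects every entry. 'notify' receives the sample id
  // and whether the sample counts as dropped (queued in MODE_TERMINATED).
  // Returns true if at least one entry matched.
  bool flush(const std::function<bool(int)>& match,
             const std::function<void(int, bool)>& notify) {
    std::vector<Notification> matched;
    {
      std::lock_guard<std::mutex> guard(lock_);
      for (auto it = pending_.begin(); it != pending_.end();) {
        if (!match || match(it->sample_id)) {
          matched.push_back(*it);
          it = pending_.erase(it);
        } else {
          ++it;
        }
      }
    }
    // The lock is released here, so a callback may safely call add().
    for (const Notification& n : matched) {
      notify(n.sample_id, n.mode == MODE_TERMINATED);
    }
    return !matched.empty();
  }

  std::size_t pending() const {
    std::lock_guard<std::mutex> guard(lock_);
    return pending_.size();
  }

private:
  mutable std::mutex lock_;
  std::vector<Notification> pending_;
};
```

Running the callbacks outside the critical section avoids holding the strategy lock while user-facing code executes, which is presumably why the original collects the samples into a temporary vector first.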
TransportSendStrategy::SendPacketOutcome OpenDDS::DCPS::TransportSendStrategy::send_packet | ( | ) | [private] |
This is called to send the current packet. The current packet will either be a "partially sent" packet, or a packet that has just been prepared via a call to prepare_packet().
Definition at line 1738 of file TransportSendStrategy.cpp.
References adjust_packet_after_send(), DBG_ENTRY_LVL, do_send_packet(), header_, OpenDDS::DCPS::TransportSendBuffer::insert(), LM_DEBUG, OUTCOME_BACKPRESSURE, OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, pkt_chain_, send_buffer_, OpenDDS::DCPS::TransportHeader::sequence_, VDBG, and VDBG_LVL.
Referenced by direct_send(), and perform_work().
01739 { 01740 DBG_ENTRY_LVL("TransportSendStrategy", "send_packet", 6); 01741 01742 int bp_flag = 0; 01743 const ssize_t num_bytes_sent = 01744 this->do_send_packet(this->pkt_chain_, bp_flag); 01745 01746 if (num_bytes_sent == 0) { 01747 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01748 "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5); 01749 // This means that the peer has disconnected. 01750 return OUTCOME_PEER_LOST; 01751 } 01752 01753 if (num_bytes_sent < 0) { 01754 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01755 "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5); 01756 01757 // Check for backpressure... 01758 if (bp_flag == 1) { 01759 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01760 "Since backpressure flag is true, return " 01761 "OUTCOME_BACKPRESSURE.\n"), 5); 01762 // Ok. Not really an error - just backpressure. 01763 return OUTCOME_BACKPRESSURE; 01764 } 01765 01766 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01767 "Since backpressure flag is false, return " 01768 "OUTCOME_SEND_ERROR.\n"), 5); 01769 01770 // Not backpressure - it's a real error. 01771 // Note: moved this to send_bytes so the errno msg could be written. 01772 //ACE_ERROR((LM_ERROR, 01773 // "(%P|%t) ERROR: Call to peer().send() failed with negative " 01774 // "return code.\n")); 01775 01776 return OUTCOME_SEND_ERROR; 01777 } 01778 01779 if (this->send_buffer_ != 0) { 01780 // If a secondary send buffer is bound, sent samples must 01781 // be inserted in order to properly maintain the buffer: 01782 this->send_buffer_->insert(this->header_.sequence_, 01783 &this->elems_, this->pkt_chain_); 01784 } 01785 01786 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01787 "Since num_bytes_sent > 0, adjust the packet to account for " 01788 "the bytes that did get sent.\n"),5); 01789 01790 // We sent some bytes - adjust the current packet (elems_ and pkt_chain_) 01791 // to account for the bytes that have been sent. 
01792 const int result = 01793 this->adjust_packet_after_send(num_bytes_sent); 01794 01795 if (result == 0) { 01796 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01797 "The adjustment logic says that the complete packet was " 01798 "sent. Return OUTCOME_COMPLETE_SEND.\n")); 01799 return OUTCOME_COMPLETE_SEND; 01800 } 01801 01802 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01803 "The adjustment logic says that only a part of the packet was " 01804 "sent. Return OUTCOME_PARTIAL_SEND.\n")); 01805 01806 return OUTCOME_PARTIAL_SEND; 01807 }
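The mapping from the low-level send result to a SendPacketOutcome can be summarised as a small classifier. This is a sketch under assumptions: classify_send is a hypothetical name, the enumerator order is arbitrary (only the names appear in the documentation), and "complete" is approximated by comparing the byte count with the packet length, whereas the real code asks adjust_packet_after_send().

```cpp
#include <cstddef>

// Enumerator names from the documentation; their order here is an assumption.
enum SendPacketOutcome {
  OUTCOME_COMPLETE_SEND,
  OUTCOME_PARTIAL_SEND,
  OUTCOME_BACKPRESSURE,
  OUTCOME_PEER_LOST,
  OUTCOME_SEND_ERROR
};

// bytes_sent:    return value of the low-level send (negative on failure)
// backpressure:  the flag the send reports when the transport would block
// packet_length: bytes that were still pending in the current packet
SendPacketOutcome classify_send(long long bytes_sent, bool backpressure,
                                std::size_t packet_length) {
  if (bytes_sent == 0) {
    return OUTCOME_PEER_LOST;  // zero bytes means the peer disconnected
  }
  if (bytes_sent < 0) {
    // A negative result is only a real error when no backpressure was reported.
    return backpressure ? OUTCOME_BACKPRESSURE : OUTCOME_SEND_ERROR;
  }
  return static_cast<std::size_t>(bytes_sent) >= packet_length
             ? OUTCOME_COMPLETE_SEND
             : OUTCOME_PARTIAL_SEND;
}
```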
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::send_start | ( | ) |
Invoked prior to one or more send() invocations from a particular TransportClient.
Definition at line 31 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL, link_released_, lock_, and start_counter_.
00032 {
00033   DBG_ENTRY_LVL("TransportSendStrategy","send_start",6);
00034
00035   GuardType guard(this->lock_);
00036
00037   if (!this->link_released_)
00038     ++this->start_counter_;
00039 }
void OpenDDS::DCPS::TransportSendStrategy::send_stop | ( | RepoId | repoId | ) |
Invoked after one or more send() invocations from a particular TransportClient.
Definition at line 1173 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, direct_send(), elems_, graceful_disconnecting_, header_, OpenDDS::DCPS::TransportHeader::length_, link_released_, LM_DEBUG, LM_ERROR, lock_, mode_, mode_as_str(), MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, send_delayed_notifications(), OpenDDS::DCPS::BasicQueue< T >::size(), start_counter_, synch_, VDBG, and VDBG_LVL.
01174 { 01175 DBG_ENTRY_LVL("TransportSendStrategy","send_stop",6); 01176 { 01177 GuardType guard(this->lock_); 01178 01179 if (this->link_released_) 01180 return; 01181 01182 if (this->start_counter_ == 0) { 01183 // This is an indication of a logic error. This is more of an assert. 01184 VDBG_LVL((LM_ERROR, 01185 "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5); 01186 return; 01187 } 01188 01189 --this->start_counter_; 01190 01191 if (this->start_counter_ != 0) { 01192 // This wasn't the last send_stop() that we are expecting. We only 01193 // really honor the first send_start() and the last send_stop(). 01194 // We can return without doing anything else in this case. 01195 return; 01196 } 01197 01198 if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) { 01199 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01200 "TransportSendStrategy::send_stop: dont try to send current packet " 01201 "since mode is MODE_TERMINATED and not in graceful disconnecting.\n")); 01202 return; 01203 } 01204 01205 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01206 "This is an 'important' send_stop() event since our " 01207 "start_counter_ is 0.\n")); 01208 01209 // We just caused the start_counter_ to become zero. This 01210 // means that we aren't expecting another send() or send_stop() at any 01211 // time in the near future (ie, it isn't imminent). 01212 01213 // If our mode_ is currently MODE_QUEUE or MODE_SUSPEND, then we don't have 01214 // anything to do here because samples have already been going to the 01215 // queue. 01216 01217 // We only need to do something if the mode_ is 01218 // MODE_DIRECT. It means that we may have some sample(s) in the 01219 // current packet that have never been sent. This is our 01220 // opportunity to send the current packet directly if this is the case. 
01221 if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) { 01222 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01223 "But since we are in %C, we don't have to do " 01224 "anything more in this important send_stop().\n", 01225 mode_as_str(this->mode_))); 01226 // We don't do anything if we are in MODE_QUEUE. Just leave. 01227 return; 01228 } 01229 01230 size_t header_length = this->header_.length_; 01231 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01232 "We are in MODE_DIRECT in an important send_stop() - " 01233 "header_.length_ == [%d].\n", header_length)); 01234 01235 // Only attempt to send the current packet (directly) if the current 01236 // packet actually contains something (it could be empty). 01237 if ((header_length > 0) && 01238 //(this->elems_.size ()+this->not_yet_pac_q_->size() > 0)) 01239 (this->elems_.size() > 0)) { 01240 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01241 "There is something in the current packet - attempt to send " 01242 "it (directly) now.\n")); 01243 // If a relink needs to be done for this packet to be sent, do it. 01244 this->direct_send(true); 01245 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01246 "Back from the attempt to send leftover packet directly.\n")); 01247 01248 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01249 "But we %C as a result.\n", 01250 ((this->mode_ == MODE_QUEUE)? "flipped into MODE_QUEUE": 01251 "stayed in MODE_DIRECT" ))); 01252 if (this->mode_ == MODE_QUEUE && this->mode_ != MODE_SUSPEND) { 01253 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01254 "Notify Synch thread of work availability\n")); 01255 this->synch_->work_available(); 01256 } 01257 } 01258 } 01259 01260 send_delayed_notifications(); 01261 }
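send_start() and send_stop() form a nesting bracket: only the send_stop() that brings start_counter_ back to zero triggers the flush of a leftover packet. A minimal sketch of that counting rule, with the hypothetical name SendBracket and without locking, mode checks or the link_released_ test:

```cpp
// Hypothetical stand-in for the start_counter_ bookkeeping.
struct SendBracket {
  unsigned start_counter = 0;

  void send_start() { ++start_counter; }

  // Returns true only for the outermost send_stop(), i.e. the call after
  // which a leftover packet would be sent directly. An unmatched call
  // (counter already zero) is treated as a logic error and ignored.
  bool send_stop() {
    if (start_counter == 0) {
      return false;
    }
    --start_counter;
    return start_counter == 0;
  }
};
```

With two nested send_start() calls, the first send_stop() returns false and the second returns true, which corresponds to the "important" send_stop() event in the listing.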
void OpenDDS::DCPS::TransportSendStrategy::set_graceful_disconnecting | ( | bool | flag | ) | [protected] |
Set graceful disconnecting flag.
Definition at line 1678 of file TransportSendStrategy.cpp.
References graceful_disconnecting_.
Referenced by OpenDDS::DCPS::TcpSendStrategy::reset().
01679 {
01680   this->graceful_disconnecting_ = flag;
01681 }
size_t OpenDDS::DCPS::TransportSendStrategy::space_available | ( | ) | const [private] |
How much space is available in the current packet before we reach one of the limits: max_message_size() [the transport's inherent limitation, when non-zero] or max_size_ [the user's configured limit].
Definition at line 1879 of file TransportSendStrategy.cpp.
References header_, OpenDDS::DCPS::TransportHeader::length_, max_header_size_, max_message_size(), and max_size_.
Referenced by get_packet_elems_from_queue(), and send().
01880 {
01881   const size_t used = this->max_header_size_ + this->header_.length_,
01882     max_msg = this->max_message_size();
01883   if (max_msg) {
01884     return std::min(this->max_size_ - used, max_msg - used);
01885   }
01886   return this->max_size_ - used;
01887 }
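The computation can be tried out as a free function. The sketch below is an assumption-laden variant rather than a copy: the free function and its parameter names are hypothetical, and it clamps the result at zero, which the listing above does not do (there, the unsigned subtraction would wrap around if the used bytes ever exceeded a limit).

```cpp
#include <algorithm>
#include <cstddef>

// max_message_size == 0 means the transport imposes no limit of its own.
std::size_t space_available(std::size_t max_header_size, std::size_t header_length,
                            std::size_t max_size, std::size_t max_message_size) {
  const std::size_t used = max_header_size + header_length;
  // The effective limit is the smaller of the configured maximum and the
  // transport's inherent maximum, when the latter applies.
  const std::size_t limit =
      max_message_size != 0 ? std::min(max_size, max_message_size) : max_size;
  // Clamp instead of letting the unsigned subtraction wrap (sketch-only choice).
  return used < limit ? limit - used : 0;
}
```

For example, with a 28-byte header allowance, 100 bytes already in the packet and a configured maximum of 1000 bytes, 872 bytes remain; a transport limit of 500 bytes reduces that to 372.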
int OpenDDS::DCPS::TransportSendStrategy::start | ( | void | ) |
Start the TransportSendStrategy. This happens once, when the DataLink that "owns" this strategy object has established a connection.
Definition at line 808 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::TransportSendBuffer::capacity(), DBG_ENTRY_LVL, header_db_allocator_, header_mb_allocator_, LM_ERROR, lock_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), send_buffer_, start_i(), and synch_.
Referenced by OpenDDS::DCPS::ShmemSendStrategy::send_bytes_i().
00809 { 00810 DBG_ENTRY_LVL("TransportSendStrategy","start",6); 00811 00812 { 00813 GuardType guard(this->lock_); 00814 00815 if (!this->start_i()) { 00816 return -1; 00817 } 00818 } 00819 00820 size_t header_chunks(1); 00821 00822 // If a secondary send buffer is bound, sent headers should 00823 // be cached to properly maintain the buffer: 00824 if (this->send_buffer_ != 0) { 00825 header_chunks += this->send_buffer_->capacity(); 00826 00827 } else { 00828 header_chunks += 1; 00829 } 00830 00831 this->header_db_allocator_.reset( new TransportDataBlockAllocator(header_chunks)); 00832 this->header_mb_allocator_.reset( new TransportMessageBlockAllocator(header_chunks)); 00833 00834 // Since we (the TransportSendStrategy object) are a reference-counted 00835 // object, but the synch_ object doesn't necessarily know this, we need 00836 // to give a "copy" of a reference to ourselves to the synch_ object here. 00837 // We will do the reverse when we unregister ourselves (as a worker) from 00838 // the synch_ object. 00839 00840 if (this->synch_->register_worker(*this) == -1) { 00841 00842 ACE_ERROR_RETURN((LM_ERROR, 00843 "(%P|%t) ERROR: TransportSendStrategy failed to register " 00844 "as a worker with the ThreadSynch object.\n"), 00845 -1); 00846 } 00847 00848 return 0; 00849 }
virtual bool OpenDDS::DCPS::TransportSendStrategy::start_i | ( | ) | [inline, virtual] |
Let the subclass start.
Reimplemented in OpenDDS::DCPS::ShmemSendStrategy.
Definition at line 123 of file TransportSendStrategy.h.
Referenced by start().
void OpenDDS::DCPS::TransportSendStrategy::stop | ( | void | ) |
Stop the TransportSendStrategy. This happens once, when the DataLink that "owns" this strategy object is going away.
Definition at line 852 of file TransportSendStrategy.cpp.
References ACE_TEXT(), DBG_ENTRY_LVL, header_block_, LM_WARNING, lock_, pkt_chain_, ACE_Message_Block::release(), size, stop_i(), synch_, and ACE_Message_Block::total_length().
00853 { 00854 DBG_ENTRY_LVL("TransportSendStrategy","stop",6); 00855 00856 if (this->header_block_ != 0) { 00857 this->header_block_->release (); 00858 this->header_block_ = 0; 00859 } 00860 00861 this->synch_->unregister_worker(); 00862 00863 { 00864 GuardType guard(this->lock_); 00865 00866 if (this->pkt_chain_ != 0) { 00867 size_t size = this->pkt_chain_->total_length(); 00868 00869 if (size > 0) { 00870 this->pkt_chain_->release(); 00871 ACE_DEBUG((LM_WARNING, 00872 ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ") 00873 ACE_TEXT("terminating with %d unsent bytes.\n"), 00874 size)); 00875 } 00876 } 00877 } 00878 00879 { 00880 GuardType guard(this->lock_); 00881 00882 this->stop_i(); 00883 } 00884 00885 // TBD SOON - What about all of the samples that may still be stuck in 00886 // our queue_ and/or elems_? 00887 }
virtual void OpenDDS::DCPS::TransportSendStrategy::stop_i | ( | ) | [pure virtual] |
Let the subclass stop.
Implemented in OpenDDS::DCPS::MulticastSendStrategy, OpenDDS::DCPS::RtpsUdpSendStrategy, OpenDDS::DCPS::ShmemSendStrategy, OpenDDS::DCPS::TcpSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.
Referenced by stop().
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::suspend_send | ( | ) |
Called the first time a reconnect is attempted. The send mode is set to MODE_SUSPEND, and messages are queued while the strategy is in this mode.
Definition at line 59 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL, lock_, mode_, mode_before_suspend_, MODE_SUSPEND, and MODE_TERMINATED.
00060 {
00061   DBG_ENTRY_LVL("TransportSendStrategy","suspend_send",6);
00062   GuardType guard(this->lock_);
00063
00064   if (this->mode_ != MODE_TERMINATED && this->mode_ != MODE_SUSPEND) {
00065     this->mode_before_suspend_ = this->mode_;
00066     this->mode_ = MODE_SUSPEND;
00067   }
00068 }
ACE_INLINE OpenDDS::DCPS::ThreadSynch * OpenDDS::DCPS::TransportSendStrategy::synch | ( | ) | const [protected] |
Definition at line 23 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL, OpenDDS::DCPS::unique_ptr< T, Deleter >::get(), and synch_.
Referenced by OpenDDS::DCPS::TcpSendStrategy::schedule_output().
00024 {
00025   DBG_ENTRY_LVL("TransportSendStrategy","synch",6);
00026
00027   return synch_.get();
00028 }
void OpenDDS::DCPS::TransportSendStrategy::terminate_send | ( | bool | graceful_disconnecting = false |
) |
Remove all samples in the backpressure queue and packet queue.
This is called whenever the connection is lost and the reconnect fails. It removes all samples in the backpressure queue and packet queue.
Definition at line 728 of file TransportSendStrategy.cpp.
References clear(), DBG_ENTRY_LVL, graceful_disconnecting_, LM_DEBUG, lock_, mode_, MODE_SUSPEND, MODE_TERMINATED, and VDBG.
00729 { 00730 DBG_ENTRY_LVL("TransportSendStrategy","terminate_send",6); 00731 00732 bool reset_flag = true; 00733 00734 { 00735 GuardType guard(this->lock_); 00736 00737 // If the terminate_send call due to a non-graceful disconnection before 00738 // a datalink shutdown then we will not try to send the graceful disconnect 00739 // message. 00740 if ((this->mode_ == MODE_TERMINATED || this->mode_ == MODE_SUSPEND) 00741 && !this->graceful_disconnecting_) { 00742 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00743 "It was already terminated non gracefully, will not set to graceful disconnecting \n")); 00744 reset_flag = false; 00745 } 00746 } 00747 00748 VDBG((LM_DEBUG, "(%P|%t) DBG: Now flip to MODE_TERMINATED \n")); 00749 00750 this->clear(MODE_TERMINATED); 00751 00752 if (reset_flag) { 00753 GuardType guard(this->lock_); 00754 this->graceful_disconnecting_ = graceful_disconnecting; 00755 } 00756 }
friend class TransportSendBuffer [friend] |
Definition at line 398 of file TransportSendStrategy.h.
Current elements that have contributed blocks to the current transport packet.
Definition at line 337 of file TransportSendStrategy.h.
Referenced by adjust_packet_after_send(), clear(), current_packet_first_element(), do_remove_sample(), get_packet_elems_from_queue(), prepare_packet(), send(), and send_stop().
Definition at line 389 of file TransportSendStrategy.h.
Referenced by send(), send_stop(), set_graceful_disconnecting(), and terminate_send().
Current transport packet header.
Definition at line 404 of file TransportSendStrategy.h.
Referenced by adjust_packet_after_send(), clear(), do_remove_sample(), get_packet_elems_from_queue(), marshal_transport_header(), perform_work(), prepare_header(), OpenDDS::DCPS::MulticastSendStrategy::prepare_header_i(), resume_send(), send(), send_packet(), send_stop(), and space_available().
Current transport packet header, marshalled.
Definition at line 330 of file TransportSendStrategy.h.
Referenced by do_remove_sample(), prepare_packet(), and stop().
bool OpenDDS::DCPS::TransportSendStrategy::header_complete_ [private] |
Set to false when the packet header hasn't been fully sent. Set to true once the packet header has been fully sent.
Definition at line 345 of file TransportSendStrategy.h.
Referenced by adjust_packet_after_send(), clear(), prepare_packet(), and resume_send().
unique_ptr<TransportDataBlockAllocator> OpenDDS::DCPS::TransportSendStrategy::header_db_allocator_ [private] |
Allocator for header data block.
Definition at line 374 of file TransportSendStrategy.h.
Referenced by prepare_packet(), and start().
unique_ptr<TransportMessageBlockAllocator> OpenDDS::DCPS::TransportSendStrategy::header_mb_allocator_ [private] |
Allocator for header message block.
Definition at line 371 of file TransportSendStrategy.h.
Referenced by prepare_packet(), and start().
Current transport header sequence number.
Definition at line 333 of file TransportSendStrategy.h.
Referenced by prepare_header().
bool OpenDDS::DCPS::TransportSendStrategy::link_released_ [private] |
Definition at line 391 of file TransportSendStrategy.h.
Referenced by link_released(), send(), send_start(), and send_stop().
This lock will protect critical sections of code that play a role in the sending of data.
Definition at line 381 of file TransportSendStrategy.h.
Referenced by clear(), deliver_ack_request(), link_released(), perform_work(), remove_all_msgs(), remove_sample(), resume_send(), send(), send_delayed_notifications(), send_start(), send_stop(), start(), stop(), suspend_send(), and terminate_send().
Maximum marshalled size of the transport packet header.
Definition at line 327 of file TransportSendStrategy.h.
Referenced by prepare_packet(), send(), space_available(), and TransportSendStrategy().
Configuration - max number of samples per transport packet.
Definition at line 311 of file TransportSendStrategy.h.
Referenced by add_delayed_notification(), get_packet_elems_from_queue(), send(), and TransportSendStrategy().
Configuration - max transport packet size (bytes).
Definition at line 317 of file TransportSendStrategy.h.
Referenced by send(), and space_available().
This mode determines how send() calls will be handled.
Definition at line 359 of file TransportSendStrategy.h.
Referenced by add_delayed_notification(), clear(), direct_send(), do_remove_sample(), isDirectMode(), mode(), perform_work(), resume_send(), send(), send_stop(), suspend_send(), and terminate_send().
Remembers the mode that was in effect before sending was suspended. It is used to restore that mode when sending resumes after the connection is re-established.
Definition at line 364 of file TransportSendStrategy.h.
Referenced by clear(), direct_send(), resume_send(), and suspend_send().
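The save-and-restore role of the two mode members can be sketched as follows. This is a minimal, single-threaded model under stated assumptions: `ModeTracker` and its field names are hypothetical, the guard conditions are an assumption rather than a copy of the library code, and the real suspend_send() and resume_send() also take the lock and handle queued data.

```cpp
enum SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED };

// Hypothetical model of the two mode fields; not the OpenDDS class.
struct ModeTracker {
  SendMode mode = MODE_DIRECT;
  SendMode mode_before_suspend = MODE_NOT_SET;

  // Remember the active mode, then switch to MODE_SUSPEND.
  // Assumption: a terminated or already suspended strategy is left alone,
  // so the saved mode is never overwritten by MODE_SUSPEND itself.
  void suspend_send() {
    if (mode != MODE_TERMINATED && mode != MODE_SUSPEND) {
      mode_before_suspend = mode;
      mode = MODE_SUSPEND;
    }
  }

  // Restore the remembered mode once the connection is back.
  void resume_send() {
    if (mode == MODE_SUSPEND) {
      mode = mode_before_suspend;
      mode_before_suspend = MODE_NOT_SET;
    }
  }
};
```

Keeping the previous mode in a separate field means a strategy that was queueing under backpressure when the link dropped returns to MODE_QUEUE, not MODE_DIRECT, after the reconnect.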
Configuration - optimum transport packet size (bytes).
Definition at line 314 of file TransportSendStrategy.h.
Referenced by get_packet_elems_from_queue(), and send().
Current (head of chain) block containing unsent bytes for the current transport packet.
Definition at line 341 of file TransportSendStrategy.h.
Referenced by adjust_packet_after_send(), clear(), do_remove_sample(), prepare_packet(), resume_send(), send_packet(), and stop().
Used during backpressure situations to hold samples that have not yet been made part of a transport packet and are completely unsent. Also used as a bucket for elements that still have to become part of a packet.
Definition at line 324 of file TransportSendStrategy.h.
Referenced by clear(), do_remove_sample(), get_packet_elems_from_queue(), perform_work(), resume_send(), and send().
Definition at line 385 of file TransportSendStrategy.h.
Referenced by do_remove_sample().
MessageBlockAllocator OpenDDS::DCPS::TransportSendStrategy::replaced_element_mb_allocator_ [private] |
Cached allocator for TransportReplaceElement.
Definition at line 384 of file TransportSendStrategy.h.
Referenced by do_remove_sample().
Definition at line 393 of file TransportSendStrategy.h.
Referenced by remove_all_msgs(), send_buffer(), send_packet(), and start().
unsigned OpenDDS::DCPS::TransportSendStrategy::start_counter_ [private] |
Counter that, when greater than zero, indicates that we still expect to receive a send_stop() event. Incremented once for each call to our send_start() method, and decremented once for each call to our send_stop() method. We only care about the transitions of the start_counter_ value from 0 to 1, and from 1 to 0. This accommodates the case where more than one TransportClient is sending to us at the same time. We use this counter to enable a "composite" send_start() and send_stop().
Definition at line 356 of file TransportSendStrategy.h.
Referenced by clear(), resume_send(), send_start(), and send_stop().
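The "composite" start/stop behaviour described for start_counter_ can be illustrated with a small counter class. This is a sketch, not the library code: `CompositeSendGate`, its boolean return values and `active()` are hypothetical names introduced here, and `std::mutex` stands in for the strategy's lock.

```cpp
#include <cassert>
#include <mutex>

// Hypothetical stand-in for the start_counter_ bookkeeping.
class CompositeSendGate {
public:
  // Returns true only on the 0 -> 1 transition (first active sender).
  bool send_start() {
    std::lock_guard<std::mutex> guard(lock_);
    return ++start_counter_ == 1;
  }

  // Returns true only on the 1 -> 0 transition (last active sender done).
  bool send_stop() {
    std::lock_guard<std::mutex> guard(lock_);
    assert(start_counter_ > 0 && "send_stop() without matching send_start()");
    return --start_counter_ == 0;
  }

  // Number of send_start() calls not yet matched by send_stop().
  unsigned active() const {
    std::lock_guard<std::mutex> guard(lock_);
    return start_counter_;
  }

private:
  mutable std::mutex lock_;
  unsigned start_counter_ = 0;
};
```

With two clients sending concurrently, only the first `send_start()` and the last `send_stop()` report a transition, so per-batch work runs once for the whole overlapping group.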
The thread synch object.
Definition at line 377 of file TransportSendStrategy.h.
Referenced by perform_work(), resume_send(), send(), send_stop(), start(), stop(), synch(), and TransportSendStrategy().
Definition at line 387 of file TransportSendStrategy.h.
Referenced by direct_send(), and send_delayed_notifications().
const size_t OpenDDS::DCPS::TransportSendStrategy::UDP_MAX_MESSAGE_SIZE = 65466 [static, protected] |
Put the maximum UDP payload size here so that it can be shared by all UDP-based transports. This is the worst-case (conservative) value for UDP/IPv4. If there are no IP options, or if IPv6 is used, it could actually be a little larger.
Definition at line 181 of file TransportSendStrategy.h.
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i().
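For orientation, an IPv4 datagram is at most 65535 bytes, the IPv4 header at most 60 bytes with options, and the UDP header 8 bytes, so 65535 - 60 - 8 = 65467 is the worst-case payload bound. The constant sits just inside it. A UDP-based transport might use the limit as sketched below. The helper `fits_in_one_datagram` is hypothetical and only illustrates the size check; it is not part of the class.

```cpp
#include <cstddef>
#include <vector>

// Value taken from the documentation above.
const std::size_t UDP_MAX_MESSAGE_SIZE = 65466;

// Hypothetical helper: do the buffers, sent as one scatter/gather
// message, fit within a single UDP datagram?
bool fits_in_one_datagram(const std::vector<std::size_t>& buffer_lengths) {
  std::size_t total = 0;
  for (std::size_t len : buffer_lengths) {
    // Compare against the remaining room so the sum cannot overflow.
    if (len > UDP_MAX_MESSAGE_SIZE - total) {
      return false;
    }
    total += len;
  }
  return true;
}
```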