#include <TransportSendStrategy.h>
Inheritance diagram for OpenDDS::DCPS::TransportSendStrategy:
Public Types | |
typedef BasicQueue< TransportQueueElement > | QueueType |
MODE_NOT_SET | |
MODE_DIRECT | |
MODE_QUEUE | |
MODE_SUSPEND | |
MODE_TERMINATED | |
enum | SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED } |
Public Member Functions | |
virtual | ~TransportSendStrategy () |
void | send_buffer (TransportSendBuffer *send_buffer) |
Assigns an optional send buffer. | |
int | start () |
void | stop () |
void | send_start () |
void | send (TransportQueueElement *element, bool relink=true) |
void | send_stop (RepoId repoId) |
RemoveResult | remove_sample (const DataSampleElement *sample) |
void | remove_all_msgs (RepoId pub_id) |
virtual WorkOutcome | perform_work () |
virtual void | relink (bool do_suspend=true) |
void | suspend_send () |
void | resume_send () |
void | terminate_send (bool graceful_disconnecting=false) |
Remove all samples in the backpressure queue and packet queue. | |
virtual void | stop_i ()=0 |
Let the subclass stop. | |
virtual bool | start_i () |
Let the subclass start. | |
void | link_released (bool flag) |
bool | isDirectMode () |
void | transport_shutdown () |
virtual ACE_HANDLE | get_handle () |
void | clear (SendMode mode=MODE_DIRECT) |
Clear queued messages and messages in current packet. | |
SendMode | mode () const |
Access the current sending mode. | |
Static Public Member Functions | |
static int | mb_to_iov (const ACE_Message_Block &msg, iovec *iov) |
Protected Member Functions | |
TransportSendStrategy (std::size_t id, const TransportInst_rch &transport_inst, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy) | |
virtual ssize_t | send_bytes (const iovec iov[], int n, int &bp) |
virtual ssize_t | non_blocking_send (const iovec iov[], int n, int &bp) |
virtual ssize_t | send_bytes_i (const iovec iov[], int n)=0 |
virtual void | prepare_header_i () |
Specific implementation processing of prepared packet header. | |
virtual void | prepare_packet_i () |
Specific implementation processing of prepared packet. | |
TransportQueueElement * | current_packet_first_element () const |
virtual size_t | max_message_size () const |
void | set_graceful_disconnecting (bool flag) |
Set graceful disconnecting flag. | |
virtual void | add_delayed_notification (TransportQueueElement *element) |
virtual RemoveResult | do_remove_sample (const RepoId &pub_id, const TransportQueueElement::MatchCriteria &criteria) |
Implement framework chain visitations to remove a sample. | |
ThreadSynch * | synch () const |
Protected Attributes | |
TransportHeader | header_ |
Current transport packet header. | |
Static Protected Attributes | |
static const size_t | UDP_MAX_MESSAGE_SIZE = 65466 |
Private Types | |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef std::pair< TransportQueueElement *, SendMode > | TQESendModePair |
Used for delayed notifications when performing work. | |
OUTCOME_COMPLETE_SEND | |
OUTCOME_PARTIAL_SEND | |
OUTCOME_BACKPRESSURE | |
OUTCOME_PEER_LOST | |
OUTCOME_SEND_ERROR | |
enum | SendPacketOutcome { OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_BACKPRESSURE, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR } |
Private Member Functions | |
void | direct_send (bool relink) |
void | get_packet_elems_from_queue () |
void | prepare_header () |
void | prepare_packet () |
SendPacketOutcome | send_packet () |
ssize_t | do_send_packet (const ACE_Message_Block *packet, int &bp) |
Form an IOV and call the send_bytes() template method. | |
int | adjust_packet_after_send (ssize_t num_bytes_sent) |
bool | send_delayed_notifications (const TransportQueueElement::MatchCriteria *match=0) |
size_t | space_available () const |
virtual void | marshal_transport_header (ACE_Message_Block *mb) |
OPENDDS_VECTOR (TQESendModePair) delayed_delivered_notification_queue_ | |
Static Private Member Functions | |
static const char * | mode_as_str (SendMode mode) |
Helper function for debugging. | |
Private Attributes | |
size_t | max_samples_ |
Configuration - max number of samples per transport packet. | |
ACE_UINT32 | optimum_size_ |
Configuration - optimum transport packet size (bytes). | |
ACE_UINT32 | max_size_ |
Configuration - max transport packet size (bytes). | |
QueueType * | queue_ |
size_t | max_header_size_ |
Maximum marshalled size of the transport packet header. | |
ACE_Message_Block * | header_block_ |
Current transport packet header, marshalled. | |
SequenceNumber | header_sequence_ |
Current transport header sequence number. | |
QueueType * | elems_ |
ACE_Message_Block * | pkt_chain_ |
bool | header_complete_ |
unsigned | start_counter_ |
SendMode | mode_ |
This mode determines how send() calls will be handled. | |
SendMode | mode_before_suspend_ |
TransportMessageBlockAllocator * | header_mb_allocator_ |
Allocator for header message block. | |
TransportDataBlockAllocator * | header_db_allocator_ |
Allocator for header data block. | |
ThreadSynch * | synch_ |
The thread synch object. | |
LockType | lock_ |
TransportReplacedElementAllocator | replaced_element_allocator_ |
Cached allocator for TransportReplacedElement. | |
MessageBlockAllocator | replaced_element_mb_allocator_ |
DataBlockAllocator | replaced_element_db_allocator_ |
TransportRetainedElementAllocator * | retained_element_allocator_ |
TransportInst_rch | transport_inst_ |
bool | graceful_disconnecting_ |
bool | link_released_ |
TransportSendBuffer * | send_buffer_ |
bool | transport_shutdown_ |
Friends | |
class | TransportSendBuffer |
Notes for the object ownership: 1) Owns ThreadSynch object, list of samples in current packet and list of samples in queue.
Definition at line 48 of file TransportSendStrategy.h.
typedef ACE_Guard<LockType> OpenDDS::DCPS::TransportSendStrategy::GuardType [private] |
Definition at line 257 of file TransportSendStrategy.h.
typedef ACE_SYNCH_MUTEX OpenDDS::DCPS::TransportSendStrategy::LockType [private] |
Definition at line 256 of file TransportSendStrategy.h.
typedef BasicQueue<TransportQueueElement> OpenDDS::DCPS::TransportSendStrategy::QueueType |
Definition at line 133 of file TransportSendStrategy.h.
typedef std::pair<TransportQueueElement*, SendMode> OpenDDS::DCPS::TransportSendStrategy::TQESendModePair [private] |
Used for delayed notifications when performing work.
Definition at line 356 of file TransportSendStrategy.h.
enum OpenDDS::DCPS::TransportSendStrategy::SendMode |
Definition at line 260 of file TransportSendStrategy.h.
00260 { 00261 // MODE_NOT_SET is used as the initial value of mode_before_suspend_ so 00262 // we can check if the resume_send is paired with suspend_send. 00263 MODE_NOT_SET, 00264 // Send out the sample with current packet. 00265 MODE_DIRECT, 00266 // The samples need be queued because of the backpressure or partial send. 00267 MODE_QUEUE, 00268 // The samples need be queued because the connection is lost and we are 00269 // trying to reconnect. 00270 MODE_SUSPEND, 00271 // The samples need be dropped since we lost connection and could not 00272 // reconnect. 00273 MODE_TERMINATED 00274 };
enum OpenDDS::DCPS::TransportSendStrategy::SendPacketOutcome [private] |
OUTCOME_COMPLETE_SEND | |
OUTCOME_PARTIAL_SEND | |
OUTCOME_BACKPRESSURE | |
OUTCOME_PEER_LOST | |
OUTCOME_SEND_ERROR |
Definition at line 191 of file TransportSendStrategy.h.
00191 { 00192 OUTCOME_COMPLETE_SEND, 00193 OUTCOME_PARTIAL_SEND, 00194 OUTCOME_BACKPRESSURE, 00195 OUTCOME_PEER_LOST, 00196 OUTCOME_SEND_ERROR 00197 };
OpenDDS::DCPS::TransportSendStrategy::~TransportSendStrategy | ( | ) | [virtual] |
Definition at line 115 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, elems_, queue_, and synch_.
00116 { 00117 DBG_ENTRY_LVL("TransportSendStrategy","~TransportSendStrategy",6); 00118 00119 delete this->synch_; 00120 00121 this->delayed_delivered_notification_queue_.clear(); 00122 00123 delete this->elems_; 00124 delete this->queue_; 00125 }
OpenDDS::DCPS::TransportSendStrategy::TransportSendStrategy | ( | std::size_t | id, | |
const TransportInst_rch & | transport_inst, | |||
ThreadSynchResource * | synch_resource, | |||
Priority | priority, | |||
const ThreadSynchStrategy_rch & | thread_sync_strategy | |||
) | [protected] |
Definition at line 56 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, max_header_size_, max_marshaled_size(), NUM_REPLACED_ELEMENT_CHUNKS, replaced_element_allocator_, synch_, TheServiceParticipant, OpenDDS::DCPS::DirectPriorityMapper::thread_priority(), and OpenDDS::DCPS::Transport_debug_level.
00062 : ThreadSynchWorker(id), 00063 max_samples_(transport_inst->max_samples_per_packet_), 00064 optimum_size_(transport_inst->optimum_packet_size_), 00065 max_size_(transport_inst->max_packet_size_), 00066 queue_(new QueueType(transport_inst->queue_messages_per_pool_, 00067 transport_inst->queue_initial_pools_)), 00068 max_header_size_(0), 00069 header_block_(0), 00070 elems_(new QueueType(1, transport_inst->max_samples_per_packet_)), 00071 pkt_chain_(0), 00072 header_complete_(false), 00073 start_counter_(0), 00074 mode_(MODE_DIRECT), 00075 mode_before_suspend_(MODE_NOT_SET), 00076 header_mb_allocator_(0), 00077 header_db_allocator_(0), 00078 synch_(0), 00079 lock_(), 00080 replaced_element_allocator_(NUM_REPLACED_ELEMENT_CHUNKS), 00081 replaced_element_mb_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2), 00082 replaced_element_db_allocator_(NUM_REPLACED_ELEMENT_CHUNKS * 2), 00083 retained_element_allocator_(0), 00084 transport_inst_(transport_inst), 00085 graceful_disconnecting_(false), 00086 link_released_(true), 00087 send_buffer_(0), 00088 transport_shutdown_(false) 00089 { 00090 DBG_ENTRY_LVL("TransportSendStrategy","TransportSendStrategy",6); 00091 00092 // Create a ThreadSynch object just for us. 00093 DirectPriorityMapper mapper(priority); 00094 this->synch_ = thread_sync_strategy->create_synch_object( 00095 synch_resource, 00096 #ifdef ACE_WIN32 00097 ACE_DEFAULT_THREAD_PRIORITY, 00098 #else 00099 mapper.thread_priority(), 00100 #endif 00101 TheServiceParticipant->scheduler()); 00102 00103 // We cache this value in data member since it doesn't change, and we 00104 // don't want to keep asking for it over and over. 
00105 this->max_header_size_ = TransportHeader::max_marshaled_size(); 00106 00107 if (Transport_debug_level >= 2) { 00108 ACE_DEBUG((LM_DEBUG, "(%P|%t) TransportSendStrategy replaced_element_allocator %x with %d chunks\n", 00109 &replaced_element_allocator_, NUM_REPLACED_ELEMENT_CHUNKS)); 00110 } 00111 00112 delayed_delivered_notification_queue_.reserve(this->max_samples_); 00113 }
void OpenDDS::DCPS::TransportSendStrategy::add_delayed_notification | ( | TransportQueueElement * | element | ) | [protected, virtual] |
Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.
Definition at line 1872 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::Transport_debug_level.
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification(), adjust_packet_after_send(), and send().
01873 { 01874 if (Transport_debug_level) { 01875 size_t size = this->delayed_delivered_notification_queue_.size(); 01876 if ((size > 0) && (size % this->max_samples_ == 0)) { 01877 ACE_DEBUG((LM_DEBUG, 01878 "(%P|%t) Transport send strategy notification queue threshold, size=%d\n", 01879 size)); 01880 } 01881 } 01882 01883 this->delayed_delivered_notification_queue_.push_back(std::make_pair(element, this->mode_)); 01884 }
int OpenDDS::DCPS::TransportSendStrategy::adjust_packet_after_send | ( | ssize_t | num_bytes_sent | ) | [private] |
This is called from the send_packet() method after it has sent at least one byte from the current packet. This method will update the current packet appropriately, as well as deal with all of the release()'ing of fully sent ACE_Message_Blocks, and the data_delivered() calls on the fully sent elements. Returns 0 if the entire packet was sent, and returns 1 if the entire packet was not sent.
Definition at line 369 of file TransportSendStrategy.cpp.
References add_delayed_notification(), DBG_ENTRY_LVL, elems_, OpenDDS::DCPS::BasicQueue< T >::get(), header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::BasicQueue< T >::peek(), pkt_chain_, and VDBG.
Referenced by clear(), and send_packet().
00370 { 00371 DBG_ENTRY_LVL("TransportSendStrategy", "adjust_packet_after_send", 6); 00372 00373 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00374 "Adjusting the current packet because %d bytes of the packet " 00375 "have been sent.\n", num_bytes_sent)); 00376 00377 ssize_t num_bytes_left = num_bytes_sent; 00378 ssize_t num_non_header_bytes_sent = 0; 00379 00380 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00381 "Set num_bytes_left to %d.\n", num_bytes_left)); 00382 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00383 "Set num_non_header_bytes_sent to %d.\n", 00384 num_non_header_bytes_sent)); 00385 00386 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00387 "Peek at the element at the front of the packet elems_.\n")); 00388 00389 // This is the element currently at the front of elems_. 00390 TransportQueueElement* element = this->elems_->peek(); 00391 00392 if(!element){ 00393 ACE_DEBUG((LM_INFO, "(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n")); 00394 } else { 00395 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00396 "Use the element's msg() to find the last block in " 00397 "the msg() chain.\n")); 00398 00399 // Get a pointer to the last message block in the element. 00400 const ACE_Message_Block* elem_tail_block = element->msg(); 00401 00402 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00403 "Start with tail block == element->msg().\n")); 00404 00405 while (elem_tail_block->cont() != 0) { 00406 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00407 "Set tail block to its cont() block (next in chain).\n")); 00408 elem_tail_block = elem_tail_block->cont(); 00409 } 00410 00411 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00412 "Tail block now set (because tail block's cont() is 0).\n")); 00413 00414 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00415 "Start the 'while (num_bytes_left > 0)' loop.\n")); 00416 00417 while (num_bytes_left > 0) { 00418 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00419 "At top of 'num bytes left' loop. 
num_bytes_left == [%d].\n", 00420 num_bytes_left)); 00421 00422 const int block_length = static_cast<int>(this->pkt_chain_->length()); 00423 00424 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00425 "Length of block at front of pkt_chain_ is [%d].\n", 00426 block_length)); 00427 00428 if (block_length <= num_bytes_left) { 00429 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00430 "The whole block at the front of pkt_chain_ was sent.\n")); 00431 00432 // The entire message block at the front of the chain has been sent. 00433 // Detach the head message block from the chain and adjust 00434 // the pkt_chain_ to point to the next block (if any) in 00435 // the chain. 00436 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00437 "Extract the fully sent block from the pkt_chain_.\n")); 00438 00439 ACE_Message_Block* fully_sent_block = this->pkt_chain_; 00440 00441 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00442 "Set pkt_chain_ to pkt_chain_->cont().\n")); 00443 00444 this->pkt_chain_ = this->pkt_chain_->cont(); 00445 00446 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00447 "Set the fully sent block's cont() to 0.\n")); 00448 00449 fully_sent_block->cont(0); 00450 00451 // Update the num_bytes_left to indicate that we have 00452 // processed the entire length of the block. 00453 num_bytes_left -= block_length; 00454 00455 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00456 "Updated num_bytes_left to account for fully sent " 00457 "block (block_length == [%d]).\n", block_length)); 00458 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00459 "Now, num_bytes_left == [%d].\n", num_bytes_left)); 00460 00461 if (!this->header_complete_) { 00462 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00463 "Since the header_complete_ flag is false, it means " 00464 "that the packet header block was still in the " 00465 "pkt_chain_.\n")); 00466 00467 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00468 "Not anymore... Set the header_complete_ flag " 00469 "to true.\n")); 00470 00471 // That was the packet header block. And now we know that it 00472 // has been completely sent. 
00473 this->header_complete_ = true; 00474 00475 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00476 "Release the fully sent block.\n")); 00477 00478 // Release the fully_sent_block 00479 fully_sent_block->release(); 00480 00481 } else { 00482 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00483 "Since the header_complete_ flag is true, it means " 00484 "that the packet header block was not in the " 00485 "pkt_chain_.\n")); 00486 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00487 "So, the fully sent block was part of an element.\n")); 00488 00489 // That wasn't the packet header block. It was from the 00490 // element currently at the front of the elems_ 00491 // collection. If it was the last block from the 00492 // element, then we need to extract the element from the 00493 // elems_ collection and invoke data_delivered() on it. 00494 num_non_header_bytes_sent += block_length; 00495 00496 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00497 "Updated num_non_header_bytes_sent to account for " 00498 "fully sent block (block_length == [%d]).\n", 00499 block_length)); 00500 00501 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00502 "Now, num_non_header_bytes_sent == [%d].\n", 00503 num_non_header_bytes_sent)); 00504 00505 if (fully_sent_block->base() == elem_tail_block->base()) { 00506 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00507 "Ok. The fully sent block was a duplicate of " 00508 "the tail block of the element that is at the " 00509 "front of the packet elems_.\n")); 00510 00511 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00512 "This means that we have completely sent the " 00513 "element at the front of the packet elems_.\n")); 00514 00515 // This means that we have completely sent the element 00516 // that is currently at the front of the elems_ collection. 
00517 00518 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00519 "We can release the fully sent block now.\n")); 00520 00521 // Release the fully_sent_block 00522 fully_sent_block->release(); 00523 00524 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00525 "We can extract the element from the front of " 00526 "the packet elems_ (we were just peeking).\n")); 00527 00528 // Extract the element from the elems_ collection 00529 element = this->elems_->get(); 00530 00531 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00532 "Tell the element that a decision has been made " 00533 "regarding its fate - data_delivered().\n")); 00534 00535 // Inform the element that the data has been delivered. 00536 this->add_delayed_notification(element); 00537 00538 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00539 "Peek at the next element in the packet " 00540 "elems_.\n")); 00541 00542 // Set up for the next element in elems_ by peek()'ing. 00543 element = this->elems_->peek(); 00544 00545 if (element != 0) { 00546 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00547 "The is an element still in the packet " 00548 "elems_ (we are peeking at it now).\n")); 00549 00550 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00551 "We are going to find the tail block for the " 00552 "current element (we are peeking at).\n")); 00553 00554 // There was a "next element". Determine the 00555 // elem_tail_block for it. 00556 elem_tail_block = element->msg(); 00557 00558 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00559 "Start w/tail block == element->msg().\n")); 00560 00561 while (elem_tail_block->cont() != 0) { 00562 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00563 "Set tail block to next in chain.\n")); 00564 elem_tail_block = elem_tail_block->cont(); 00565 } 00566 00567 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00568 "Done finding tail block.\n")); 00569 } 00570 00571 } else { 00572 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00573 "Ok. 
The fully sent block is *not* a " 00574 "duplicate of the tail block of the element " 00575 "at the front of the packet elems_.\n")); 00576 00577 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00578 "Thus, we have not completely sent the " 00579 "element yet.\n")); 00580 00581 // We didn't completely send the element - it has more 00582 // message blocks that haven't been sent (that we know of). 00583 00584 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00585 "We can release the fully_sent_block now.\n")); 00586 00587 // Release the fully_sent_block 00588 fully_sent_block->release(); 00589 } 00590 } 00591 00592 } else { 00593 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00594 "Only part of the block at the front of pkt_chain_ " 00595 "was sent.\n")); 00596 00597 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00598 "Advance the rd_ptr() of the front block (of pkt_chain_) " 00599 "by the num_bytes_left (%d).\n", num_bytes_left)); 00600 00601 // Only part of the current block was sent. 00602 this->pkt_chain_->rd_ptr(num_bytes_left); 00603 00604 if (this->header_complete_) { 00605 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00606 "And since the packet header block has already been " 00607 "completely sent, add num_bytes_left to the " 00608 "num_non_header_bytes_sent.\n")); 00609 00610 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00611 "Before, num_non_header_bytes_sent == %d.\n", 00612 num_non_header_bytes_sent)); 00613 00614 // We know that the current block isn't the packet header 00615 // block because the packet header block has already been 00616 // completely sent. We need to count these bytes in the 00617 // num_non_header_bytes_sent. 
00618 num_non_header_bytes_sent += num_bytes_left; 00619 00620 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00621 "After, num_non_header_bytes_sent == %d.\n", 00622 num_non_header_bytes_sent)); 00623 } 00624 00625 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00626 "Set the num_bytes_left to 0 now.\n")); 00627 00628 num_bytes_left = 0; 00629 } 00630 } 00631 } 00632 00633 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00634 "The 'num_bytes_left' loop has completed.\n")); 00635 00636 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00637 "Adjust the header_.length_ to account for the " 00638 "num_non_header_bytes_sent.\n")); 00639 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00640 "Before, header_.length_ == %d.\n", 00641 this->header_.length_)); 00642 00643 // Adjust the packet header_.length_ to indicate how many non header 00644 // bytes are left to send. 00645 this->header_.length_ -= static_cast<ACE_UINT32>(num_non_header_bytes_sent); 00646 00647 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00648 "After, header_.length_ == %d.\n", 00649 this->header_.length_)); 00650 00651 // Returns 0 if the entire packet was sent, and returns 1 otherwise. 00652 int rc = (this->header_.length_ == 0) ? 0 : 1; 00653 00654 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00655 "Adjustments all done. Returning [%d]. 0 means entire packet " 00656 "has been sent. 1 means otherwise.\n", 00657 rc)); 00658 00659 return rc; 00660 }
void OpenDDS::DCPS::TransportSendStrategy::clear | ( | SendMode | mode = MODE_DIRECT |
) |
Clear queued messages and messages in current packet.
Definition at line 773 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), adjust_packet_after_send(), DBG_ENTRY_LVL, elems_, header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, mode_, mode_before_suspend_, MODE_NOT_SET, pkt_chain_, queue_, send_delayed_notifications(), start_counter_, and VDBG.
Referenced by OpenDDS::DCPS::TcpSendStrategy::reset(), and terminate_send().
00774 { 00775 DBG_ENTRY_LVL("TransportSendStrategy","clear",6); 00776 00777 send_delayed_notifications(); 00778 QueueType* elems = 0; 00779 QueueType* queue = 0; 00780 { 00781 GuardType guard(this->lock_); 00782 00783 if (this->header_.length_ > 0) { 00784 // Clear the messages in the pkt_chain_ that is partially sent. 00785 // We just reuse these functions for normal partial send except actual sending. 00786 int num_bytes_left = static_cast<int>(this->pkt_chain_->total_length()); 00787 int result = this->adjust_packet_after_send(num_bytes_left); 00788 00789 if (result == 0) { 00790 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00791 "The adjustment logic says that the packet is cleared.\n")); 00792 00793 } else { 00794 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00795 "The adjustment returned partial sent.\n")); 00796 } 00797 } 00798 00799 elems = this->elems_; 00800 this->elems_ = new QueueType(1, this->transport_inst_->max_samples_per_packet_); 00801 queue = this->queue_; 00802 this->queue_ = new QueueType(this->transport_inst_->queue_messages_per_pool_, 00803 this->transport_inst_->queue_initial_pools_); 00804 00805 this->header_.length_ = 0; 00806 this->pkt_chain_ = 0; 00807 this->header_complete_ = false; 00808 this->start_counter_ = 0; 00809 this->mode_ = mode; 00810 this->mode_before_suspend_ = MODE_NOT_SET; 00811 } 00812 00813 // We need remove the queued elements outside the lock, 00814 // otherwise we have a deadlock situation when remove vistor 00815 // calls the data_droped on each dropped elements. 00816 00817 // Clear all samples in queue. 00818 RemoveAllVisitor remove_all_visitor; 00819 00820 elems->accept_remove_visitor(remove_all_visitor); 00821 queue->accept_remove_visitor(remove_all_visitor); 00822 00823 delete elems; 00824 delete queue; 00825 }
ACE_INLINE OpenDDS::DCPS::TransportQueueElement * OpenDDS::DCPS::TransportSendStrategy::current_packet_first_element | ( | ) | const [protected] |
Definition at line 148 of file TransportSendStrategy.inl.
References elems_, and OpenDDS::DCPS::BasicQueue< T >::peek().
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i().
void OpenDDS::DCPS::TransportSendStrategy::direct_send | ( | bool | relink | ) | [private] |
Called from send() when it is time to attempt to send our current packet to the socket while in MODE_DIRECT mode_. If backpressure occurs, our current packet will be adjusted to account for bytes that were sent, and the mode will be changed to MODE_QUEUE. If no backpressure occurs (ie, the entire packet is sent), then our current packet will be "reset" to be an empty packet following the send.
Definition at line 1449 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, mode_, mode_before_suspend_, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OUTCOME_BACKPRESSURE, OUTCOME_PARTIAL_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, prepare_packet(), relink(), send_packet(), OpenDDS::DCPS::Transport_debug_level, transport_inst_, VDBG, and VDBG_LVL.
Referenced by send(), and send_stop().
01450 { 01451 DBG_ENTRY_LVL("TransportSendStrategy", "direct_send", 6); 01452 01453 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01454 "Prepare the current packet for a direct send attempt.\n")); 01455 01456 // Prepare the packet for sending. 01457 this->prepare_packet(); 01458 01459 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01460 "Now attempt to send the packet.\n")); 01461 01462 // We will try resend the packet if the send() fails and then connection 01463 // is re-established. Only loops if the "continue" line is hit. 01464 while (true) { 01465 // Attempt to send the packet 01466 const SendPacketOutcome outcome = this->send_packet(); 01467 01468 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01469 "The outcome of the send_packet() was %d.\n", outcome)); 01470 01471 if ((outcome == OUTCOME_BACKPRESSURE) || 01472 (outcome == OUTCOME_PARTIAL_SEND)) { 01473 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01474 "The outcome of the send_packet() was either " 01475 "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5); 01476 01477 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01478 "Flip into the MODE_QUEUE mode_.\n"), 5); 01479 01480 // We encountered backpressure, or only sent part of the packet. 
01481 this->mode_ = MODE_QUEUE; 01482 01483 } else if ((outcome == OUTCOME_PEER_LOST) || 01484 (outcome == OUTCOME_SEND_ERROR)) { 01485 if (outcome == OUTCOME_SEND_ERROR) { 01486 ACE_ERROR((LM_WARNING, 01487 ACE_TEXT("(%P|%t) WARNING: Problem detected in ") 01488 ACE_TEXT("send buffer management: %p.\n"), 01489 ACE_TEXT("send_bytes"))); 01490 01491 if (Transport_debug_level > 0) { 01492 this->transport_inst_->dump(); 01493 } 01494 } else { 01495 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01496 "The outcome of the send_packet() was " 01497 "OUTCOME_PEER_LOST.\n")); 01498 } 01499 01500 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01501 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5); 01502 01503 if (this->mode_ != MODE_SUSPEND) { 01504 this->mode_before_suspend_ = this->mode_; 01505 this->mode_ = MODE_SUSPEND; 01506 } 01507 01508 if (relink) { 01509 bool do_suspend = false; 01510 this->relink(do_suspend); 01511 01512 if (this->mode_ == MODE_SUSPEND) { 01513 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01514 "The reconnect has not done yet and we are " 01515 "still in MODE_SUSPEND.\n"), 5); 01516 01517 } else if (this->mode_ == MODE_TERMINATED) { 01518 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01519 "Reconnect failed, we are in MODE_TERMINATED\n"), 5); 01520 01521 } else { 01522 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01523 "Try send the packet again since the connection " 01524 "is re-established.\n"), 5); 01525 01526 // Try send the packet again since the connection is re-established. 01527 continue; 01528 } 01529 } 01530 01531 } else { 01532 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01533 "The outcome of the send_packet() must have been " 01534 "OUTCOME_COMPLETE_SEND.\n")); 01535 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01536 "So, we will just stay in MODE_DIRECT.\n")); 01537 } 01538 01539 break; 01540 } 01541 01542 // We stay in MODE_DIRECT mode if we didn't encounter any backpressure. 01543 }
RemoveResult OpenDDS::DCPS::TransportSendStrategy::do_remove_sample | ( | const RepoId & | pub_id, | |
const TransportQueueElement::MatchCriteria & | criteria | |||
) | [protected, virtual] |
Implement framework chain visitations to remove a sample.
Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.
Definition at line 1346 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), OpenDDS::DCPS::BasicQueue< T >::accept_replace_visitor(), DBG_ENTRY_LVL, elems_, header_, OpenDDS::DCPS::TransportHeader::length_, MODE_DIRECT, queue_, OpenDDS::DCPS::REMOVE_ERROR, OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, OpenDDS::DCPS::QueueRemoveVisitor::removed_bytes(), OpenDDS::DCPS::PacketRemoveVisitor::status(), OpenDDS::DCPS::QueueRemoveVisitor::status(), and VDBG.
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::do_remove_sample(), remove_all_msgs(), and remove_sample().
01348 { 01349 DBG_ENTRY_LVL("TransportSendStrategy", "do_remove_sample", 6); 01350 01351 //ciju: Tim had the idea that we could do the following check 01352 // if ((this->mode_ == MODE_DIRECT) || 01353 // ((this->pkt_chain_ == 0) && (queue_ == empty))) 01354 // then we can assume that the sample can be safely removed (no need for 01355 // replacement) from the elems_ queue. 01356 if ((this->mode_ == MODE_DIRECT) 01357 || ((this->pkt_chain_ == 0) && (this->queue_->size() == 0))) { 01358 //ciju: I believe this is the only mode where a safe 01359 // assumption can be made that the samples 01360 // in the elems_ queue aren't part of a packet. 01361 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01362 "The mode is MODE_DIRECT, or the queue is empty and no " 01363 "transport packet is in progress.\n")); 01364 01365 QueueRemoveVisitor simple_rem_vis(criteria); 01366 this->elems_->accept_remove_visitor(simple_rem_vis); 01367 01368 const RemoveResult status = simple_rem_vis.status(); 01369 01370 if (status == REMOVE_RELEASED || status == REMOVE_FOUND) { 01371 this->header_.length_ -= simple_rem_vis.removed_bytes(); 01372 01373 } else if (status == REMOVE_NOT_FOUND) { 01374 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01375 "Failed to find the sample to remove.\n")); 01376 } 01377 01378 return status; 01379 } 01380 01381 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01382 "Visit the queue_ with the RemoveElementVisitor.\n")); 01383 01384 QueueRemoveVisitor simple_rem_vis(criteria); 01385 this->queue_->accept_remove_visitor(simple_rem_vis); 01386 01387 RemoveResult status = simple_rem_vis.status(); 01388 01389 if (status == REMOVE_RELEASED || status == REMOVE_FOUND) { 01390 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01391 "The sample was removed from the queue_.\n")); 01392 // This means that the visitor did not encounter any fatal error 01393 // along the way, *AND* the sample was found in the queue_, 01394 // and has now been removed. We are done. 
01395 return status; 01396 } 01397 01398 if (status == REMOVE_ERROR) { 01399 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01400 "The RemoveElementVisitor encountered a fatal error in queue_.\n")); 01401 // This means that the visitor encountered some fatal error along 01402 // the way (and it already reported something to the log). 01403 // Return our failure code. 01404 return status; 01405 } 01406 01407 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01408 "The RemoveElementVisitor did not find the sample in queue_.\n")); 01409 01410 // We get here if the visitor did not encounter any fatal error, but it 01411 // also didn't find the sample - and hence it didn't perform any 01412 // "remove sample" logic. 01413 01414 // Now we need to turn our attention to the current transport packet, 01415 // since the packet is likely in a "partially sent" state, and the 01416 // sample may still be contributing unsent bytes in the pkt_chain_. 01417 01418 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01419 "Visit our elems_ with the PacketRemoveVisitor.\n")); 01420 01421 PacketRemoveVisitor pac_rem_vis(criteria, 01422 this->pkt_chain_, 01423 this->header_block_, 01424 this->replaced_element_allocator_, 01425 this->replaced_element_mb_allocator_, 01426 this->replaced_element_db_allocator_); 01427 01428 this->elems_->accept_replace_visitor(pac_rem_vis); 01429 01430 status = pac_rem_vis.status(); 01431 01432 if (status == REMOVE_ERROR) { 01433 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01434 "The PacketRemoveVisitor encountered a fatal error.\n")); 01435 01436 } else if (status == REMOVE_NOT_FOUND) { 01437 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01438 "The PacketRemoveVisitor didn't find the sample.\n")); 01439 01440 } else { 01441 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01442 "The PacketRemoveVisitor found the sample and removed it.\n")); 01443 } 01444 01445 return status; 01446 }
ssize_t OpenDDS::DCPS::TransportSendStrategy::do_send_packet | ( | const ACE_Message_Block * | packet, | |
int & | bp | |||
) | [private] |
Form an IOV and call the send_bytes() template method.
Definition at line 1719 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::MAX_SEND_BLOCKS, mb_to_iov(), send_bytes(), OpenDDS::DCPS::Transport_debug_level, and VDBG_LVL.
Referenced by OpenDDS::DCPS::TransportSendBuffer::resend_one(), and send_packet().
01720 { 01721 if (Transport_debug_level > 9) { 01722 ACE_DEBUG((LM_DEBUG, 01723 ACE_TEXT("(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ") 01724 ACE_TEXT("sending data at 0x%x.\n"), 01725 id(), packet)); 01726 } 01727 DBG_ENTRY_LVL("TransportSendStrategy", "do_send_packet", 6); 01728 01729 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01730 "Populate the iovec array using the packet.\n"), 5); 01731 01732 iovec iov[MAX_SEND_BLOCKS]; 01733 01734 int num_blocks = mb_to_iov(*packet, iov); 01735 01736 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01737 "There are [%d] number of entries in the iovec array.\n", 01738 num_blocks), 5); 01739 01740 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01741 "Attempt to send_bytes() now.\n"), 5); 01742 01743 const ssize_t num_bytes_sent = this->send_bytes(iov, num_blocks, bp); 01744 01745 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01746 "The send_bytes() said that num_bytes_sent == [%d].\n", 01747 num_bytes_sent), 5); 01748 01749 return num_bytes_sent; 01750 }
ACE_INLINE ACE_HANDLE OpenDDS::DCPS::TransportSendStrategy::get_handle | ( | ) | [virtual] |
Reimplemented in OpenDDS::DCPS::TcpSendStrategy.
Definition at line 129 of file TransportSendStrategy.inl.
Referenced by OpenDDS::DCPS::ScheduleOutputHandler::handle_exception(), and non_blocking_send().
void OpenDDS::DCPS::TransportSendStrategy::get_packet_elems_from_queue | ( | ) | [private] |
This method is used while in MODE_QUEUE mode, when a new packet needs to be formulated using elements from the queue_. This is the first step of formulating the new packet. It extracts elements from the queue_ and inserts them into the elems_ collection.
After this step has been done, the prepare_packet() step can be performed, followed by the actual send_packet() call.
Definition at line 1546 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, elems_, header_, OpenDDS::DCPS::TransportHeader::last_fragment_, OpenDDS::DCPS::TransportHeader::length_, max_samples_, optimum_size_, OpenDDS::DCPS::BasicQueue< T >::peek(), OpenDDS::DCPS::BasicQueue< T >::put(), queue_, OpenDDS::DCPS::BasicQueue< T >::replace_head(), space_available(), and VDBG_LVL.
Referenced by perform_work().
01547 { 01548 DBG_ENTRY_LVL("TransportSendStrategy", "get_packet_elems_from_queue", 6); 01549 01550 for (TransportQueueElement* element = this->queue_->peek(); element != 0; 01551 element = this->queue_->peek()) { 01552 01553 // Total number of bytes in the current element's message block chain. 01554 size_t element_length = element->msg()->total_length(); 01555 01556 // Flag used to determine if the element requires a packet all to itself. 01557 const bool exclusive_packet = element->requires_exclusive_packet(); 01558 01559 const size_t avail = this->space_available(); 01560 01561 bool frag = false; 01562 if (element_length > avail) { 01563 // The current element won't fit into the current packet 01564 if (this->max_message_size()) { // fragmentation enabled 01565 this->header_.first_fragment_ = !element->is_fragment(); 01566 VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting from queue\n"), 0); 01567 ElementPair ep = element->fragment(avail); 01568 element = ep.first; 01569 element_length = element->msg()->total_length(); 01570 this->queue_->replace_head(ep.second); 01571 frag = true; // queue_ is already taken care of, don't get() later 01572 } else { 01573 break; 01574 } 01575 } 01576 01577 // If exclusive and the current packet is empty, we won't violate the 01578 // exclusive_packet requirement by put()'ing the element 01579 // into the elems_ collection. 01580 if ((exclusive_packet && this->elems_->size() == 0) 01581 || !exclusive_packet) { 01582 // At this point, we have passed all of the pre-conditions and we can 01583 // now extract the current element from the queue_, put it into the 01584 // packet elems_, and adjust the packet header_.length_. 01585 this->elems_->put(frag ? 
element : this->queue_->get()); 01586 if (this->header_.length_ == 0) { 01587 this->header_.last_fragment_ = !frag && element->is_fragment(); 01588 } 01589 this->header_.length_ += static_cast<ACE_UINT32>(element_length); 01590 VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Packetizing from queue\n"), 0); 01591 } 01592 01593 // With exclusive and (elems_.size() != 0), we don't use the current 01594 // element as part of the packet. We know that there is already 01595 // at least one element in the packet, and the current element 01596 // is going to need its own (exclusive) packet. We will just 01597 // use the packet elems_ as it is now. Always break once 01598 // we've encountered and dealt with the exclusive_packet case. 01599 // Also break if fragmentation was required. 01600 if (exclusive_packet || frag 01601 // If the current number of packet elems_ has reached the maximum 01602 // number of samples per packet, then we are done. 01603 || this->elems_->size() == this->max_samples_ 01604 // If the current value of the header_.length_ exceeds (or equals) 01605 // the optimum_size_ for a packet, then we are done. 01606 || this->header_.length_ >= this->optimum_size_) { 01607 break; 01608 } 01609 } 01610 }
ACE_INLINE bool OpenDDS::DCPS::TransportSendStrategy::isDirectMode | ( | ) |
Definition at line 115 of file TransportSendStrategy.inl.
References mode_, and MODE_DIRECT.
00116 { 00117 return this->mode_ == MODE_DIRECT; 00118 }
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::link_released | ( | bool | flag | ) |
Definition at line 41 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL, and link_released_.
Referenced by OpenDDS::DCPS::MulticastSendStrategy::MulticastSendStrategy().
00042 { 00043 DBG_ENTRY_LVL("TransportSendStrategy","link_released",6); 00044 00045 GuardType guard(this->lock_); 00046 this->link_released_ = flag; 00047 }
void OpenDDS::DCPS::TransportSendStrategy::marshal_transport_header | ( | ACE_Message_Block * | mb | ) | [private, virtual] |
Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.
Definition at line 1699 of file TransportSendStrategy.cpp.
References header_.
Referenced by prepare_packet().
01700 { 01701 *mb << this->header_; 01702 }
ACE_INLINE size_t OpenDDS::DCPS::TransportSendStrategy::max_message_size | ( | ) | const [protected, virtual] |
The maximum size of a message allowed by this TransportImpl, or 0 if there is no such limit. This is expected to be a constant; for example, UDP/IPv4 can send messages of up to 65466 bytes. The transport framework will use the returned value (if > 0) to fragment larger messages. This fragmentation and reassembly will be transparent to the user.
Reimplemented in OpenDDS::DCPS::MulticastSendStrategy, OpenDDS::DCPS::RtpsUdpSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.
Definition at line 142 of file TransportSendStrategy.inl.
Referenced by send(), and space_available().
int OpenDDS::DCPS::TransportSendStrategy::mb_to_iov | ( | const ACE_Message_Block & | msg, | |
iovec * | iov | |||
) | [static] |
Convert ACE_Message_Block chain into iovec[] entries for send(), returns number of iovec[] entries used (up to MAX_SEND_BLOCKS). Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater.
Definition at line 1898 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::MAX_SEND_BLOCKS.
Referenced by do_send_packet(), OpenDDS::DCPS::UdpDataLink::open(), and OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control().
01899 { 01900 int num_blocks = 0; 01901 #ifdef _MSC_VER 01902 #pragma warning(push) 01903 // iov_len is 32-bit on 64-bit VC++, but we don't want a cast here 01904 // since on other platforms iov_len is 64-bit 01905 #pragma warning(disable : 4267) 01906 #endif 01907 for (const ACE_Message_Block* block = &msg; 01908 block && num_blocks < MAX_SEND_BLOCKS; 01909 block = block->cont()) { 01910 iov[num_blocks].iov_len = block->length(); 01911 iov[num_blocks++].iov_base = block->rd_ptr(); 01912 } 01913 #ifdef _MSC_VER 01914 #pragma warning(pop) 01915 #endif 01916 return num_blocks; 01917 }
ACE_INLINE OpenDDS::DCPS::TransportSendStrategy::SendMode OpenDDS::DCPS::TransportSendStrategy::mode | ( | ) | const |
Access the current sending mode.
Definition at line 13 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL, and mode_.
Referenced by OpenDDS::DCPS::ScheduleOutputHandler::handle_exception(), OpenDDS::DCPS::TcpSendStrategy::schedule_output(), OpenDDS::DCPS::ScheduleOutputHandler::schedule_output(), and send_delayed_notifications().
00014 { 00015 DBG_ENTRY_LVL("TransportSendStrategy","mode",6); 00016 00017 return mode_; 00018 }
ACE_INLINE const char * OpenDDS::DCPS::TransportSendStrategy::mode_as_str | ( | SendMode | mode | ) | [static, private] |
Helper function for debugging: returns the name of the given SendMode value as a string.
Definition at line 101 of file TransportSendStrategy.inl.
Referenced by perform_work(), send(), and send_stop().
00102 { 00103 static const char* SendModeStr[] = { "MODE_NOT_SET", 00104 "MODE_DIRECT", 00105 "MODE_QUEUE", 00106 "MODE_SUSPEND", 00107 "MODE_TERMINATED", 00108 "UNKNOWN" 00109 }; 00110 00111 return SendModeStr [mode]; 00112 }
ssize_t OpenDDS::DCPS::TransportSendStrategy::non_blocking_send | ( | const iovec | iov[], | |
int | n, | |||
int & | bp | |||
) | [protected, virtual] |
Definition at line 1825 of file TransportSendStrategy.cpp.
References get_handle(), send_bytes_i(), VDBG, and VDBG_LVL.
Referenced by OpenDDS::DCPS::TcpSendStrategy::send_bytes().
01826 { 01827 int val = 0; 01828 ACE_HANDLE handle = this->get_handle(); 01829 01830 if (handle == ACE_INVALID_HANDLE) 01831 return -1; 01832 01833 ACE::record_and_set_non_blocking_mode(handle, val); 01834 01835 // Set the back-pressure flag to false. 01836 bp = 0; 01837 01838 // Clear errno 01839 errno = 0; 01840 01841 ssize_t result = this->send_bytes_i(iov, n); 01842 01843 if (result == -1) { 01844 if ((errno == EWOULDBLOCK) || (errno == ENOBUFS)) { 01845 VDBG((LM_DEBUG,"(%P|%t) DBG: " 01846 "Backpressure encountered.\n")); 01847 // Set the back-pressure flag to true 01848 bp = 1; 01849 01850 } else { 01851 VDBG_LVL((LM_ERROR, "(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n", 01852 ACE_TEXT("sendv"), n),1); 01853 01854 // try to get the application to core when "Bad Address" is returned 01855 // by looking at the iovec 01856 for (int ii = 0; ii < n; ii++) { 01857 ACE_DEBUG((LM_DEBUG, "(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n", 01858 ii, iov[ii].iov_len, iov[ii].iov_base)); 01859 } 01860 } 01861 } 01862 01863 VDBG_LVL((LM_DEBUG,"(%P|%t) DBG: " 01864 "The sendv() returned [%d].\n", result), 5); 01865 01866 ACE::restore_non_blocking_mode(handle, val); 01867 01868 return result; 01869 }
OpenDDS::DCPS::TransportSendStrategy::OPENDDS_VECTOR | ( | TQESendModePair | ) | [private] |
Referenced by send_delayed_notifications().
ThreadSynchWorker::WorkOutcome OpenDDS::DCPS::TransportSendStrategy::perform_work | ( | ) | [virtual] |
Called by our ThreadSynch object when we should be able to start sending any partial packet bytes and/or compose a new packet using elements from the queue_.
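Returns WORK_OUTCOME_NO_MORE_TO_DO to indicate that the ThreadSynch object doesn't need to call perform_work() again, for example because the queue (and any unsent packet bytes) has been drained and the mode_ has been switched to MODE_DIRECT.
Returns WORK_OUTCOME_MORE_TO_DO to indicate that there is more work to do, and the ThreadSynch object should have this perform_work() method called again. Returns WORK_OUTCOME_CLOGGED_RESOURCE when the send attempt met backpressure, and WORK_OUTCOME_BROKEN_RESOURCE when the strategy is in MODE_TERMINATED.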
Implements OpenDDS::DCPS::ThreadSynchWorker.
Definition at line 138 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, get_packet_elems_from_queue(), header_, OpenDDS::DCPS::TransportHeader::length_, mode_, mode_as_str(), MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OUTCOME_BACKPRESSURE, OUTCOME_COMPLETE_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, prepare_packet(), relink(), send_delayed_notifications(), send_packet(), synch_, VDBG_LVL, OpenDDS::DCPS::ThreadSynch::work_available(), OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO, and OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO.
00139 { 00140 DBG_ENTRY_LVL("TransportSendStrategy","perform_work",6); 00141 00142 SendPacketOutcome outcome; 00143 bool no_more_work = false; 00144 00145 { // scope for the guard(this->lock_); 00146 GuardType guard(this->lock_); 00147 00148 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: perform_work mode: %C\n", mode_as_str(this->mode_)), 5); 00149 00150 if (this->mode_ == MODE_TERMINATED) { 00151 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00152 "Entered perform_work() and mode_ is MODE_TERMINATED - " 00153 "we lost connection and could not reconnect, just return " 00154 "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5); 00155 return WORK_OUTCOME_BROKEN_RESOURCE; 00156 } 00157 00158 // The perform_work() is called by our synch_ object using 00159 // a thread designated to call this method when it thinks 00160 // we need to be called in order to "service" the queue_ and/or 00161 // deal with a partially sent current packet. 00162 // 00163 // We will return a 0 if we don't see a need to have our perform_work() 00164 // called again, and we will return a 1 if we do see the need to have our 00165 // perform_work() method called again. 00166 00167 // First, make sure that the mode_ indicates that we are, indeed, in 00168 // the MODE_QUEUE mode. If we are not in MODE_QUEUE mode (meaning we are 00169 // in MODE_DIRECT), then it means we didn't need to have this perform_work() 00170 // method called - in this case, do nothing other than return 00171 // WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we really don't 00172 // see a need for it to call our perform_work() again (at least not 00173 // right now). 00174 if (this->mode_ != MODE_QUEUE && this->mode_ != MODE_SUSPEND) { 00175 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00176 "Entered perform_work() and mode_ is %C - just return " 00177 "WORK_OUTCOME_NO_MORE_TO_DO.\n", mode_as_str(this->mode_)), 5); 00178 return WORK_OUTCOME_NO_MORE_TO_DO; 00179 } 00180 00181 // Check the "state" of the current packet. 
We will either find that the 00182 // current packet is in a state of being "partially sent", or we will find 00183 // it in a state of being "empty". When the current packet is "empty", it 00184 // means that it is time to build up the current packet using elements 00185 // extracted from the queue_, and then we will attempt to send the 00186 // packet. When we find the current packet in the "partially sent" state, 00187 // we will not touch the queue_ - we will just try to send the unsent 00188 // bytes in the current (partially sent) packet. 00189 const size_t header_length = this->header_.length_; 00190 00191 if (header_length == 0) { 00192 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00193 "The current packet doesn't have any unsent bytes - we " 00194 "need to 'populate' the current packet with elems from " 00195 "the queue.\n"), 5); 00196 00197 // The current packet is "empty". Build up the current packet using 00198 // elements from the queue_, and prepare the current packet to be sent. 00199 00200 // Before we build the packet from the queue_, let's make sure that 00201 // there is actually something on the queue_ to build from. 00202 if (this->queue_->size() == 0) { 00203 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00204 "But the queue is empty. We have cleared the " 00205 "backpressure situation.\n"),5); 00206 // We are here because the queue_ is empty, and there isn't 00207 // any "partial packet" bytes left to send. We have overcome 00208 // the backpressure situation and don't have anything to do 00209 // right now. 00210 00211 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00212 "Flip mode to MODE_DIRECT, and return " 00213 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5); 00214 00215 // Flip the mode back to MODE_DIRECT. 00216 this->mode_ = MODE_DIRECT; 00217 00218 // And return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that 00219 // perform_work() doesn't need to be called again (at this time). 
00220 return WORK_OUTCOME_NO_MORE_TO_DO; 00221 } 00222 00223 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00224 "There is at least one elem in the queue - get the packet " 00225 "elems from the queue.\n"), 5); 00226 00227 // There is stuff in the queue_ if we get to this point in the logic. 00228 // Build-up the current packet using element(s) from the queue_. 00229 this->get_packet_elems_from_queue(); 00230 00231 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00232 "Prepare the packet from the packet elems_.\n"), 5); 00233 00234 // Now we can prepare the new packet to be sent. 00235 this->prepare_packet(); 00236 00237 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00238 "Packet has been prepared from packet elems_.\n"), 5); 00239 00240 } else { 00241 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00242 "We have a current packet that still has unsent bytes.\n"), 5); 00243 } 00244 00245 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00246 "Attempt to send the current packet.\n"), 5); 00247 00248 // Now we can attempt to send the current packet - whether it is 00249 // a "partially sent" packet or one that we just built-up using elements 00250 // from the queue_ (and subsequently prepared for sending) - it doesn't 00251 // matter. Just attempt to send as many of the "unsent" bytes in the 00252 // packet as possible. 00253 outcome = this->send_packet(); 00254 00255 // If we sent the whole packet (eg, partial_send is false), and the queue_ 00256 // is now empty, then we've cleared the backpressure situation. 00257 if ((outcome == OUTCOME_COMPLETE_SEND) && (this->queue_->size() == 0)) { 00258 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00259 "Flip the mode to MODE_DIRECT, and then return " 00260 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5); 00261 00262 // Revert back to MODE_DIRECT mode. 
00263 this->mode_ = MODE_DIRECT; 00264 no_more_work = true; 00265 } 00266 } // End of scope for guard(this->lock_); 00267 00268 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00269 "The outcome of the send_packet() was %d.\n", outcome), 5); 00270 00271 send_delayed_notifications(); 00272 00273 // If we sent the whole packet (eg, partial_send is false), and the queue_ 00274 // is now empty, then we've cleared the backpressure situation. 00275 if (no_more_work) { 00276 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00277 "We sent the whole packet, and there is nothing left on " 00278 "the queue now.\n"), 5); 00279 00280 // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we 00281 // don't desire another call to this perform_work() method. 00282 return WORK_OUTCOME_NO_MORE_TO_DO; 00283 } 00284 00285 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00286 "We still have unsent bytes in the current packet AND/OR there " 00287 "are still elements in the queue.\n"), 5); 00288 00289 if ((outcome == OUTCOME_PEER_LOST) || (outcome == OUTCOME_SEND_ERROR)) { 00290 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00291 "We lost our connection, or had some fatal connection " 00292 "error. Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5); 00293 00294 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00295 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5); 00296 00297 bool do_suspend = true; 00298 this->relink(do_suspend); 00299 00300 if (this->mode_ == MODE_SUSPEND) { 00301 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00302 "The reconnect has not done yet and we are still in MODE_SUSPEND. " 00303 "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5); 00304 // Return WORK_OUTCOME_NO_MORE_TO_DO to tell our caller that we 00305 // don't desire another call to this perform_work() method. 
00306 return WORK_OUTCOME_NO_MORE_TO_DO; 00307 00308 } else if (this->mode_ == MODE_TERMINATED) { 00309 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00310 "Reconnect failed, now we are in MODE_TERMINATED\n"), 5); 00311 return WORK_OUTCOME_BROKEN_RESOURCE; 00312 00313 } else { 00314 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00315 "Reconnect succeeded, Notify synch thread of work " 00316 "availability.\n"), 5); 00317 // If the datalink is re-established then notify the synch 00318 // thread to perform work. We do not hold the object lock at 00319 // this point. 00320 this->synch_->work_available(); 00321 } 00322 } 00323 00324 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00325 "We still have an 'unbroken' connection.\n"), 5); 00326 00327 if (outcome == OUTCOME_BACKPRESSURE) { 00328 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00329 "We experienced backpressure on our attempt to send the " 00330 "packet. Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5); 00331 // We have a "clogged resource". 00332 return WORK_OUTCOME_CLOGGED_RESOURCE; 00333 } 00334 00335 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00336 "We may have sent the whole current packet, but still have " 00337 "elements on the queue.\n"), 5); 00338 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00339 "Or, we may have only partially sent the current packet.\n"), 5); 00340 00341 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00342 "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5); 00343 00344 // We may have had an OUTCOME_COMPLETE_SEND, but there is still stuff 00345 // in the queue_ to be sent. *OR* we have have had an OUTCOME_PARTIAL_SEND, 00346 // which equates to the same thing - we still have work to do. 00347 00348 // We are still in MODE_QUEUE mode, thus there is still work to be 00349 // done to service the queue_ and/or a partially sent current packet. 00350 // Return WORK_OUTCOME_MORE_TO_DO so that our caller knows that we still 00351 // want it to call this perform_work() method. 00352 return WORK_OUTCOME_MORE_TO_DO; 00353 }
void OpenDDS::DCPS::TransportSendStrategy::prepare_header | ( | ) | [private] |
This method is responsible for updating the packet header. Called exclusively by prepare_packet.
Definition at line 1613 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, header_, header_sequence_, prepare_header_i(), and OpenDDS::DCPS::TransportHeader::sequence_.
Referenced by prepare_packet().
01614 { 01615 DBG_ENTRY_LVL("TransportSendStrategy", "prepare_header", 6); 01616 01617 // Increment header sequence for packet: 01618 this->header_.sequence_ = ++this->header_sequence_; 01619 01620 // Allow the specific implementation the opportunity to set 01621 // values in the packet header. 01622 this->prepare_header_i(); 01623 }
void OpenDDS::DCPS::TransportSendStrategy::prepare_header_i | ( | ) | [protected, virtual] |
Specific implementation processing of prepared packet header.
Reimplemented in OpenDDS::DCPS::MulticastSendStrategy.
Definition at line 1626 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL.
Referenced by prepare_header().
01627 { 01628 DBG_ENTRY_LVL("TransportSendStrategy","prepare_header_i",6); 01629 01630 // Default implementation does nothing. 01631 }
void OpenDDS::DCPS::TransportSendStrategy::prepare_packet | ( | ) | [private] |
This method is responsible for actually "creating" the current send packet using the packet header and the collection of packet elements that are to make-up the packet's contents.
Definition at line 1634 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), OpenDDS::DCPS::BuildChainVisitor::chain(), DBG_ENTRY_LVL, elems_, header_block_, header_complete_, marshal_transport_header(), pkt_chain_, prepare_header(), prepare_packet_i(), and VDBG.
Referenced by direct_send(), and perform_work().
01635 { 01636 DBG_ENTRY_LVL("TransportSendStrategy", "prepare_packet", 6); 01637 01638 // Prepare the header for sending. 01639 this->prepare_header(); 01640 01641 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01642 "Marshal the packet header.\n")); 01643 01644 if (this->header_block_ != 0) { 01645 this->header_block_->release(); 01646 } 01647 01648 ACE_NEW_MALLOC(this->header_block_, 01649 static_cast<ACE_Message_Block*>(this->header_mb_allocator_->malloc()), 01650 ACE_Message_Block(this->max_header_size_, 01651 ACE_Message_Block::MB_DATA, 01652 0, 01653 0, 01654 0, 01655 0, 01656 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 01657 ACE_Time_Value::zero, 01658 ACE_Time_Value::max_time, 01659 this->header_db_allocator_, 01660 this->header_mb_allocator_)); 01661 01662 marshal_transport_header(this->header_block_); 01663 01664 this->pkt_chain_ = this->header_block_->duplicate(); 01665 01666 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01667 "Use a BuildChainVisitor to visit the packet elems_.\n")); 01668 01669 // Build up a chain of blocks by duplicating the message block chain 01670 // held by each element (in elems_), and then chaining the new duplicate 01671 // blocks together to form one long chain. 01672 BuildChainVisitor visitor; 01673 this->elems_->accept_visitor(visitor); 01674 01675 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01676 "Attach the visitor's chain of blocks to the lone (packet " 01677 "header) block currently in the pkt_chain_.\n")); 01678 01679 // Attach the visitor's chain of blocks to the packet header block. 01680 this->pkt_chain_->cont(visitor.chain()); 01681 01682 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01683 "Increment header sequence for next packet.\n")); 01684 01685 // Allow the specific implementation the opportunity to process the 01686 // newly prepared packet. 
01687 this->prepare_packet_i(); 01688 01689 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01690 "Set the header_complete_ flag to false.\n")); 01691 01692 // Set the header_complete_ to false to indicate 01693 // that the first block in the pkt_chain_ is the packet header block 01694 // (actually a duplicate() of the packet header_block_). 01695 this->header_complete_ = false; 01696 }
void OpenDDS::DCPS::TransportSendStrategy::prepare_packet_i | ( | ) | [protected, virtual] |
Specific implementation processing of prepared packet.
Definition at line 1705 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL.
Referenced by prepare_packet().
01706 { 01707 DBG_ENTRY_LVL("TransportSendStrategy","prepare_packet_i",6); 01708 01709 // Default implementation does nothing. 01710 }
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::relink | ( | bool | do_suspend = true |
) | [virtual] |
The subclass needs to provide the implementation for re-establishing the datalink. This is called when send returns an error.
Reimplemented in OpenDDS::DCPS::TcpSendStrategy.
Definition at line 50 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL.
Referenced by direct_send(), and perform_work().
00051 { 00052 DBG_ENTRY_LVL("TransportSendStrategy","relink",6); 00053 // The subsclass needs implement this function for re-establishing 00054 // the link upon send failure. 00055 }
void OpenDDS::DCPS::TransportSendStrategy::remove_all_msgs | ( | RepoId | pub_id | ) |
Definition at line 1299 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, do_remove_sample(), OpenDDS::DCPS::TransportSendBuffer::retain_all(), send_buffer_, and send_delayed_notifications().
01300 { 01301 DBG_ENTRY_LVL("TransportSendStrategy","remove_all_msgs",6); 01302 01303 const TransportQueueElement::MatchOnPubId match(pub_id); 01304 send_delayed_notifications(&match); 01305 01306 GuardType guard(this->lock_); 01307 01308 if (this->send_buffer_ != 0) { 01309 // If a secondary send buffer is bound, removed samples must 01310 // be retained in order to properly maintain the buffer: 01311 this->send_buffer_->retain_all(pub_id); 01312 } 01313 01314 do_remove_sample(pub_id, match); 01315 }
RemoveResult OpenDDS::DCPS::TransportSendStrategy::remove_sample | ( | const DataSampleElement * | sample | ) |
Our DataLink has been requested by some particular TransportClient to remove the supplied sample (basically, an "unsend" attempt) from this strategy object.
Definition at line 1318 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, do_remove_sample(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_sample(), OpenDDS::DCPS::REMOVE_RELEASED, send_delayed_notifications(), and VDBG_LVL.
01319 { 01320 DBG_ENTRY_LVL("TransportSendStrategy", "remove_sample", 6); 01321 01322 VDBG_LVL((LM_DEBUG, "(%P|%t) Removing sample: %@\n", sample->get_sample()), 5); 01323 01324 // The sample to remove is either in temporary delayed notification list or 01325 // internal list (elems_ or queue_). If it's going to be removed from temporary delayed 01326 // notification list by transport thread, it needs acquire WriterDataContainer lock for 01327 // data_dropped/data_delivered callback, then it needs wait for this remove_sample() call 01328 // complete as this call already hold the WriterContainer's lock. So this call is safe to 01329 // access the sample to remove. If it's going to be removed by this remove_sample() calling 01330 // thread, it will be removed either from delayed notification list or from internal list 01331 // in which case the element carry the info if the sample is released so the datalinkset 01332 // can stop calling rest datalinks to remove this sample if it's already released.. 01333 01334 const char* const payload = sample->get_sample()->cont()->rd_ptr(); 01335 RepoId pub_id = sample->get_pub_id(); 01336 const TransportQueueElement::MatchOnDataPayload modp(payload); 01337 if (send_delayed_notifications(&modp)) { 01338 return REMOVE_RELEASED; 01339 } 01340 01341 GuardType guard(this->lock_); 01342 return do_remove_sample(pub_id, modp); 01343 }
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::resume_send | ( | ) |
This is called when connection is lost and reconnect succeeds. The send mode is set to the mode before suspend which is either MODE_QUEUE or MODE_DIRECT.
Definition at line 70 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL, header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, mode_, mode_before_suspend_, MODE_DIRECT, MODE_NOT_SET, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, pkt_chain_, and start_counter_.
00071 { 00072 DBG_ENTRY_LVL("TransportSendStrategy","resume_send",6); 00073 GuardType guard(this->lock_); 00074 00075 // If this send strategy is reused when the connection is reestablished, then 00076 // we need re-initialize the mode_ and mode_before_suspend_. 00077 if (this->mode_ == MODE_TERMINATED) { 00078 this->header_.length_ = 0; 00079 this->pkt_chain_ = 0; 00080 this->header_complete_ = false; 00081 this->start_counter_ = 0; 00082 this->mode_ = MODE_DIRECT; 00083 this->mode_before_suspend_ = MODE_NOT_SET; 00084 this->delayed_delivered_notification_queue_.clear(); 00085 00086 } else if (this->mode_ == MODE_SUSPEND) { 00087 this->mode_ = this->mode_before_suspend_; 00088 this->mode_before_suspend_ = MODE_NOT_SET; 00089 if (this->queue_->size() > 0) { 00090 this->mode_ = MODE_QUEUE; 00091 this->synch_->work_available(); 00092 } 00093 00094 } else { 00095 ACE_ERROR((LM_ERROR, "(%P|%t) ERROR: TransportSendStrategy::resume_send The suspend or terminate" 00096 " is not called previously.\n")); 00097 } 00098 }
void OpenDDS::DCPS::TransportSendStrategy::send | ( | TransportQueueElement * | element, | |
bool | relink = true | |||
) |
Our DataLink has been requested by some particular TransportClient to send the element.
Definition at line 925 of file TransportSendStrategy.cpp.
References add_delayed_notification(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, direct_send(), elems_, OpenDDS::DCPS::TransportHeader::first_fragment_, OpenDDS::DCPS::TransportQueueElement::fragment(), header_, OpenDDS::DCPS::TransportHeader::last_fragment_, OpenDDS::DCPS::TransportHeader::length_, OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), max_message_size(), max_samples_, OpenDDS::DCPS::MIN_FRAG, mode_, mode_as_str(), MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::BasicQueue< T >::put(), queue_, OpenDDS::DCPS::TransportQueueElement::requires_exclusive_packet(), send_delayed_notifications(), space_available(), synch_, OpenDDS::DCPS::Transport_debug_level, VDBG, VDBG_LVL, and OpenDDS::DCPS::ThreadSynch::work_available().
00926 { 00927 if (Transport_debug_level > 9) { 00928 ACE_DEBUG((LM_DEBUG, 00929 ACE_TEXT("(%P|%t) TransportSendStrategy::send() [%d] - ") 00930 ACE_TEXT("sending data at 0x%x.\n"), 00931 id(), element)); 00932 } 00933 00934 DBG_ENTRY_LVL("TransportSendStrategy", "send", 6); 00935 00936 { 00937 GuardType guard(this->lock_); 00938 00939 if (this->link_released_) { 00940 this->add_delayed_notification(element); 00941 00942 } else { 00943 if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) { 00944 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00945 "TransportSendStrategy::send: mode is MODE_TERMINATED and not in " 00946 "graceful disconnecting, so discard message.\n")); 00947 element->data_dropped(true); 00948 return; 00949 } 00950 00951 size_t element_length = element->msg()->total_length(); 00952 00953 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00954 "Send element msg() has total_length() == [%d].\n", 00955 element_length)); 00956 00957 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00958 "this->max_header_size_ == [%d].\n", 00959 this->max_header_size_)); 00960 00961 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00962 "this->max_size_ == [%d].\n", 00963 this->max_size_)); 00964 00965 const size_t max_message_size = this->max_message_size(); 00966 00967 // Really an assert. We can't accept any element that wouldn't fit into 00968 // a transport packet by itself (ie, it would be the only element in the 00969 // packet). This max_size_ is the user-configurable maximum, not based 00970 // on the transport's inherent maximum message size. If max_message_size 00971 // is non-zero, we will fragment so max_size_ doesn't apply per-element. 00972 if (max_message_size == 0 && 00973 this->max_header_size_ + element_length > this->max_size_) { 00974 ACE_ERROR((LM_ERROR, 00975 "(%P|%t) ERROR: Element too large (%Q) " 00976 "- won't fit into packet.\n", ACE_UINT64(element_length))); 00977 return; 00978 } 00979 00980 // Check the mode_ to see if we simply put the element on the queue. 
00981 if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) { 00982 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 00983 "this->mode_ == %C, so queue elem and leave.\n", 00984 mode_as_str(this->mode_)), 5); 00985 00986 this->queue_->put(element); 00987 00988 if (this->mode_ != MODE_SUSPEND) { 00989 this->synch_->work_available(); 00990 } 00991 00992 return; 00993 } 00994 00995 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00996 "this->mode_ == MODE_DIRECT.\n")); 00997 00998 // We are in the MODE_DIRECT send mode. When in this mode, the send() 00999 // calls will "build up" the transport packet to be sent directly when it 01000 // reaches the optimal size, contains the maximum number of samples, etc. 01001 01002 // We need to check if the current element (the arg passed-in to this 01003 // send() method) should be appended to the transport packet, or if the 01004 // transport packet should be sent (directly) first, dealing with the 01005 // current element afterwards. 01006 01007 // We will decide to send the packet as it is now, under two circumstances: 01008 // 01009 // Either: 01010 // 01011 // (1) The current element won't fit into the current packet since it 01012 // would violate the max_packet_size_. 01013 // 01014 // -OR- 01015 // 01016 // (2) There is at least one element already in the current packet, 01017 // and the current element says that it must be sent in an 01018 // exclusive packet (ie, in a packet all by itself). 01019 // 01020 const bool exclusive = element->requires_exclusive_packet(); 01021 01022 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01023 "The element %C require an exclusive packet.\n", 01024 (exclusive ? "DOES" : "does NOT") 01025 )); 01026 01027 const size_t space_needed = 01028 (max_message_size > 0) 01029 ? 
/* fragmenting */ DataSampleHeader::max_marshaled_size() + MIN_FRAG 01030 : /* not fragmenting */ element_length; 01031 01032 if ((exclusive && (this->elems_->size() != 0)) 01033 || (this->space_available() < space_needed)) { 01034 01035 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01036 "Element won't fit in current packet or requires exclusive" 01037 " - send current packet (directly) now.\n")); 01038 01039 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01040 "max_header_size_: %d, header_.length_: %d, element_length: %d\n" 01041 , this->max_header_size_, this->header_.length_, element_length)); 01042 01043 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01044 "Tot possible length: %d, max_len: %d\n" 01045 , this->max_header_size_ + this->header_.length_ + element_length 01046 , this->max_size_)); 01047 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01048 "current elem size: %d\n" 01049 , this->elems_->size())); 01050 01051 // Send the current packet, and deal with the current element 01052 // afterwards. 01053 // The invocation's relink status should dictate the direct_send's 01054 // relink. We don't want a (relink == false) invocation to end up 01055 // doing a relink. Think of (relink == false) as a non-blocking call. 01056 this->direct_send(relink); 01057 01058 // Now check to see if we flipped into MODE_QUEUE, which would mean 01059 // that the direct_send() experienced backpressure, and the 01060 // packet was only partially sent. If this has happened, we deal with 01061 // the current element by placing it on the queue (and then we are done). 01062 // 01063 // Otherwise, if the mode_ is still MODE_DIRECT, we can just 01064 // "drop" through to the next step in the logic where we append the 01065 // current element to the current packet. 01066 if (this->mode_ == MODE_QUEUE) { 01067 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01068 "We experienced backpressure on that direct send, as " 01069 "the mode_ is now MODE_QUEUE or MODE_SUSPEND. 
" 01070 "Queue elem and leave.\n"), 5); 01071 this->queue_->put(element); 01072 this->synch_->work_available(); 01073 01074 return; 01075 } 01076 } 01077 01078 // Loop for sending 'element', in fragments if needed 01079 bool first_pkt = true; // enter the loop 1st time through unconditionally 01080 for (TransportQueueElement* next_fragment = 0; 01081 (first_pkt || next_fragment) 01082 && (this->mode_ == MODE_DIRECT || this->mode_ == MODE_TERMINATED);) { 01083 // We do need to send in MODE_TERMINATED (GRACEFUL_DISCONNECT msg) 01084 01085 if (next_fragment) { 01086 element = next_fragment; 01087 element_length = next_fragment->msg()->total_length(); 01088 this->header_.first_fragment_ = false; 01089 } 01090 01091 this->header_.last_fragment_ = false; 01092 if (max_message_size) { // fragmentation enabled 01093 const size_t avail = this->space_available(); 01094 if (element_length > avail) { 01095 VDBG_LVL((LM_TRACE, "(%P|%t) DBG: Fragmenting\n"), 0); 01096 ElementPair ep = element->fragment(avail); 01097 element = ep.first; 01098 element_length = element->msg()->total_length(); 01099 next_fragment = ep.second; 01100 this->header_.first_fragment_ = first_pkt; 01101 } else if (next_fragment) { 01102 // We are sending the "tail" element of a previous fragment() 01103 // operation, and this element didn't itself require fragmentation 01104 this->header_.last_fragment_ = true; 01105 next_fragment = 0; 01106 } 01107 } 01108 first_pkt = false; 01109 01110 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01111 "Start the 'append elem' to current packet logic.\n")); 01112 01113 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01114 "Put element into current packet elems_.\n")); 01115 01116 // Now that we know the current element should go into the current 01117 // packet, we can just go ahead and "append" the current element to 01118 // the current packet. 01119 01120 // Add the current element to the collection of packet elements. 
01121 this->elems_->put(element); 01122 01123 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01124 "Before, the header_.length_ == [%d].\n", 01125 this->header_.length_)); 01126 01127 // Adjust the header_.length_ to account for the length of the element. 01128 this->header_.length_ += static_cast<ACE_UINT32>(element_length); 01129 const size_t message_length = this->header_.length_; 01130 01131 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01132 "After adding element's length, the header_.length_ == [%d].\n", 01133 message_length)); 01134 01135 // The current packet now contains the current element. We need to 01136 // check to see if the conditions are such that we should go ahead and 01137 // attempt to send the packet "directly" now, or if we can just leave 01138 // and send the current packet later (in another send() call or in a 01139 // send_stop() call). 01140 01141 // There a few conditions that will cause us to attempt to send the 01142 // packet (directly) right now: 01143 // - Fragmentation was needed 01144 // - The current packet has the maximum number of samples per packet. 01145 // - The current packet's total length exceeds the optimum packet size. 01146 // - The current element (currently part of the packet elems_) 01147 // requires an exclusive packet. 
01148 // 01149 if (next_fragment || (this->elems_->size() >= this->max_samples_) 01150 || (this->max_header_size_ + message_length > this->optimum_size_) 01151 || exclusive) { 01152 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01153 "Now the current packet looks full - send it (directly).\n")); 01154 01155 this->direct_send(relink); 01156 01157 if (next_fragment && this->mode_ != MODE_DIRECT) { 01158 if (this->mode_ == MODE_QUEUE) { 01159 this->queue_->put(next_fragment); 01160 this->synch_->work_available(); 01161 01162 } else { 01163 next_fragment->data_dropped(true /* dropped by transport */); 01164 } 01165 } else if (mode_ == MODE_QUEUE) { 01166 // Background thread handles packets in progress 01167 this->synch_->work_available(); 01168 } 01169 01170 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01171 "Back from the direct_send() attempt.\n")); 01172 01173 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01174 "And we %C as a result of the direct_send() call.\n", 01175 ((this->mode_ == MODE_QUEUE) ? "flipped into MODE_QUEUE" 01176 : "stayed in MODE_DIRECT"))); 01177 01178 } else { 01179 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01180 "Packet not sent. Send conditions weren't satisfied.\n")); 01181 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01182 "elems_->size(): %d, max_samples_: %d\n", 01183 int(this->elems_->size()), int(this->max_samples_))); 01184 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01185 "header_size_: %d, optimum_size_: %d\n", 01186 int(this->max_header_size_ + message_length), 01187 int(this->optimum_size_))); 01188 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01189 "element_requires_exclusive_packet: %d\n", int(exclusive))); 01190 01191 if (this->mode_ == MODE_QUEUE) { 01192 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01193 "We flipped into MODE_QUEUE.\n")); 01194 01195 } else { 01196 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01197 "We stayed in MODE_DIRECT.\n")); 01198 } 01199 } 01200 } 01201 } 01202 } 01203 01204 send_delayed_notifications(); 01205 }
void OpenDDS::DCPS::TransportSendStrategy::send_buffer | ( | TransportSendBuffer * | send_buffer | ) |
Assigns an optional send buffer.
Definition at line 128 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::TransportSendBuffer::bind(), and send_buffer_.
Referenced by OpenDDS::DCPS::MulticastDataLink::send_strategy().
00129 { 00130 this->send_buffer_ = send_buffer; 00131 00132 if (this->send_buffer_ != 0) { 00133 this->send_buffer_->bind(this); 00134 } 00135 }
ACE_INLINE ssize_t OpenDDS::DCPS::TransportSendStrategy::send_bytes | ( | const iovec | iov[], | |
int | n, | |||
int & | bp | |||
) | [protected, virtual] |
Reimplemented in OpenDDS::DCPS::TcpSendStrategy.
Definition at line 121 of file TransportSendStrategy.inl.
References send_bytes_i().
Referenced by do_send_packet().
00124 { 00125 return send_bytes_i(iov, n); 00126 }
virtual ssize_t OpenDDS::DCPS::TransportSendStrategy::send_bytes_i | ( | const iovec | iov[], | |
int | n | |||
) | [protected, pure virtual] |
Implemented in OpenDDS::DCPS::MulticastSendStrategy, OpenDDS::DCPS::RtpsUdpSendStrategy, OpenDDS::DCPS::ShmemSendStrategy, OpenDDS::DCPS::TcpSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.
Referenced by non_blocking_send(), and send_bytes().
bool OpenDDS::DCPS::TransportSendStrategy::send_delayed_notifications | ( | const TransportQueueElement::MatchCriteria * | match = 0 |
) | [private] |
If delayed notifications were queued up, issue those callbacks here. The default match is "match all"; otherwise, match can be used to specify either a certain individual packet or a publication id. Returns true if anything in the delayed notification list matched.
Definition at line 663 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, lock_, OpenDDS::DCPS::TransportQueueElement::MatchCriteria::matches(), mode(), MODE_NOT_SET, MODE_TERMINATED, OPENDDS_VECTOR(), OpenDDS::DCPS::TransportQueueElement::owned_by_transport(), and transport_shutdown_.
Referenced by clear(), perform_work(), remove_all_msgs(), remove_sample(), send(), and send_stop().
00664 { 00665 DBG_ENTRY_LVL("TransportSendStrategy","send_delayed_notifications",6); 00666 TransportQueueElement* sample = 0; 00667 SendMode mode = MODE_NOT_SET; 00668 00669 OPENDDS_VECTOR(TQESendModePair) samples; 00670 00671 size_t num_delayed_notifications = 0; 00672 bool found_element = false; 00673 00674 { 00675 GuardType guard(lock_); 00676 00677 num_delayed_notifications = delayed_delivered_notification_queue_.size(); 00678 00679 if (num_delayed_notifications == 0) { 00680 return false; 00681 00682 } else if (num_delayed_notifications == 1) { 00683 // Optimization for the most common case (doesn't need vectors) 00684 00685 if (!match || match->matches(*delayed_delivered_notification_queue_[0].first)) { 00686 found_element = true; 00687 sample = delayed_delivered_notification_queue_[0].first; 00688 mode = delayed_delivered_notification_queue_[0].second; 00689 00690 delayed_delivered_notification_queue_.clear(); 00691 } 00692 00693 } else { 00694 OPENDDS_VECTOR(TQESendModePair)::iterator iter; 00695 for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) { 00696 sample = iter->first; 00697 mode = iter->second; 00698 if (!match || match->matches(*sample)) { 00699 found_element = true; 00700 samples.push_back(*iter); 00701 iter = delayed_delivered_notification_queue_.erase(iter); 00702 } else { 00703 ++iter; 00704 } 00705 } 00706 } 00707 } 00708 00709 if (!found_element) 00710 return false; 00711 00712 if (num_delayed_notifications == 1) { 00713 // optimization for the common case 00714 if (mode == MODE_TERMINATED) { 00715 if (!transport_shutdown_ || sample->owned_by_transport()) { 00716 sample->data_dropped(true); 00717 } 00718 } else { 00719 if (!transport_shutdown_ || sample->owned_by_transport()) { 00720 sample->data_delivered(); 00721 } 00722 } 00723 00724 } else { 00725 for (size_t i = 0; i < samples.size(); ++i) { 00726 if (samples[i].second == MODE_TERMINATED) { 00727 if (!transport_shutdown_ || 
samples[i].first->owned_by_transport()) { 00728 samples[i].first->data_dropped(true); 00729 } 00730 } else { 00731 if (!transport_shutdown_ || samples[i].first->owned_by_transport()) { 00732 samples[i].first->data_delivered(); 00733 } 00734 } 00735 } 00736 } 00737 return true; 00738 }
TransportSendStrategy::SendPacketOutcome OpenDDS::DCPS::TransportSendStrategy::send_packet | ( | ) | [private] |
This is called to send the current packet. The current packet will either be a "partially sent" packet, or a packet that has just been prepared via a call to prepare_packet().
Definition at line 1753 of file TransportSendStrategy.cpp.
References adjust_packet_after_send(), DBG_ENTRY_LVL, do_send_packet(), OpenDDS::DCPS::TransportSendBuffer::insert(), OUTCOME_BACKPRESSURE, OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, send_buffer_, VDBG, and VDBG_LVL.
Referenced by direct_send(), and perform_work().
01754 { 01755 DBG_ENTRY_LVL("TransportSendStrategy", "send_packet", 6); 01756 01757 int bp_flag = 0; 01758 const ssize_t num_bytes_sent = 01759 this->do_send_packet(this->pkt_chain_, bp_flag); 01760 01761 if (num_bytes_sent == 0) { 01762 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01763 "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5); 01764 // This means that the peer has disconnected. 01765 return OUTCOME_PEER_LOST; 01766 } 01767 01768 if (num_bytes_sent < 0) { 01769 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01770 "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5); 01771 01772 // Check for backpressure... 01773 if (bp_flag == 1) { 01774 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01775 "Since backpressure flag is true, return " 01776 "OUTCOME_BACKPRESSURE.\n"), 5); 01777 // Ok. Not really an error - just backpressure. 01778 return OUTCOME_BACKPRESSURE; 01779 } 01780 01781 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01782 "Since backpressure flag is false, return " 01783 "OUTCOME_SEND_ERROR.\n"), 5); 01784 01785 // Not backpressure - it's a real error. 01786 // Note: moved this to send_bytes so the errno msg could be written. 01787 //ACE_ERROR((LM_ERROR, 01788 // "(%P|%t) ERROR: Call to peer().send() failed with negative " 01789 // "return code.\n")); 01790 01791 return OUTCOME_SEND_ERROR; 01792 } 01793 01794 if (this->send_buffer_ != 0) { 01795 // If a secondary send buffer is bound, sent samples must 01796 // be inserted in order to properly maintain the buffer: 01797 this->send_buffer_->insert(this->header_.sequence_, 01798 this->elems_, this->pkt_chain_); 01799 } 01800 01801 VDBG_LVL((LM_DEBUG, "(%P|%t) DBG: " 01802 "Since num_bytes_sent > 0, adjust the packet to account for " 01803 "the bytes that did get sent.\n"),5); 01804 01805 // We sent some bytes - adjust the current packet (elems_ and pkt_chain_) 01806 // to account for the bytes that have been sent. 
01807 const int result = 01808 this->adjust_packet_after_send(num_bytes_sent); 01809 01810 if (result == 0) { 01811 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01812 "The adjustment logic says that the complete packet was " 01813 "sent. Return OUTCOME_COMPLETE_SEND.\n")); 01814 return OUTCOME_COMPLETE_SEND; 01815 } 01816 01817 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01818 "The adjustment logic says that only a part of the packet was " 01819 "sent. Return OUTCOME_PARTIAL_SEND.\n")); 01820 01821 return OUTCOME_PARTIAL_SEND; 01822 }
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::send_start | ( | ) |
Invoked prior to one or more send() invocations from a particular TransportClient.
Definition at line 30 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL, and start_counter_.
00031 { 00032 DBG_ENTRY_LVL("TransportSendStrategy","send_start",6); 00033 00034 GuardType guard(this->lock_); 00035 00036 if (!this->link_released_) 00037 ++this->start_counter_; 00038 }
void OpenDDS::DCPS::TransportSendStrategy::send_stop | ( | RepoId | repoId | ) |
Invoked after one or more send() invocations from a particular TransportClient.
Definition at line 1208 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, direct_send(), header_, OpenDDS::DCPS::TransportHeader::length_, mode_as_str(), MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, send_delayed_notifications(), start_counter_, synch_, VDBG, VDBG_LVL, and OpenDDS::DCPS::ThreadSynch::work_available().
01209 { 01210 DBG_ENTRY_LVL("TransportSendStrategy","send_stop",6); 01211 { 01212 GuardType guard(this->lock_); 01213 01214 if (this->link_released_) 01215 return; 01216 01217 if (this->start_counter_ == 0) { 01218 // This is an indication of a logic error. This is more of an assert. 01219 VDBG_LVL((LM_ERROR, 01220 "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5); 01221 return; 01222 } 01223 01224 --this->start_counter_; 01225 01226 if (this->start_counter_ != 0) { 01227 // This wasn't the last send_stop() that we are expecting. We only 01228 // really honor the first send_start() and the last send_stop(). 01229 // We can return without doing anything else in this case. 01230 return; 01231 } 01232 01233 if (this->mode_ == MODE_TERMINATED && !this->graceful_disconnecting_) { 01234 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01235 "TransportSendStrategy::send_stop: dont try to send current packet " 01236 "since mode is MODE_TERMINATED and not in graceful disconnecting.\n")); 01237 return; 01238 } 01239 01240 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01241 "This is an 'important' send_stop() event since our " 01242 "start_counter_ is 0.\n")); 01243 01244 // We just caused the start_counter_ to become zero. This 01245 // means that we aren't expecting another send() or send_stop() at any 01246 // time in the near future (ie, it isn't imminent). 01247 01248 // If our mode_ is currently MODE_QUEUE or MODE_SUSPEND, then we don't have 01249 // anything to do here because samples have already been going to the 01250 // queue. 01251 01252 // We only need to do something if the mode_ is 01253 // MODE_DIRECT. It means that we may have some sample(s) in the 01254 // current packet that have never been sent. This is our 01255 // opportunity to send the current packet directly if this is the case. 
01256 if (this->mode_ == MODE_QUEUE || this->mode_ == MODE_SUSPEND) { 01257 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01258 "But since we are in %C, we don't have to do " 01259 "anything more in this important send_stop().\n", 01260 mode_as_str(this->mode_))); 01261 // We don't do anything if we are in MODE_QUEUE. Just leave. 01262 return; 01263 } 01264 01265 size_t header_length = this->header_.length_; 01266 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01267 "We are in MODE_DIRECT in an important send_stop() - " 01268 "header_.length_ == [%d].\n", header_length)); 01269 01270 // Only attempt to send the current packet (directly) if the current 01271 // packet actually contains something (it could be empty). 01272 if ((header_length > 0) && 01273 //(this->elems_->size ()+this->not_yet_pac_q_->size() > 0)) 01274 (this->elems_->size() > 0)) { 01275 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01276 "There is something in the current packet - attempt to send " 01277 "it (directly) now.\n")); 01278 // If a relink needs to be done for this packet to be sent, do it. 01279 this->direct_send(true); 01280 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01281 "Back from the attempt to send leftover packet directly.\n")); 01282 01283 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01284 "But we %C as a result.\n", 01285 ((this->mode_ == MODE_QUEUE)? "flipped into MODE_QUEUE": 01286 "stayed in MODE_DIRECT" ))); 01287 if (this->mode_ == MODE_QUEUE && this->mode_ != MODE_SUSPEND) { 01288 VDBG((LM_DEBUG, "(%P|%t) DBG: " 01289 "Notify Synch thread of work availability\n")); 01290 this->synch_->work_available(); 01291 } 01292 } 01293 } 01294 01295 send_delayed_notifications(); 01296 }
void OpenDDS::DCPS::TransportSendStrategy::set_graceful_disconnecting | ( | bool | flag | ) | [protected] |
Set graceful disconnecting flag.
Definition at line 1713 of file TransportSendStrategy.cpp.
References graceful_disconnecting_.
Referenced by OpenDDS::DCPS::TcpSendStrategy::reset().
01714 { 01715 this->graceful_disconnecting_ = flag; 01716 }
size_t OpenDDS::DCPS::TransportSendStrategy::space_available | ( | ) | const [private] |
How much space is available in the current packet before we reach one of the limits: max_message_size() [transport's inherent limitation] or max_size_ [user's configured limit]
Definition at line 1887 of file TransportSendStrategy.cpp.
References header_, OpenDDS::DCPS::TransportHeader::length_, max_header_size_, max_message_size(), and max_size_.
Referenced by get_packet_elems_from_queue(), and send().
01888 { 01889 const size_t used = this->max_header_size_ + this->header_.length_, 01890 max_msg = this->max_message_size(); 01891 if (max_msg) { 01892 return std::min(this->max_size_ - used, max_msg - used); 01893 } 01894 return this->max_size_ - used; 01895 }
int OpenDDS::DCPS::TransportSendStrategy::start | ( | ) |
Start the TransportSendStrategy. This happens once, when the DataLink that "owns" this strategy object has established a connection.
Definition at line 828 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_add_ref(), OpenDDS::DCPS::TransportSendBuffer::capacity(), DBG_ENTRY_LVL, and send_buffer_.
00829 { 00830 DBG_ENTRY_LVL("TransportSendStrategy","start",6); 00831 00832 { 00833 GuardType guard(this->lock_); 00834 00835 if (!this->start_i()) { 00836 return -1; 00837 } 00838 } 00839 00840 size_t header_chunks(1); 00841 00842 // If a secondary send buffer is bound, sent headers should 00843 // be cached to properly maintain the buffer: 00844 if (this->send_buffer_ != 0) { 00845 header_chunks += this->send_buffer_->capacity(); 00846 00847 } else { 00848 header_chunks += 1; 00849 } 00850 00851 ACE_NEW_RETURN(this->header_db_allocator_, 00852 TransportDataBlockAllocator(header_chunks), 00853 -1); 00854 00855 ACE_NEW_RETURN(this->header_mb_allocator_, 00856 TransportMessageBlockAllocator(header_chunks), 00857 -1); 00858 00859 // Since we (the TransportSendStrategy object) are a reference-counted 00860 // object, but the synch_ object doesn't necessarily know this, we need 00861 // to give a "copy" of a reference to ourselves to the synch_ object here. 00862 // We will do the reverse when we unregister ourselves (as a worker) from 00863 // the synch_ object. 00864 //MJM: The synch thingie knows to not "delete" us, right? 00865 this->_add_ref(); 00866 00867 if (this->synch_->register_worker(this) == -1) { 00868 // Take back our "copy". 00869 this->_remove_ref(); 00870 ACE_ERROR_RETURN((LM_ERROR, 00871 "(%P|%t) ERROR: TransportSendStrategy failed to register " 00872 "as a worker with the ThreadSynch object.\n"), 00873 -1); 00874 } 00875 00876 return 0; 00877 }
virtual bool OpenDDS::DCPS::TransportSendStrategy::start_i | ( | ) | [inline, virtual] |
Let the subclass start.
Reimplemented in OpenDDS::DCPS::ShmemSendStrategy.
Definition at line 123 of file TransportSendStrategy.h.
void OpenDDS::DCPS::TransportSendStrategy::stop | ( | ) |
Stop the TransportSendStrategy. This happens once, when the DataLink that "owns" this strategy object is going away.
Definition at line 880 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::RcObject< ACE_SYNCH_MUTEX >::_remove_ref(), DBG_ENTRY_LVL, header_block_, header_db_allocator_, header_mb_allocator_, pkt_chain_, stop_i(), synch_, and OpenDDS::DCPS::ThreadSynch::unregister_worker().
00881 { 00882 DBG_ENTRY_LVL("TransportSendStrategy","stop",6); 00883 00884 if (this->header_block_ != 0) { 00885 this->header_block_->release (); 00886 this->header_block_ = 0; 00887 } 00888 00889 this->synch_->unregister_worker(); 00890 00891 // Since we gave the synch_ a "copy" of a reference to ourselves, we need 00892 // to take it back now. 00893 this->_remove_ref(); 00894 00895 { 00896 GuardType guard(this->lock_); 00897 00898 if (this->pkt_chain_ != 0) { 00899 size_t size = this->pkt_chain_->total_length(); 00900 00901 if (size > 0) { 00902 this->pkt_chain_->release(); 00903 ACE_DEBUG((LM_WARNING, 00904 ACE_TEXT("(%P|%t) WARNING: TransportSendStrategy::stop() - ") 00905 ACE_TEXT("terminating with %d unsent bytes.\n"), 00906 size)); 00907 } 00908 } 00909 } 00910 00911 delete this->header_mb_allocator_; 00912 delete this->header_db_allocator_; 00913 00914 { 00915 GuardType guard(this->lock_); 00916 00917 this->stop_i(); 00918 } 00919 00920 // TBD SOON - What about all of the samples that may still be stuck in 00921 // our queue_ and/or elems_? 00922 }
virtual void OpenDDS::DCPS::TransportSendStrategy::stop_i | ( | ) | [pure virtual] |
Let the subclass stop.
Implemented in OpenDDS::DCPS::MulticastSendStrategy, OpenDDS::DCPS::RtpsUdpSendStrategy, OpenDDS::DCPS::ShmemSendStrategy, OpenDDS::DCPS::TcpSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.
Referenced by stop().
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::suspend_send | ( | ) |
This is called the first time a reconnect is attempted. The send mode is set to MODE_SUSPEND. Messages are queued while in this state.
Definition at line 58 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL, mode_, mode_before_suspend_, MODE_SUSPEND, and MODE_TERMINATED.
00059 { 00060 DBG_ENTRY_LVL("TransportSendStrategy","suspend_send",6); 00061 GuardType guard(this->lock_); 00062 00063 if (this->mode_ != MODE_TERMINATED && this->mode_ != MODE_SUSPEND) { 00064 this->mode_before_suspend_ = this->mode_; 00065 this->mode_ = MODE_SUSPEND; 00066 } 00067 }
ACE_INLINE OpenDDS::DCPS::ThreadSynch * OpenDDS::DCPS::TransportSendStrategy::synch | ( | ) | const [protected] |
Definition at line 22 of file TransportSendStrategy.inl.
References DBG_ENTRY_LVL, and synch_.
Referenced by OpenDDS::DCPS::TcpSendStrategy::schedule_output().
00023 { 00024 DBG_ENTRY_LVL("TransportSendStrategy","synch",6); 00025 00026 return synch_; 00027 }
void OpenDDS::DCPS::TransportSendStrategy::terminate_send | ( | bool | graceful_disconnecting = false |
) |
Remove all samples in the backpressure queue and packet queue.
This is called whenever the connection is lost and the reconnect fails. It removes all samples in the backpressure queue and packet queue.
Definition at line 742 of file TransportSendStrategy.cpp.
References clear(), DBG_ENTRY_LVL, graceful_disconnecting_, MODE_SUSPEND, MODE_TERMINATED, and VDBG.
00743 { 00744 DBG_ENTRY_LVL("TransportSendStrategy","terminate_send",6); 00745 00746 bool reset_flag = true; 00747 00748 { 00749 GuardType guard(this->lock_); 00750 00751 // If the terminate_send call due to a non-graceful disconnection before 00752 // a datalink shutdown then we will not try to send the graceful disconnect 00753 // message. 00754 if ((this->mode_ == MODE_TERMINATED || this->mode_ == MODE_SUSPEND) 00755 && !this->graceful_disconnecting_) { 00756 VDBG((LM_DEBUG, "(%P|%t) DBG: " 00757 "It was already terminated non gracefully, will not set to graceful disconnecting \n")); 00758 reset_flag = false; 00759 } 00760 } 00761 00762 VDBG((LM_DEBUG, "(%P|%t) DBG: Now flip to MODE_TERMINATED \n")); 00763 00764 this->clear(MODE_TERMINATED); 00765 00766 if (reset_flag) { 00767 GuardType guard(this->lock_); 00768 this->graceful_disconnecting_ = graceful_disconnecting; 00769 } 00770 }
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::transport_shutdown | ( | ) |
Informs this strategy that the transport has shut down, so that no more notifications are sent to the listener.
Definition at line 136 of file TransportSendStrategy.inl.
References transport_shutdown_.
00137 { 00138 this->transport_shutdown_ = true; 00139 }
friend class TransportSendBuffer [friend] |
Definition at line 393 of file TransportSendStrategy.h.
Current elements that have contributed blocks to the current transport packet.
Definition at line 326 of file TransportSendStrategy.h.
Referenced by adjust_packet_after_send(), clear(), current_packet_first_element(), do_remove_sample(), get_packet_elems_from_queue(), prepare_packet(), send(), and ~TransportSendStrategy().
Definition at line 384 of file TransportSendStrategy.h.
Referenced by set_graceful_disconnecting(), and terminate_send().
Current transport packet header.
Definition at line 401 of file TransportSendStrategy.h.
Referenced by adjust_packet_after_send(), clear(), do_remove_sample(), get_packet_elems_from_queue(), marshal_transport_header(), perform_work(), prepare_header(), OpenDDS::DCPS::MulticastSendStrategy::prepare_header_i(), resume_send(), send(), send_stop(), and space_available().
ACE_Message_Block* OpenDDS::DCPS::TransportSendStrategy::header_block_ [private] |
Current transport packet header, marshalled.
Definition at line 319 of file TransportSendStrategy.h.
Referenced by prepare_packet(), and stop().
bool OpenDDS::DCPS::TransportSendStrategy::header_complete_ [private] |
Set to false when the packet header hasn't been fully sent. Set to true once the packet header has been fully sent.
Definition at line 334 of file TransportSendStrategy.h.
Referenced by adjust_packet_after_send(), clear(), prepare_packet(), and resume_send().
Allocator for header message block.
Definition at line 363 of file TransportSendStrategy.h.
Referenced by stop().
TransportMessageBlockAllocator* OpenDDS::DCPS::TransportSendStrategy::header_mb_allocator_ [private] |
Allocator for header data block.
Definition at line 360 of file TransportSendStrategy.h.
Referenced by stop().
Current transport header sequence number.
Definition at line 322 of file TransportSendStrategy.h.
Referenced by prepare_header().
bool OpenDDS::DCPS::TransportSendStrategy::link_released_ [private] |
This lock will protect critical sections of code that play a role in the sending of data.
Definition at line 370 of file TransportSendStrategy.h.
Referenced by send_delayed_notifications().
size_t OpenDDS::DCPS::TransportSendStrategy::max_header_size_ [private] |
Maximum marshalled size of the transport packet header.
Definition at line 316 of file TransportSendStrategy.h.
Referenced by space_available(), and TransportSendStrategy().
size_t OpenDDS::DCPS::TransportSendStrategy::max_samples_ [private] |
Configuration - max number of samples per transport packet.
Definition at line 300 of file TransportSendStrategy.h.
Referenced by get_packet_elems_from_queue(), and send().
ACE_UINT32 OpenDDS::DCPS::TransportSendStrategy::max_size_ [private] |
Configuration - max transport packet size (bytes).
Definition at line 306 of file TransportSendStrategy.h.
Referenced by space_available().
This mode determines how send() calls will be handled.
Definition at line 348 of file TransportSendStrategy.h.
Referenced by clear(), direct_send(), isDirectMode(), mode(), perform_work(), resume_send(), send(), and suspend_send().
Remembers the mode that was in effect before sending was suspended; it is used after sending is resumed because the connection has been re-established.
Definition at line 353 of file TransportSendStrategy.h.
Referenced by clear(), direct_send(), resume_send(), and suspend_send().
ACE_UINT32 OpenDDS::DCPS::TransportSendStrategy::optimum_size_ [private] |
Configuration - optimum transport packet size (bytes).
Definition at line 303 of file TransportSendStrategy.h.
Referenced by get_packet_elems_from_queue().
ACE_Message_Block* OpenDDS::DCPS::TransportSendStrategy::pkt_chain_ [private] |
Current (head of chain) block containing unsent bytes for the current transport packet.
Definition at line 330 of file TransportSendStrategy.h.
Referenced by adjust_packet_after_send(), clear(), prepare_packet(), resume_send(), and stop().
Used during backpressure situations to hold samples that have not yet become part of a transport packet and are completely unsent. Also used as a bucket for packets which still have to become part of a packet.
Definition at line 313 of file TransportSendStrategy.h.
Referenced by clear(), do_remove_sample(), get_packet_elems_from_queue(), send(), and ~TransportSendStrategy().
TransportReplacedElementAllocator OpenDDS::DCPS::TransportSendStrategy::replaced_element_allocator_ [private] |
Cached allocator for TransportReplaceElement.
Definition at line 373 of file TransportSendStrategy.h.
Referenced by TransportSendStrategy().
Definition at line 375 of file TransportSendStrategy.h.
MessageBlockAllocator OpenDDS::DCPS::TransportSendStrategy::replaced_element_mb_allocator_ [private] |
Definition at line 374 of file TransportSendStrategy.h.
TransportRetainedElementAllocator* OpenDDS::DCPS::TransportSendStrategy::retained_element_allocator_ [private] |
Cached allocator for TransportRetainedElements used by reliable datagram transports to retain PDUs after they have been sent. This is created in start if the transport needs it.
Definition at line 380 of file TransportSendStrategy.h.
Definition at line 388 of file TransportSendStrategy.h.
Referenced by remove_all_msgs(), send_buffer(), send_packet(), and start().
unsigned OpenDDS::DCPS::TransportSendStrategy::start_counter_ [private] |
Counter that, when greater than zero, indicates that we still expect to receive a send_stop() event. Incremented once for each call to our send_start() method, and decremented once for each call to our send_stop() method. We only care about the transitions of the start_counter_ value from 0 to 1, and from 1 to 0. This accommodates the case where more than one TransportClient is sending to us at the same time. We use this counter to enable a "composite" send_start() and send_stop().
Definition at line 345 of file TransportSendStrategy.h.
Referenced by clear(), resume_send(), send_start(), and send_stop().
The thread synch object.
Definition at line 366 of file TransportSendStrategy.h.
Referenced by perform_work(), send(), send_stop(), stop(), synch(), TransportSendStrategy(), and ~TransportSendStrategy().
bool OpenDDS::DCPS::TransportSendStrategy::transport_shutdown_ [private] |
Definition at line 395 of file TransportSendStrategy.h.
Referenced by send_delayed_notifications(), and transport_shutdown().
const size_t OpenDDS::DCPS::TransportSendStrategy::UDP_MAX_MESSAGE_SIZE = 65466 [static, protected] |
Put the maximum UDP payload size here so that it can be shared by all UDP-based transports. This is the worst-case (conservative) value for UDP/IPv4. If there are no IP options, or if IPv6 is used, it could actually be a little larger.
Definition at line 182 of file TransportSendStrategy.h.
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i().