32 #if !defined (__ACE_INLINE__) 46 #define NUM_REPLACED_ELEMENT_CHUNKS 20 53 static const size_t MIN_FRAG = 68;
74 header_complete_(false),
77 mode_before_suspend_(MODE_NOT_SET),
81 transport_(transport),
82 graceful_disconnecting_(false),
86 DBG_ENTRY_LVL(
"TransportSendStrategy",
"TransportSendStrategy",6);
97 synch_.reset(thread_sync_strategy->create_synch_object(
110 delayed_delivered_notification_queue_.reserve(
max_samples_);
115 DBG_ENTRY_LVL(
"TransportSendStrategy",
"~TransportSendStrategy",6);
118 delayed_delivered_notification_queue_.clear();
137 bool no_more_work =
false;
146 "Entered perform_work() and mode_ is MODE_TERMINATED - " 147 "we lost connection and could not reconnect, just return " 148 "WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
170 "Entered perform_work() and mode_ is %C - just return " 185 if (header_length == 0) {
187 "The current packet doesn't have any unsent bytes - we " 188 "need to 'populate' the current packet with elems from " 198 "But the queue is empty. We have cleared the " 199 "backpressure situation.\n"),5);
206 "Flip mode to MODE_DIRECT, and return " 207 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
218 "There is at least one elem in the queue - get the packet " 219 "elems from the queue.\n"), 5);
226 "Prepare the packet from the packet elems_.\n"), 5);
232 "Packet has been prepared from packet elems_.\n"), 5);
236 "We have a current packet that still has unsent bytes.\n"), 5);
240 "Attempt to send the current packet.\n"), 5);
253 "Flip the mode to MODE_DIRECT, and then return " 254 "WORK_OUTCOME_NO_MORE_TO_DO.\n"), 5);
263 "The outcome of the send_packet() was %d.\n", outcome), 5);
271 "We sent the whole packet, and there is nothing left on " 272 "the queue now.\n"), 5);
280 "We still have unsent bytes in the current packet AND/OR there " 281 "are still elements in the queue.\n"), 5);
285 "We lost our connection, or had some fatal connection " 286 "error. Return WORK_OUTCOME_BROKEN_RESOURCE.\n"), 5);
289 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
291 bool do_suspend =
true;
296 "The reconnect has not done yet and we are still in MODE_SUSPEND. " 297 "Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
304 "Reconnect failed, now we are in MODE_TERMINATED\n"), 5);
309 "Reconnect succeeded, Notify synch thread of work " 310 "availability.\n"), 5);
319 "We still have an 'unbroken' connection.\n"), 5);
323 "We experienced backpressure on our attempt to send the " 324 "packet. Return WORK_OUTCOME_CLOGGED_RESOURCE.\n"), 5);
330 "We may have sent the whole current packet, but still have " 331 "elements on the queue.\n"), 5);
333 "Or, we may have only partially sent the current packet.\n"), 5);
336 "Either way, we return WORK_OUTCOME_MORE_TO_DO now.\n"), 5);
365 DBG_ENTRY_LVL(
"TransportSendStrategy",
"adjust_packet_after_send", 6);
368 "Adjusting the current packet because %d bytes of the packet " 369 "have been sent.\n", num_bytes_sent));
371 ssize_t num_bytes_left = num_bytes_sent;
372 ssize_t num_non_header_bytes_sent = 0;
375 "Set num_bytes_left to %d.\n", num_bytes_left));
377 "Set num_non_header_bytes_sent to %d.\n",
378 num_non_header_bytes_sent));
381 "Peek at the element at the front of the packet elems_.\n"));
387 ACE_DEBUG((
LM_INFO,
"(%P|%t) WARNING: adjust_packet_after_send skipping due to NULL element\n"));
390 "Use the element's msg() to find the last block in " 391 "the msg() chain.\n"));
397 "Start with tail block == element->msg().\n"));
399 while (elem_tail_block->
cont() != 0) {
401 "Set tail block to its cont() block (next in chain).\n"));
402 elem_tail_block = elem_tail_block->
cont();
406 "Tail block now set (because tail block's cont() is 0).\n"));
409 "Start the 'while (num_bytes_left > 0)' loop.\n"));
411 while (num_bytes_left > 0) {
413 "At top of 'num bytes left' loop. num_bytes_left == [%d].\n",
419 "Length of block at front of pkt_chain_ is [%d].\n",
422 if (block_length <= num_bytes_left) {
424 "The whole block at the front of pkt_chain_ was sent.\n"));
431 "Extract the fully sent block from the pkt_chain_.\n"));
436 "Set pkt_chain_ to pkt_chain_->cont().\n"));
441 "Set the fully sent block's cont() to 0.\n"));
443 fully_sent_block->
cont(0);
447 num_bytes_left -= block_length;
450 "Updated num_bytes_left to account for fully sent " 451 "block (block_length == [%d]).\n", block_length));
453 "Now, num_bytes_left == [%d].\n", num_bytes_left));
457 "Since the header_complete_ flag is false, it means " 458 "that the packet header block was still in the " 462 "Not anymore... Set the header_complete_ flag " 470 "Release the fully sent block.\n"));
477 "Since the header_complete_ flag is true, it means " 478 "that the packet header block was not in the " 481 "So, the fully sent block was part of an element.\n"));
488 num_non_header_bytes_sent += block_length;
491 "Updated num_non_header_bytes_sent to account for " 492 "fully sent block (block_length == [%d]).\n",
496 "Now, num_non_header_bytes_sent == [%d].\n",
497 num_non_header_bytes_sent));
499 if (fully_sent_block->
base() == elem_tail_block->
base()) {
501 "Ok. The fully sent block was a duplicate of " 502 "the tail block of the element that is at the " 503 "front of the packet elems_.\n"));
506 "This means that we have completely sent the " 507 "element at the front of the packet elems_.\n"));
513 "We can release the fully sent block now.\n"));
519 "We can extract the element from the front of " 520 "the packet elems_ (we were just peeking).\n"));
526 "Tell the element that a decision has been made " 527 "regarding its fate - data_delivered().\n"));
533 "Peek at the next element in the packet " 541 "The is an element still in the packet " 542 "elems_ (we are peeking at it now).\n"));
545 "We are going to find the tail block for the " 546 "current element (we are peeking at).\n"));
550 elem_tail_block = element->
msg();
553 "Start w/tail block == element->msg().\n"));
555 while (elem_tail_block->
cont() != 0) {
557 "Set tail block to next in chain.\n"));
558 elem_tail_block = elem_tail_block->
cont();
562 "Done finding tail block.\n"));
567 "Ok. The fully sent block is *not* a " 568 "duplicate of the tail block of the element " 569 "at the front of the packet elems_.\n"));
572 "Thus, we have not completely sent the " 579 "We can release the fully_sent_block now.\n"));
588 "Only part of the block at the front of pkt_chain_ " 592 "Advance the rd_ptr() of the front block (of pkt_chain_) " 593 "by the num_bytes_left (%d).\n", num_bytes_left));
600 "And since the packet header block has already been " 601 "completely sent, add num_bytes_left to the " 602 "num_non_header_bytes_sent.\n"));
605 "Before, num_non_header_bytes_sent == %d.\n",
606 num_non_header_bytes_sent));
612 num_non_header_bytes_sent += num_bytes_left;
615 "After, num_non_header_bytes_sent == %d.\n",
616 num_non_header_bytes_sent));
620 "Set the num_bytes_left to 0 now.\n"));
628 "The 'num_bytes_left' loop has completed.\n"));
631 "Adjust the header_.length_ to account for the " 632 "num_non_header_bytes_sent.\n"));
634 "Before, header_.length_ == %d.\n",
639 header_.
length_ -=
static_cast<ACE_UINT32
>(num_non_header_bytes_sent);
642 "After, header_.length_ == %d.\n",
649 "Adjustments all done. Returning [%d]. 0 means entire packet " 650 "has been sent. 1 means otherwise.\n",
659 DBG_ENTRY_LVL(
"TransportSendStrategy",
"send_delayed_notifications",6);
665 size_t num_delayed_notifications = 0;
666 bool found_element =
false;
671 num_delayed_notifications = delayed_delivered_notification_queue_.size();
673 if (num_delayed_notifications == 0) {
676 }
else if (num_delayed_notifications == 1) {
679 if (!match || match->
matches(*delayed_delivered_notification_queue_[0].first)) {
680 found_element =
true;
681 sample = delayed_delivered_notification_queue_[0].first;
682 mode = delayed_delivered_notification_queue_[0].second;
684 delayed_delivered_notification_queue_.clear();
689 for (iter = delayed_delivered_notification_queue_.begin(); iter != delayed_delivered_notification_queue_.end(); ) {
690 sample = iter->first;
692 if (!match || match->
matches(*sample)) {
693 found_element =
true;
694 samples.push_back(*iter);
695 iter = delayed_delivered_notification_queue_.erase(iter);
703 if (!found_element) {
707 bool transport_shutdown =
true;
713 if (num_delayed_notifications == 1) {
726 for (
size_t i = 0; i < samples.size(); ++i) {
728 if (!transport_shutdown || samples[i].first->owned_by_transport()) {
729 samples[i].first->data_dropped(
true);
732 if (!transport_shutdown || samples[i].first->owned_by_transport()) {
733 samples[i].first->data_delivered();
747 bool reset_flag =
true;
758 "It was already terminated non gracefully, will not set to graceful disconnecting\n"));
763 VDBG((
LM_DEBUG,
"(%P|%t) DBG: Now flip to MODE_TERMINATED\n"));
800 "The adjustment logic says that the packet is cleared.\n"));
804 "The adjustment returned partial sent.\n"));
843 size_t header_chunks(1);
865 if (
synch_->register_worker(*
this) == -1) {
868 "(%P|%t) ERROR: TransportSendStrategy failed to register " 869 "as a worker with the ThreadSynch object.\n"),
886 synch_->unregister_worker();
898 ACE_TEXT(
"(%P|%t) WARNING: TransportSendStrategy::stop() - ")
899 ACE_TEXT(
"terminating with %d unsent bytes.\n"),
908 ACE_TEXT(
"(%P|%t) WARNING: TransportSendStrategy::stop() - ")
909 ACE_TEXT(
"terminating with %d unsent elements.\n"),
916 ACE_TEXT(
"(%P|%t) WARNING: TransportSendStrategy::stop() - ")
917 ACE_TEXT(
"terminating with %d queued elements.\n"),
939 ACE_TEXT(
"(%P|%t) TransportSendStrategy::send() [%d] - ")
940 ACE_TEXT(
"sending data at 0x%x.\n"),
955 "TransportSendStrategy::send: mode is MODE_TERMINATED and not in " 956 "graceful disconnecting, so discard message.\n"));
965 "Send element msg() has total_length() == [%d].\n",
969 "max_header_size_ == [%d].\n",
973 "max_size_ == [%d].\n",
983 if (max_msg_size == 0 &&
986 "(%P|%t) ERROR: Element too large (%Q) " 987 "- won't fit into packet.\n",
ACE_UINT64(element_length)));
994 "mode_ == %C, so queue elem and leave.\n",
1000 synch_->work_available();
1007 "mode_ == MODE_DIRECT.\n"));
1034 "The element %C require an exclusive packet.\n",
1035 (exclusive ?
"DOES" :
"does NOT")
1038 const size_t space_needed =
1047 "Element won't fit in current packet or requires exclusive" 1048 " - send current packet (directly) now.\n"));
1051 "max_header_size_: %d, header_.length_: %d, element_length: %d\n" 1055 "Tot possible length: %d, max_len: %d\n" 1059 "current elem size: %d\n" 1079 "We experienced backpressure on that direct send, as " 1080 "the mode_ is now MODE_QUEUE or MODE_SUSPEND. " 1081 "Queue elem and leave.\n"), 5);
1083 synch_->work_available();
1090 bool first_pkt =
true;
1092 (first_pkt || next_fragment)
1096 if (next_fragment) {
1097 element = next_fragment;
1105 if (element_length > avail) {
1106 VDBG_LVL((
LM_TRACE,
"(%P|%t) DBG: Fragmenting %B > %B\n", element_length, avail), 0);
1110 "Element Fragmentation Failed\n"));
1115 next_fragment = ep.second;
1117 }
else if (next_fragment) {
1127 "Start the 'append elem' to current packet logic.\n"));
1130 "Put element into current packet elems_.\n"));
1140 "Before, the header_.length_ == [%d].\n",
1148 "After adding element's length, the header_.length_ == [%d].\n",
1169 "Now the current packet looks full - send it (directly).\n"));
1176 synch_->work_available();
1179 next_fragment->data_dropped(
true );
1183 synch_->work_available();
1187 "Back from the direct_send() attempt.\n"));
1190 "And we %C as a result of the direct_send() call.\n",
1192 :
"stayed in MODE_DIRECT")));
1196 "Packet not sent. Send conditions weren't satisfied.\n"));
1198 "elems_.size(): %d, max_samples_: %d\n",
1201 "header_size_: %d, optimum_size_: %d\n",
1205 "element_requires_exclusive_packet: %d\n",
int(exclusive)));
1209 "We flipped into MODE_QUEUE.\n"));
1213 "We stayed in MODE_DIRECT.\n"));
1236 "(%P|%t) ERROR: Received unexpected send_stop() call.\n"), 5);
1251 "TransportSendStrategy::send_stop: dont try to send current packet " 1252 "since mode is MODE_TERMINATED and not in graceful disconnecting.\n"));
1257 "This is an 'important' send_stop() event since our " 1258 "start_counter_ is 0.\n"));
1274 "But since we are in %C, we don't have to do " 1275 "anything more in this important send_stop().\n",
1283 "We are in MODE_DIRECT in an important send_stop() - " 1284 "header_.length_ == [%d].\n", header_length));
1288 if ((header_length > 0) &&
1292 "There is something in the current packet - attempt to send " 1293 "it (directly) now.\n"));
1297 "Back from the attempt to send leftover packet directly.\n"));
1300 "But we %C as a result.\n",
1302 "stayed in MODE_DIRECT" )));
1305 "Notify Synch thread of work availability\n"));
1306 synch_->work_available();
1365 DBG_ENTRY_LVL(
"TransportSendStrategy",
"do_remove_sample", 6);
1378 "The mode is MODE_DIRECT, or the queue is empty and no " 1379 "transport packet is in progress.\n"));
1391 "Failed to find the sample to remove.\n"));
1394 if (criteria.
unique() || !remove_all)
return status;
1398 "Visit the queue_ with the RemoveElementVisitor.\n"));
1407 "The sample was removed from the queue_.\n"));
1411 if (criteria.
unique() || !remove_all)
return status;
1416 "The RemoveElementVisitor encountered a fatal error in queue_.\n"));
1424 "The RemoveElementVisitor did not find the sample in queue_.\n"));
1435 "Visit our elems_ with the PacketRemoveVisitor.\n"));
1446 status = pac_rem_vis.
status();
1450 "The PacketRemoveVisitor encountered a fatal error.\n"));
1454 "The PacketRemoveVisitor didn't find the sample.\n"));
1458 "The PacketRemoveVisitor found the sample and removed it.\n"));
1470 "Prepare the current packet for a direct send attempt.\n"));
1476 "Now attempt to send the packet.\n"));
1485 "The outcome of the send_packet() was %d.\n", outcome));
1490 "The outcome of the send_packet() was either " 1491 "OUTCOME_BACKPRESSURE or OUTCOME_PARTIAL_SEND.\n"), 5);
1494 "Flip into the MODE_QUEUE mode_.\n"), 5);
1503 "(%P|%t) WARNING: Problem detected in " 1504 "send buffer management: %p.\n",
1515 "The outcome of the send_packet() was " 1516 "OUTCOME_PEER_LOST.\n"));
1520 "Now flip to MODE_SUSPEND before we try to reconnect.\n"), 5);
1528 bool do_suspend =
false;
1533 "The reconnect has not done yet and we are " 1534 "still in MODE_SUSPEND.\n"), 5);
1538 "Reconnect failed, we are in MODE_TERMINATED\n"), 5);
1542 "Try send the packet again since the connection " 1543 "is re-established.\n"), 5);
1552 "The outcome of the send_packet() must have been " 1553 "OUTCOME_COMPLETE_SEND.\n"));
1555 "So, we will just stay in MODE_DIRECT.\n"));
1567 DBG_ENTRY_LVL(
"TransportSendStrategy",
"get_packet_elems_from_queue", 6);
1573 size_t element_length = element->msg()->total_length();
1576 const bool exclusive_packet = element->requires_exclusive_packet();
1581 if (element_length > avail) {
1586 const TqePair ep = element->fragment(avail);
1588 ACE_ERROR((
LM_ERROR,
"(%P|%t) ERROR: TransportSendStrategy::get_packet_elems_from_queue: " 1589 "Element Fragmentation Failed\n"));
1593 element_length = element->msg()->total_length();
1605 || !exclusive_packet) {
1624 if (exclusive_packet || frag
1639 DBG_ENTRY_LVL(
"TransportSendStrategy",
"prepare_header", 6);
1652 DBG_ENTRY_LVL(
"TransportSendStrategy",
"prepare_header_i",6);
1660 DBG_ENTRY_LVL(
"TransportSendStrategy",
"prepare_packet", 6);
1666 "Marshal the packet header.\n"));
1691 "Use a BuildChainVisitor to visit the packet elems_.\n"));
1700 "Attach the visitor's chain of blocks to the lone (packet " 1701 "header) block currently in the pkt_chain_.\n"));
1707 "Increment header sequence for next packet.\n"));
1714 "Set the header_complete_ flag to false.\n"));
1731 DBG_ENTRY_LVL(
"TransportSendStrategy",
"prepare_packet_i",6);
1747 ACE_TEXT(
"(%P|%t) TransportSendStrategy::do_send_packet() [%d] - ")
1748 ACE_TEXT(
"sending data at 0x%x.\n"),
1751 DBG_ENTRY_LVL(
"TransportSendStrategy",
"do_send_packet", 6);
1753 #ifdef OPENDDS_SECURITY 1756 const DDS::Security::CryptoTransform_var crypto =
security_config()->get_crypto_transform();
1762 VDBG((
LM_DEBUG,
"(%P|%t) DBG: pre_send_packet returned NULL, dropping.\n"));
1770 "Populate the iovec array using the packet.\n"), 5);
1774 #ifdef OPENDDS_SECURITY 1775 const int num_blocks =
mb_to_iov(substitute ? *substitute : *packet, iov);
1777 const int num_blocks =
mb_to_iov(*packet, iov);
1781 "There are [%d] number of entries in the iovec array.\n",
1785 "Attempt to send_bytes() now.\n"), 5);
1790 "The send_bytes() said that num_bytes_sent == [%d].\n",
1791 num_bytes_sent), 5);
1793 #ifdef OPENDDS_SECURITY 1799 return packet->total_length();
1803 return num_bytes_sent;
1812 const ssize_t num_bytes_sent =
1815 if (num_bytes_sent == 0) {
1817 "Since num_bytes_sent == 0, return OUTCOME_PEER_LOST.\n"), 5);
1822 if (num_bytes_sent < 0) {
1824 "Since num_bytes_sent < 0, check the backpressure flag.\n"), 5);
1829 "Since backpressure flag is true, return " 1830 "OUTCOME_BACKPRESSURE.\n"), 5);
1836 "Since backpressure flag is false, return " 1837 "OUTCOME_SEND_ERROR.\n"), 5);
1856 "Since num_bytes_sent > 0, adjust the packet to account for " 1857 "the bytes that did get sent.\n"),5);
1866 "The adjustment logic says that the complete packet was " 1867 "sent. Return OUTCOME_COMPLETE_SEND.\n"));
1872 "The adjustment logic says that only a part of the packet was " 1873 "sent. Return OUTCOME_PARTIAL_SEND.\n"));
1884 if (handle == ACE_INVALID_HANDLE)
1900 "Backpressure encountered.\n"));
1905 VDBG_LVL((
LM_ERROR,
"(%P|%t) TransportSendStrategy::send_bytes: ERROR: %p iovec count: %d\n",
1910 for (
int ii = 0; ii < n; ii++) {
1911 ACE_DEBUG((
LM_DEBUG,
"(%P|%t) send_bytes: iov[%d].iov_len = %d .iov_base =%X\n",
1912 ii, iov[ii].iov_len, iov[ii].iov_base));
1918 "The sendv() returned [%d].\n", result), 5);
1929 size_t size = delayed_delivered_notification_queue_.size();
1932 "(%P|%t) Transport send strategy notification queue threshold, size=%d\n",
1937 delayed_delivered_notification_queue_.push_back(std::make_pair(element,
mode_.
load()));
1956 return std::min(static_cast<size_t>(
max_size_), max_msg) - used;
1971 #pragma warning(push) 1974 #pragma warning(disable : 4267) 1978 block = block->cont()) {
1979 iov[num_blocks].iov_len = block->length();
1980 iov[num_blocks++].iov_base = block->rd_ptr();
1983 #pragma warning(pop) 1994 const size_t esize = e->msg()->total_length();
1995 if (esize > space) {
1996 VDBG_LVL((
LM_DEBUG,
"(%P|%t) TransportSendStrategy::fragmentation_helper: " 1997 "message size %B > space %B: Fragmenting\n", esize, space), 0);
1998 const TqePair pair = e->fragment(space);
2000 ACE_ERROR((
LM_ERROR,
"(%P|%t) ERROR: TransportSendStrategy::fragmentation_helper: " 2001 "Element Fragmentation Failed\n"));
2004 elements_to_send.push_back(pair.first);
2007 elements_to_send.push_back(e);
SendPacketOutcome send_packet()
void get_packet_elems_from_queue()
DataBlockAllocator replaced_element_db_allocator_
void set_graceful_disconnecting(bool flag)
Set graceful disconnecting flag.
RemoveResult status() const
ACE_UINT32 optimum_packet_size_
Optimum size (in bytes) of a packet (packet header + sample(s)).
void direct_send(bool relink)
static const ACE_Time_Value max_time
WeakRcHandle< TransportImpl > transport_
std::pair< TransportQueueElement *, SendMode > TQESendModePair
Used for delayed notifications when performing work.
ACE_Message_Block * header_block_
Current transport packet header, marshalled.
Cached_Allocator_With_Overflow< ACE_Data_Block, RECEIVE_SYNCH > TransportDataBlockAllocator
SendMode mode_before_suspend_
int adjust_packet_after_send(ssize_t num_bytes_sent)
std::pair< TransportQueueElement *, TransportQueueElement * > TqePair
virtual ACE_HANDLE get_handle()
size_t length(void) const
SequenceNumber header_sequence_
Current transport header sequence number.
ACE_Message_Block * pkt_chain_
virtual void relink(bool do_suspend=true)
virtual ACE_Message_Block * pre_send_packet(const ACE_Message_Block *m)
SendMode mode() const
Access the current sending mode.
static int mb_to_iov(const ACE_Message_Block &msg, iovec *iov)
const GUID_t GUID_UNKNOWN
Nil value for GUID.
bool data_dropped(bool dropped_by_transport=false)
ssize_t do_send_packet(const ACE_Message_Block *packet, int &bp)
Form an IOV and call the send_bytes() template method.
GUID_t get_pub_id() const
void swap(BasicQueue &other)
Holds and distributes locks to be used for locking ACE_Data_Blocks. Currently it does not require ret...
virtual RemoveResult do_remove_sample(const GUID_t &pub_id, const TransportQueueElement::MatchCriteria &criteria, bool remove_all=false)
Implement framework chain visitations to remove a sample.
DataBlockLock * get_lock()
TransportSendBuffer * send_buffer_
#define ACE_NEW_MALLOC(POINTER, ALLOCATOR, CONSTRUCTOR)
unique_ptr< TransportMessageBlockAllocator > header_mb_allocator_
Allocator for header data block.
virtual ~TransportSendStrategy()
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
ACE_Guard< ACE_Thread_Mutex > lock_
TransportSendStrategy(std::size_t id, const TransportImpl_rch &transport, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy)
virtual ssize_t send_bytes_i(const iovec iov[], int n)=0
char * rd_ptr(void) const
virtual short thread_priority() const
Access the mapped thread priority value.
void clear(SendMode new_mode, SendMode old_mode=MODE_NOT_SET)
Atomic< SendMode > mode_
This mode determines how send() calls will be handled.
void record_and_set_non_blocking_mode(ACE_HANDLE handle, int &val)
virtual void prepare_header_i()
Specific implementation processing of prepared packet header.
void bind(TransportSendStrategy *strategy)
virtual void retain_all(const GUID_t &pub_id)
size_t max_samples_per_packet_
Max number of samples that should ever be in a single packet.
TransportHeader header_
Current transport packet header.
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
void remove_all_msgs(const GUID_t &pub_id)
virtual bool matches(const TransportQueueElement &candidate) const =0
RemoveResult remove_sample(const DataSampleElement *sample)
unique_ptr< DataAllocator > header_data_allocator_
DataSample * get_sample() const
int removed_bytes() const
RemoveResult
used by DataLink::remove_sample(), TransportSendStrategy, *RemoveVisitor
void send(TransportQueueElement *element, bool relink=true)
size_t max_samples_
Configuration - max number of samples per transport packet.
ACE_UINT32 optimum_size_
Configuration - optimum transport packet size (bytes)
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
bool graceful_disconnecting_
virtual bool requires_exclusive_packet() const
Does the sample require an exclusive transport packet?
void dump()
Diagnostic aid.
bool is_shut_down() const
virtual void insert(SequenceNumber sequence, TransportSendStrategy::QueueType *queue, ACE_Message_Block *chain)=0
virtual ACE_Message_Block * release(void)
ACE_Data_Block * data_block(void) const
virtual void add_delayed_notification(TransportQueueElement *element)
virtual size_t max_message_size() const
ACE_Message_Block * cont(void) const
virtual ACE_Message_Block * duplicate(void) const
size_t size() const
Accessor for the current number of elements in the queue.
const TqePair null_tqe_pair
virtual bool owned_by_transport()=0
Is the sample created by the transport?
virtual void prepare_packet_i()
Specific implementation processing of prepared packet.
size_t total_length(void) const
virtual void terminate_send_if_suspended()
#define NUM_REPLACED_ELEMENT_CHUNKS
virtual bool start_i()
Let the subclass start.
ACE_UINT32 max_packet_size_
Max size (in bytes) of a packet (packet header + sample(s))
bool send_delayed_notifications(const TransportQueueElement::MatchCriteria *match=0)
void send_stop(GUID_t repoId)
virtual ssize_t non_blocking_send(const iovec iov[], int n, int &bp)
virtual WorkOutcome perform_work()
void terminate_send(bool graceful_disconnecting=false)
Remove all samples in the backpressure queue and packet queue.
map TRANSPORT_PRIORITY values directly.
unsigned long long ACE_UINT64
unique_ptr< ThreadSynch > synch_
The thread synch object.
virtual Security::SecurityConfig_rch security_config() const
void accept_remove_visitor(VisitorType &visitor)
size_t max_header_size_
Maximum marshalled size of the transport packet header.
#define ACE_DEFAULT_THREAD_PRIORITY
unique_ptr< DataBlockLockPool > header_db_lock_pool_
DataBlockLockPool.
bool fragmentation_helper(TransportQueueElement *original_element, TqeVector &elements_to_send)
unique_ptr< TransportDataBlockAllocator > header_db_allocator_
Allocator for header message block.
void accept_replace_visitor(VisitorType &visitor)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
RemoveResult status() const
static const ACE_Time_Value zero
OPENDDS_VECTOR(TQESendModePair) delayed_delivered_notification_queue_
virtual ssize_t send_bytes(const iovec iov[], int n, int &bp)
virtual bool unique() const =0
ACE_UINT32 max_size_
Configuration - max transport packet size (bytes)
void restore_non_blocking_mode(ACE_HANDLE handle, int val)
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
void send_buffer(TransportSendBuffer *send_buffer)
Assigns an optional send buffer.
void accept_visitor(VisitorType &visitor) const
void replace_head(T *value)
size_t current_space_available() const
#define ACE_ERROR_RETURN(X, Y)
virtual bool marshal_transport_header(ACE_Message_Block *mb)
static const char * mode_as_str(SendMode mode)
Helper function to debugging.
void deliver_ack_request(TransportQueueElement *element)
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
#define TheServiceParticipant
ACE_Message_Block * chain()
The Internal API and Implementation of OpenDDS.
size_t space_available(size_t already_used=0) const
Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > DataAllocator
Allocator for data buffers.
MessageBlockAllocator replaced_element_mb_allocator_
Cached allocator for TransportReplaceElement.
int put(T *elem)
Put a pointer to an element (T*) on to the queue.
Base wrapper class around a data/control sample to be sent.
Cached_Allocator_With_Overflow< ACE_Message_Block, RECEIVE_SYNCH > TransportMessageBlockAllocator
virtual void stop_i()=0
Let the subclass stop.
virtual TqePair fragment(size_t size)
virtual const ACE_Message_Block * msg() const =0
The marshalled sample (sample header + sample data)
TransportInst_rch config() const