OpenDDS
Snapshot(2023/04/07-19:43)
|
#include <TransportSendStrategy.h>
Public Types | |
enum | SendMode { MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED } |
typedef BasicQueue< TransportQueueElement > | QueueType |
![]() | |
enum | WorkOutcome { WORK_OUTCOME_MORE_TO_DO, WORK_OUTCOME_NO_MORE_TO_DO, WORK_OUTCOME_CLOGGED_RESOURCE, WORK_OUTCOME_BROKEN_RESOURCE } |
Public Member Functions | |
virtual | ~TransportSendStrategy () |
void | send_buffer (TransportSendBuffer *send_buffer) |
Assigns an optional send buffer. More... | |
int | start () |
void | stop () |
void | send_start () |
void | send (TransportQueueElement *element, bool relink=true) |
void | send_stop (GUID_t repoId) |
RemoveResult | remove_sample (const DataSampleElement *sample) |
void | remove_all_msgs (const GUID_t &pub_id) |
virtual WorkOutcome | perform_work () |
virtual void | relink (bool do_suspend=true) |
void | suspend_send () |
void | resume_send () |
void | terminate_send (bool graceful_disconnecting=false) |
Remove all samples in the backpressure queue and packet queue. More... | |
virtual void | terminate_send_if_suspended () |
virtual void | stop_i ()=0 |
Let the subclass stop. More... | |
virtual bool | start_i () |
Let the subclass start. More... | |
void | link_released (bool flag) |
bool | isDirectMode () |
virtual ACE_HANDLE | get_handle () |
void | deliver_ack_request (TransportQueueElement *element) |
bool | fragmentation_helper (TransportQueueElement *original_element, TqeVector &elements_to_send) |
void | clear (SendMode new_mode, SendMode old_mode=MODE_NOT_SET) |
SendMode | mode () const |
Access the current sending mode. More... | |
![]() | |
virtual | ~ThreadSynchWorker () |
virtual void | schedule_output () |
Indicate that queued data is available to be sent. More... | |
std::size_t | id () const |
DataLink reference value for diagnostics. More... | |
![]() | |
virtual | ~RcObject () |
virtual void | _add_ref () |
virtual void | _remove_ref () |
long | ref_count () const |
WeakObject * | _get_weak_object () const |
Static Public Member Functions | |
static int | mb_to_iov (const ACE_Message_Block &msg, iovec *iov) |
Static Public Attributes | |
static const size_t | UDP_MAX_MESSAGE_SIZE = 65466 |
Protected Member Functions | |
TransportSendStrategy (std::size_t id, const TransportImpl_rch &transport, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy) | |
virtual ssize_t | send_bytes (const iovec iov[], int n, int &bp) |
virtual ssize_t | non_blocking_send (const iovec iov[], int n, int &bp) |
virtual ssize_t | send_bytes_i (const iovec iov[], int n)=0 |
virtual void | prepare_header_i () |
Specific implementation processing of prepared packet header. More... | |
virtual void | prepare_packet_i () |
Specific implementation processing of prepared packet. More... | |
TransportQueueElement * | current_packet_first_element () const |
virtual size_t | max_message_size () const |
void | set_graceful_disconnecting (bool flag) |
Set graceful disconnecting flag. More... | |
virtual void | add_delayed_notification (TransportQueueElement *element) |
bool | send_delayed_notifications (const TransportQueueElement::MatchCriteria *match=0) |
virtual Security::SecurityConfig_rch | security_config () const |
virtual RemoveResult | do_remove_sample (const GUID_t &pub_id, const TransportQueueElement::MatchCriteria &criteria, bool remove_all=false) |
Implement framework chain visitations to remove a sample. More... | |
ThreadSynch * | synch () const |
void | set_header_source (ACE_INT64 source) |
![]() | |
ThreadSynchWorker (std::size_t id=0) | |
![]() | |
RcObject () | |
Private Types | |
enum | SendPacketOutcome { OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_BACKPRESSURE, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR } |
typedef ACE_SYNCH_MUTEX | LockType |
typedef ACE_Guard< LockType > | GuardType |
typedef std::pair< TransportQueueElement *, SendMode > | TQESendModePair |
Used for delayed notifications when performing work. More... | |
typedef Dynamic_Cached_Allocator_With_Overflow< ACE_Thread_Mutex > | DataAllocator |
Allocator for data buffers. More... | |
Private Member Functions | |
void | direct_send (bool relink) |
void | get_packet_elems_from_queue () |
void | prepare_header () |
void | prepare_packet () |
SendPacketOutcome | send_packet () |
ssize_t | do_send_packet (const ACE_Message_Block *packet, int &bp) |
Form an IOV and call the send_bytes() template method. More... | |
virtual ACE_Message_Block * | pre_send_packet (const ACE_Message_Block *m) |
int | adjust_packet_after_send (ssize_t num_bytes_sent) |
size_t | space_available (size_t already_used=0) const |
size_t | current_space_available () const |
virtual bool | marshal_transport_header (ACE_Message_Block *mb) |
OPENDDS_VECTOR (TQESendModePair) delayed_delivered_notification_queue_ | |
Static Private Member Functions | |
static const char * | mode_as_str (SendMode mode) |
Helper function for debugging. More... | |
Friends | |
class | TransportSendBuffer |
This class provides methods to fill packets with samples for sending and handles backpressure. It maintains the list of samples in current packets and also the list of samples queued during backpressure. A thread per connection is created to handle the queued samples.
Notes for the object ownership: 1) Owns ThreadSynch object, list of samples in current packet and list of samples in queue.
Definition at line 61 of file TransportSendStrategy.h.
|
private |
Allocator for data buffers.
Definition at line 411 of file TransportSendStrategy.h.
|
private |
Definition at line 302 of file TransportSendStrategy.h.
|
private |
Definition at line 301 of file TransportSendStrategy.h.
Definition at line 142 of file TransportSendStrategy.h.
|
private |
Used for delayed notifications when performing work.
Definition at line 398 of file TransportSendStrategy.h.
Enumerator | |
---|---|
MODE_NOT_SET | |
MODE_DIRECT | |
MODE_QUEUE | |
MODE_SUSPEND | |
MODE_TERMINATED |
Definition at line 305 of file TransportSendStrategy.h.
Enumerator | |
---|---|
OUTCOME_COMPLETE_SEND | |
OUTCOME_PARTIAL_SEND | |
OUTCOME_BACKPRESSURE | |
OUTCOME_PEER_LOST | |
OUTCOME_SEND_ERROR |
Definition at line 224 of file TransportSendStrategy.h.
|
virtual |
|
protected |
Definition at line 61 of file TransportSendStrategy.cpp.
References ACE_DEFAULT_THREAD_PRIORITY, OpenDDS::DCPS::TransportImpl::config(), DBG_ENTRY_LVL, OpenDDS::DCPS::TransportHeader::get_max_serialized_size(), max_header_size_, OpenDDS::DCPS::TransportInst::max_packet_size_, max_samples_, OpenDDS::DCPS::TransportInst::max_samples_per_packet_, max_size_, OpenDDS::DCPS::TransportInst::optimum_packet_size_, optimum_size_, synch_, TheServiceParticipant, and OpenDDS::DCPS::DirectPriorityMapper::thread_priority().
|
protectedvirtual |
Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy, and OpenDDS::DCPS::TcpSendStrategy.
Definition at line 1926 of file TransportSendStrategy.cpp.
References ACE_DEBUG, LM_DEBUG, OpenDDS::DCPS::Atomic< T >::load(), max_samples_, mode_, and OpenDDS::DCPS::Transport_debug_level.
Referenced by OpenDDS::DCPS::TcpSendStrategy::add_delayed_notification(), OpenDDS::DCPS::RtpsUdpSendStrategy::add_delayed_notification(), adjust_packet_after_send(), and send().
|
private |
This is called from the send_packet() method after it has sent at least one byte from the current packet. This method will update the current packet appropriately, as well as deal with all of the release()'ing of fully sent ACE_Message_Blocks, and the data_delivered() calls on the fully sent elements. Returns 0 if the entire packet was sent, and returns 1 if the entire packet was not sent.
Definition at line 363 of file TransportSendStrategy.cpp.
References ACE_DEBUG, add_delayed_notification(), ACE_Message_Block::base(), ACE_Message_Block::cont(), DBG_ENTRY_LVL, elems_, OpenDDS::DCPS::BasicQueue< T >::get(), header_, header_complete_, ACE_Message_Block::length(), OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, LM_INFO, OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::BasicQueue< T >::peek(), pkt_chain_, ACE_Message_Block::rd_ptr(), ACE_Message_Block::release(), and VDBG.
Referenced by clear(), and send_packet().
void OpenDDS::DCPS::TransportSendStrategy::clear | ( | SendMode | new_mode, |
SendMode | old_mode = MODE_NOT_SET |
||
) |
Clear queued messages and messages in the current packet, and set the current mode to new_mode if the current mode equals old_mode or old_mode is MODE_NOT_SET.
Definition at line 779 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), adjust_packet_after_send(), DBG_ENTRY_LVL, elems_, header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, lock_, mode_, mode_before_suspend_, MODE_NOT_SET, pkt_chain_, queue_, send_delayed_notifications(), start_counter_, OpenDDS::DCPS::BasicQueue< T >::swap(), ACE_Message_Block::total_length(), and VDBG.
Referenced by OpenDDS::DCPS::TcpSendStrategy::reset(), OpenDDS::DCPS::DataLink::schedule_delayed_release(), terminate_send(), and OpenDDS::DCPS::TcpSendStrategy::terminate_send_if_suspended().
|
protected |
Definition at line 153 of file TransportSendStrategy.inl.
References elems_, OPENDDS_END_VERSIONED_NAMESPACE_DECL, and OpenDDS::DCPS::BasicQueue< T >::peek().
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_bytes_i_helper().
|
private |
Like above, but use the current packet.
Definition at line 1961 of file TransportSendStrategy.cpp.
References header_, OpenDDS::DCPS::TransportHeader::length_, and space_available().
Referenced by get_packet_elems_from_queue(), and send().
void OpenDDS::DCPS::TransportSendStrategy::deliver_ack_request | ( | TransportQueueElement * | element | ) |
Definition at line 1940 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::TransportQueueElement::data_delivered(), do_remove_sample(), OpenDDS::DCPS::GUID_UNKNOWN, and lock_.
|
private |
Called from send() when it is time to attempt to send our current packet to the socket while in MODE_DIRECT mode_. If backpressure occurs, our current packet will be adjusted to account for bytes that were sent, and the mode will be changed to MODE_QUEUE. If no backpressure occurs (i.e., the entire packet is sent), then our current packet will be "reset" to be an empty packet following the send.
Definition at line 1465 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, OpenDDS::DCPS::TransportImpl::dump(), LM_DEBUG, LM_WARNING, mode_, mode_before_suspend_, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OUTCOME_BACKPRESSURE, OUTCOME_PARTIAL_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, prepare_packet(), relink(), send_packet(), transport_, OpenDDS::DCPS::Transport_debug_level, VDBG, and VDBG_LVL.
Referenced by send(), and send_stop().
|
protectedvirtual |
Implement framework chain visitations to remove a sample.
Definition at line 1362 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), OpenDDS::DCPS::BasicQueue< T >::accept_replace_visitor(), DBG_ENTRY_LVL, elems_, header_, header_block_, OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, mode_, MODE_DIRECT, pkt_chain_, queue_, OpenDDS::DCPS::REMOVE_ERROR, OpenDDS::DCPS::REMOVE_FOUND, OpenDDS::DCPS::REMOVE_NOT_FOUND, OpenDDS::DCPS::REMOVE_RELEASED, OpenDDS::DCPS::QueueRemoveVisitor::removed_bytes(), replaced_element_db_allocator_, replaced_element_mb_allocator_, OpenDDS::DCPS::BasicQueue< T >::size(), OpenDDS::DCPS::QueueRemoveVisitor::status(), OpenDDS::DCPS::PacketRemoveVisitor::status(), OpenDDS::DCPS::TransportQueueElement::MatchCriteria::unique(), and VDBG.
Referenced by deliver_ack_request(), remove_all_msgs(), and remove_sample().
|
private |
Form an IOV and call the send_bytes() template method.
Definition at line 1743 of file TransportSendStrategy.cpp.
References ACE_DEBUG, ACE_TEXT(), ACE_Message_Block::data_block(), DBG_ENTRY_LVL, LM_DEBUG, OpenDDS::DCPS::MAX_SEND_BLOCKS, mb_to_iov(), pre_send_packet(), OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), security_config(), send_bytes(), ACE_Message_Block::total_length(), OpenDDS::DCPS::Transport_debug_level, VDBG, and VDBG_LVL.
Referenced by OpenDDS::DCPS::TransportSendBuffer::resend_one(), and send_packet().
bool OpenDDS::DCPS::TransportSendStrategy::fragmentation_helper | ( | TransportQueueElement * | original_element, |
TqeVector & | elements_to_send | ||
) |
Alternative to TransportSendStrategy::send for fragmentation
original_element | data sample to send, may be larger than max msg size |
elements_to_send | populated by this method with either original_element or fragments created from it. Elements need to be cleaned up by the caller using data_delivered or data_dropped. |
Definition at line 1988 of file TransportSendStrategy.cpp.
References ACE_ERROR, OpenDDS::DCPS::TransportQueueElement::increment_loan(), LM_DEBUG, LM_ERROR, OpenDDS::DCPS::null_tqe_pair, OPENDDS_END_VERSIONED_NAMESPACE_DECL, space_available(), and VDBG_LVL.
|
virtual |
Implements OpenDDS::DCPS::ThreadSynchWorker.
Reimplemented in OpenDDS::DCPS::TcpSendStrategy.
Definition at line 140 of file TransportSendStrategy.inl.
References ACE_INLINE.
Referenced by OpenDDS::DCPS::ScheduleOutputHandler::handle_exception(), and non_blocking_send().
|
private |
This method is used while in MODE_QUEUE mode, and a new packet needs to be formulated using elements from the queue_. This is the first step of formulating the new packet. It will extract elements from the queue_ and insert those elements into the pkt_elems_ collection.
After this step has been done, the prepare_packet() step can be performed, followed by the actual send_packet() call.
Definition at line 1565 of file TransportSendStrategy.cpp.
References ACE_ERROR, current_space_available(), DBG_ENTRY_LVL, elems_, OpenDDS::DCPS::TransportHeader::first_fragment_, OpenDDS::DCPS::BasicQueue< T >::get(), header_, OpenDDS::DCPS::TransportHeader::last_fragment_, OpenDDS::DCPS::TransportHeader::length_, LM_ERROR, LM_TRACE, max_message_size(), max_samples_, OpenDDS::DCPS::null_tqe_pair, optimum_size_, OpenDDS::DCPS::BasicQueue< T >::peek(), OpenDDS::DCPS::BasicQueue< T >::put(), queue_, OpenDDS::DCPS::BasicQueue< T >::replace_head(), OpenDDS::DCPS::BasicQueue< T >::size(), and VDBG_LVL.
Referenced by perform_work().
ACE_INLINE bool OpenDDS::DCPS::TransportSendStrategy::isDirectMode | ( | ) |
Definition at line 128 of file TransportSendStrategy.inl.
References ACE_INLINE, mode_, and MODE_DIRECT.
Referenced by OpenDDS::DCPS::DataLink::resume_send().
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::link_released | ( | bool | flag | ) |
Definition at line 49 of file TransportSendStrategy.inl.
References ACE_INLINE, DBG_ENTRY_LVL, link_released_, and lock_.
Referenced by OpenDDS::DCPS::TcpDataLink::do_association_actions(), OpenDDS::DCPS::DataLink::make_reservation(), OpenDDS::DCPS::MulticastSendStrategy::MulticastSendStrategy(), and OpenDDS::DCPS::ShmemDataLink::send_association_msg().
|
privatevirtual |
Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.
Definition at line 1723 of file TransportSendStrategy.cpp.
References header_.
Referenced by prepare_packet().
|
protectedvirtual |
The maximum size of a message allowed by this TransportImpl, or 0 if there is no such limit. This is expected to be a constant, for example UDP/IPv4 can send messages of up to 65466 bytes. The transport framework will use the returned value (if > 0) to fragment larger messages. This fragmentation and reassembly will be transparent to the user.
Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy, OpenDDS::DCPS::MulticastSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.
Definition at line 147 of file TransportSendStrategy.inl.
References ACE_INLINE.
Referenced by get_packet_elems_from_queue(), send(), and space_available().
|
static |
Convert ACE_Message_Block chain into iovec[] entries for send(), returns number of iovec[] entries used (up to MAX_SEND_BLOCKS). Precondition: iov must be an iovec[] of size MAX_SEND_BLOCKS or greater.
Definition at line 1967 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::MAX_SEND_BLOCKS.
Referenced by do_send_packet(), OpenDDS::DCPS::UdpDataLink::open(), OpenDDS::DCPS::RtpsUdpTransport::IceEndpoint::send(), and OpenDDS::DCPS::RtpsUdpSendStrategy::send_rtps_control().
ACE_INLINE TransportSendStrategy::SendMode OpenDDS::DCPS::TransportSendStrategy::mode | ( | void | ) | const |
Access the current sending mode.
Definition at line 16 of file TransportSendStrategy.inl.
References ACE_INLINE, DBG_ENTRY_LVL, and mode_.
Referenced by OpenDDS::DCPS::ScheduleOutputHandler::handle_exception(), mode_as_str(), OpenDDS::DCPS::TcpSendStrategy::schedule_output(), OpenDDS::DCPS::ScheduleOutputHandler::schedule_output(), and send_delayed_notifications().
|
staticprivate |
Helper function for debugging.
Definition at line 114 of file TransportSendStrategy.inl.
References ACE_INLINE, and mode().
Referenced by perform_work(), send(), and send_stop().
|
protectedvirtual |
Definition at line 1879 of file TransportSendStrategy.cpp.
References ACE_DEBUG, ACE_TEXT(), ENOBUFS, EWOULDBLOCK, get_handle(), LM_DEBUG, LM_ERROR, ACE::record_and_set_non_blocking_mode(), ACE::restore_non_blocking_mode(), send_bytes_i(), VDBG, and VDBG_LVL.
Referenced by OpenDDS::DCPS::TcpSendStrategy::send_bytes().
|
private |
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::encode_submessages(), and send_delayed_notifications().
|
virtual |
Called by our ThreadSynch object when we should be able to start sending any partial packet bytes and/or compose a new packet using elements from the queue_.
Returns 0 to indicate that the ThreadSynch object doesn't need to call perform_work() again since the queue (and any unsent packet bytes) has been drained, and the mode_ has been switched to MODE_DIRECT.
Returns 1 to indicate that there is more work to do, and the ThreadSynch object should have this perform_work() method called again.
Implements OpenDDS::DCPS::ThreadSynchWorker.
Definition at line 132 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, get_packet_elems_from_queue(), header_, OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, lock_, mode_, mode_as_str(), MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OUTCOME_BACKPRESSURE, OUTCOME_COMPLETE_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, prepare_packet(), queue_, relink(), send_delayed_notifications(), send_packet(), OpenDDS::DCPS::BasicQueue< T >::size(), synch_, VDBG_LVL, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_BROKEN_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_CLOGGED_RESOURCE, OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO, and OpenDDS::DCPS::ThreadSynchWorker::WORK_OUTCOME_NO_MORE_TO_DO.
|
inlineprivatevirtual |
Derived classes can override to transform the data right before it's sent. If the returned value is non-NULL it will be sent instead of sending the parameter. If the returned value is NULL the original message will be dropped.
Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.
Definition at line 274 of file TransportSendStrategy.h.
References ACE_Message_Block::duplicate().
Referenced by do_send_packet().
|
private |
This method is responsible for updating the packet header. Called exclusively by prepare_packet.
Definition at line 1637 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, header_, header_sequence_, prepare_header_i(), and OpenDDS::DCPS::TransportHeader::sequence_.
Referenced by prepare_packet().
|
protectedvirtual |
Specific implementation processing of prepared packet header.
Reimplemented in OpenDDS::DCPS::MulticastSendStrategy.
Definition at line 1650 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL.
Referenced by prepare_header().
|
private |
This method is responsible for actually "creating" the current send packet using the packet header and the collection of packet elements that are to make-up the packet's contents.
Definition at line 1658 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_visitor(), ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, ACE_NEW_MALLOC, OpenDDS::DCPS::BuildChainVisitor::chain(), ACE_Message_Block::cont(), DBG_ENTRY_LVL, ACE_Message_Block::duplicate(), elems_, DataBlockLockPool::get_lock(), header_block_, header_complete_, header_data_allocator_, header_db_allocator_, header_db_lock_pool_, header_mb_allocator_, LM_DEBUG, marshal_transport_header(), max_header_size_, ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, pkt_chain_, prepare_header(), prepare_packet_i(), ACE_Message_Block::release(), VDBG, and ACE_Time_Value::zero.
Referenced by direct_send(), and perform_work().
|
protectedvirtual |
Specific implementation processing of prepared packet.
Definition at line 1729 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL.
Referenced by prepare_packet().
|
virtual |
The subclass needs to provide the implementation for re-establishing the datalink. This is called when send returns an error.
Reimplemented in OpenDDS::DCPS::TcpSendStrategy.
Definition at line 58 of file TransportSendStrategy.inl.
References ACE_INLINE, and DBG_ENTRY_LVL.
Referenced by direct_send(), and perform_work().
void OpenDDS::DCPS::TransportSendStrategy::remove_all_msgs | ( | const GUID_t & | pub_id | ) |
Definition at line 1315 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, do_remove_sample(), lock_, OpenDDS::DCPS::TransportSendBuffer::retain_all(), send_buffer_, and send_delayed_notifications().
Referenced by OpenDDS::DCPS::RtpsUdpDataLink::client_stop(), OpenDDS::DCPS::DataLink::remove_all_msgs(), and OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_all_msgs().
RemoveResult OpenDDS::DCPS::TransportSendStrategy::remove_sample | ( | const DataSampleElement * | sample | ) |
Our DataLink has been requested by some particular TransportClient to remove the supplied sample (basically, an "unsend" attempt) from this strategy object.
Definition at line 1334 of file TransportSendStrategy.cpp.
References ACE_Message_Block::cont(), DBG_ENTRY_LVL, do_remove_sample(), OpenDDS::DCPS::DataSampleElement::get_pub_id(), OpenDDS::DCPS::DataSampleElement::get_sample(), LM_DEBUG, lock_, ACE_Message_Block::rd_ptr(), OpenDDS::DCPS::REMOVE_RELEASED, send_delayed_notifications(), and VDBG_LVL.
Referenced by OpenDDS::DCPS::DataLink::remove_sample(), and OpenDDS::DCPS::RtpsUdpDataLink::RtpsWriter::remove_sample().
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::resume_send | ( | ) |
This is called when the connection is lost and the reconnect succeeds. The send mode is set to the mode before suspend, which is either MODE_QUEUE or MODE_DIRECT.
Definition at line 78 of file TransportSendStrategy.inl.
References ACE_ERROR, ACE_INLINE, DBG_ENTRY_LVL, elems_, header_, header_complete_, OpenDDS::DCPS::TransportHeader::length_, LM_ERROR, lock_, mode_, mode_before_suspend_, MODE_DIRECT, MODE_NOT_SET, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, pkt_chain_, queue_, OpenDDS::DCPS::BasicQueue< T >::size(), start_counter_, OpenDDS::DCPS::BasicQueue< T >::swap(), and synch_.
Referenced by OpenDDS::DCPS::DataLink::resume_send().
|
inlineprotectedvirtual |
Reimplemented in OpenDDS::DCPS::RtpsUdpSendStrategy.
Definition at line 219 of file TransportSendStrategy.h.
Referenced by do_send_packet().
void OpenDDS::DCPS::TransportSendStrategy::send | ( | TransportQueueElement * | element, |
bool | relink = true |
||
) |
Our DataLink has been requested by some particular TransportClient to send the element.
Definition at line 935 of file TransportSendStrategy.cpp.
References ACE_DEBUG, ACE_ERROR, ACE_TEXT(), add_delayed_notification(), current_space_available(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, direct_send(), elems_, OpenDDS::DCPS::TransportHeader::first_fragment_, OpenDDS::DCPS::TransportQueueElement::fragment(), OpenDDS::DCPS::DataSampleHeader::get_max_serialized_size(), graceful_disconnecting_, header_, OpenDDS::DCPS::TransportHeader::last_fragment_, OpenDDS::DCPS::TransportHeader::length_, link_released_, LM_DEBUG, LM_ERROR, LM_TRACE, lock_, max_header_size_, max_message_size(), max_samples_, max_size_, mode_, mode_as_str(), MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, OpenDDS::DCPS::TransportQueueElement::msg(), OpenDDS::DCPS::null_tqe_pair, optimum_size_, OpenDDS::DCPS::BasicQueue< T >::put(), queue_, ACE_Guard< ACE_LOCK >::release(), OpenDDS::DCPS::TransportQueueElement::requires_exclusive_packet(), send_delayed_notifications(), OpenDDS::DCPS::BasicQueue< T >::size(), synch_, ACE_Message_Block::total_length(), OpenDDS::DCPS::Transport_debug_level, VDBG, and VDBG_LVL.
Referenced by OpenDDS::DCPS::DataLink::send_i().
void OpenDDS::DCPS::TransportSendStrategy::send_buffer | ( | TransportSendBuffer * | send_buffer | ) |
Assigns an optional send buffer.
Definition at line 122 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::TransportSendBuffer::bind(), and send_buffer_.
Referenced by OpenDDS::DCPS::MulticastDataLink::MulticastDataLink(), and OpenDDS::DCPS::MulticastDataLink::~MulticastDataLink().
|
protectedvirtual |
Reimplemented in OpenDDS::DCPS::TcpSendStrategy.
Definition at line 134 of file TransportSendStrategy.inl.
References ACE_INLINE, and send_bytes_i().
Referenced by do_send_packet().
|
protectedpure virtual |
Implemented in OpenDDS::DCPS::RtpsUdpSendStrategy, OpenDDS::DCPS::TcpSendStrategy, OpenDDS::DCPS::MulticastSendStrategy, OpenDDS::DCPS::ShmemSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.
Referenced by non_blocking_send(), and send_bytes().
|
protected |
If delayed notifications were queued up, issue those callbacks here. The default match is "match all", otherwise match can be used to specify either a certain individual packet or a publication id. Returns true if anything in the delayed notification list matched.
Definition at line 657 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::TransportQueueElement::data_delivered(), OpenDDS::DCPS::TransportQueueElement::data_dropped(), DBG_ENTRY_LVL, OpenDDS::DCPS::TransportImpl::is_shut_down(), lock_, OpenDDS::DCPS::TransportQueueElement::MatchCriteria::matches(), mode(), MODE_NOT_SET, MODE_TERMINATED, OPENDDS_VECTOR(), OpenDDS::DCPS::TransportQueueElement::owned_by_transport(), and transport_.
Referenced by clear(), perform_work(), remove_all_msgs(), remove_sample(), send(), and send_stop().
|
private |
This is called to send the current packet. The current packet will either be a "partially sent" packet, or a packet that has just been prepared via a call to prepare_packet().
Definition at line 1807 of file TransportSendStrategy.cpp.
References adjust_packet_after_send(), DBG_ENTRY_LVL, do_send_packet(), elems_, header_, OpenDDS::DCPS::TransportSendBuffer::insert(), LM_DEBUG, OUTCOME_BACKPRESSURE, OUTCOME_COMPLETE_SEND, OUTCOME_PARTIAL_SEND, OUTCOME_PEER_LOST, OUTCOME_SEND_ERROR, pkt_chain_, send_buffer_, OpenDDS::DCPS::TransportHeader::sequence_, VDBG, and VDBG_LVL.
Referenced by direct_send(), and perform_work().
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::send_start | ( | ) |
Invoked prior to one or more send() invocations from a particular TransportClient.
Definition at line 38 of file TransportSendStrategy.inl.
References ACE_INLINE, DBG_ENTRY_LVL, link_released_, lock_, and start_counter_.
Referenced by OpenDDS::DCPS::DataLink::send_start_i().
void OpenDDS::DCPS::TransportSendStrategy::send_stop | ( | GUID_t | repoId | ) |
Invoked after one or more send() invocations from a particular TransportClient.
Definition at line 1224 of file TransportSendStrategy.cpp.
References DBG_ENTRY_LVL, direct_send(), elems_, graceful_disconnecting_, header_, OpenDDS::DCPS::TransportHeader::length_, link_released_, LM_DEBUG, LM_ERROR, lock_, mode_, mode_as_str(), MODE_QUEUE, MODE_SUSPEND, MODE_TERMINATED, send_delayed_notifications(), OpenDDS::DCPS::BasicQueue< T >::size(), start_counter_, synch_, VDBG, and VDBG_LVL.
Referenced by OpenDDS::DCPS::DataLink::send_stop_i().
|
protected |
Set graceful disconnecting flag.
Definition at line 1737 of file TransportSendStrategy.cpp.
References graceful_disconnecting_.
Referenced by OpenDDS::DCPS::TcpSendStrategy::reset().
|
protected |
Definition at line 32 of file TransportSendStrategy.inl.
References ACE_INLINE, header_, and OpenDDS::DCPS::TransportHeader::source_.
Referenced by OpenDDS::DCPS::MulticastSendStrategy::prepare_header_i().
|
private |
How much space is available in packet with a given used space before we reach one of the limits: max_message_size() [transport's inherent limitation] or max_size_ [user's configured limit]
Definition at line 1951 of file TransportSendStrategy.cpp.
References max_header_size_, max_message_size(), and max_size_.
Referenced by current_space_available(), and fragmentation_helper().
int OpenDDS::DCPS::TransportSendStrategy::start | ( | void | ) |
Start the TransportSendStrategy. This happens once, when the DataLink that "owns" this strategy object has established a connection.
Definition at line 831 of file TransportSendStrategy.cpp.
References ACE_ERROR_RETURN, OpenDDS::DCPS::TransportSendBuffer::capacity(), DBG_ENTRY_LVL, header_data_allocator_, header_db_allocator_, header_db_lock_pool_, header_mb_allocator_, LM_ERROR, lock_, max_header_size_, OpenDDS::DCPS::unique_ptr< T, Deleter >::reset(), send_buffer_, start_i(), synch_, and TheServiceParticipant.
Referenced by OpenDDS::DCPS::ShmemSendStrategy::send_bytes_i(), and OpenDDS::DCPS::DataLink::start().
|
inlinevirtual |
Let the subclass start.
Reimplemented in OpenDDS::DCPS::ShmemSendStrategy.
Definition at line 136 of file TransportSendStrategy.h.
Referenced by start().
void OpenDDS::DCPS::TransportSendStrategy::stop | ( | void | ) |
Stop the TransportSendStrategy. This happens once, when the DataLink that "owns" this strategy object is going away.
Definition at line 877 of file TransportSendStrategy.cpp.
References OpenDDS::DCPS::BasicQueue< T >::accept_remove_visitor(), ACE_DEBUG, ACE_TEXT(), DBG_ENTRY_LVL, elems_, header_block_, LM_WARNING, lock_, pkt_chain_, queue_, ACE_Message_Block::release(), OpenDDS::DCPS::BasicQueue< T >::size(), stop_i(), OpenDDS::DCPS::BasicQueue< T >::swap(), synch_, and ACE_Message_Block::total_length().
Referenced by OpenDDS::DCPS::DataLink::start(), and OpenDDS::DCPS::DataLink::stop().
|
pure virtual |
Let the subclass stop.
Implemented in OpenDDS::DCPS::TcpSendStrategy, OpenDDS::DCPS::RtpsUdpSendStrategy, OpenDDS::DCPS::ShmemSendStrategy, OpenDDS::DCPS::MulticastSendStrategy, and OpenDDS::DCPS::UdpSendStrategy.
Referenced by stop().
ACE_INLINE void OpenDDS::DCPS::TransportSendStrategy::suspend_send | ( | ) |
This is called the first time a reconnect is attempted. The send mode is set to MODE_SUSPEND. Messages are queued in this state.
Definition at line 66 of file TransportSendStrategy.inl.
References ACE_INLINE, DBG_ENTRY_LVL, lock_, mode_, mode_before_suspend_, MODE_SUSPEND, and MODE_TERMINATED.
|
protected |
Definition at line 24 of file TransportSendStrategy.inl.
References ACE_INLINE, DBG_ENTRY_LVL, and synch_.
Referenced by OpenDDS::DCPS::TcpSendStrategy::schedule_output().
void OpenDDS::DCPS::TransportSendStrategy::terminate_send | ( | bool | graceful_disconnecting = false | ) |
Remove all samples in the backpressure queue and packet queue.
This is called whenever the connection is lost and reconnect fails. It removes all samples in the backpressure queue and packet queue.
Definition at line 743 of file TransportSendStrategy.cpp.
References clear(), DBG_ENTRY_LVL, graceful_disconnecting_, LM_DEBUG, lock_, mode_, MODE_SUSPEND, MODE_TERMINATED, and VDBG.
Referenced by OpenDDS::DCPS::TcpDataLink::send_graceful_disconnect_message(), and OpenDDS::DCPS::DataLink::terminate_send().
|
virtual |
Reimplemented in OpenDDS::DCPS::TcpSendStrategy.
Definition at line 774 of file TransportSendStrategy.cpp.
Referenced by OpenDDS::DCPS::DataLink::terminate_send_if_suspended().
|
friend |
Definition at line 436 of file TransportSendStrategy.h.
|
private |
Current elements that have contributed blocks to the current transport packet.
Definition at line 368 of file TransportSendStrategy.h.
Referenced by adjust_packet_after_send(), clear(), current_packet_first_element(), do_remove_sample(), get_packet_elems_from_queue(), prepare_packet(), resume_send(), send(), send_packet(), send_stop(), and stop().
|
private |
Definition at line 427 of file TransportSendStrategy.h.
Referenced by send(), send_stop(), set_graceful_disconnecting(), and terminate_send().
|
private |
Current transport packet header.
Definition at line 439 of file TransportSendStrategy.h.
Referenced by adjust_packet_after_send(), clear(), current_space_available(), do_remove_sample(), get_packet_elems_from_queue(), marshal_transport_header(), perform_work(), prepare_header(), resume_send(), send(), send_packet(), send_stop(), and set_header_source().
|
private |
Current transport packet header, marshalled.
Definition at line 361 of file TransportSendStrategy.h.
Referenced by do_remove_sample(), prepare_packet(), and stop().
|
private |
Set to false when the packet header hasn't been fully sent. Set to true once the packet header has been fully sent.
Definition at line 376 of file TransportSendStrategy.h.
Referenced by adjust_packet_after_send(), clear(), prepare_packet(), and resume_send().
|
private |
Definition at line 412 of file TransportSendStrategy.h.
Referenced by prepare_packet(), and start().
|
private |
Allocator for header message block.
Definition at line 405 of file TransportSendStrategy.h.
Referenced by prepare_packet(), and start().
|
private |
Definition at line 408 of file TransportSendStrategy.h.
Referenced by prepare_packet(), and start().
|
private |
Allocator for header data block.
Definition at line 402 of file TransportSendStrategy.h.
Referenced by prepare_packet(), and start().
|
private |
Current transport header sequence number.
Definition at line 364 of file TransportSendStrategy.h.
Referenced by prepare_header().
|
private |
Definition at line 429 of file TransportSendStrategy.h.
Referenced by link_released(), send(), send_start(), and send_stop().
|
private |
This lock will protect critical sections of code that play a role in the sending of data.
Definition at line 419 of file TransportSendStrategy.h.
Referenced by clear(), deliver_ack_request(), link_released(), OpenDDS::DCPS::RtpsUdpSendStrategy::marshal_transport_header(), perform_work(), remove_all_msgs(), remove_sample(), resume_send(), send(), send_delayed_notifications(), send_start(), send_stop(), start(), stop(), suspend_send(), and terminate_send().
|
private |
Maximum marshalled size of the transport packet header.
Definition at line 358 of file TransportSendStrategy.h.
Referenced by prepare_packet(), send(), space_available(), start(), and TransportSendStrategy().
|
private |
Configuration - max number of samples per transport packet.
Definition at line 342 of file TransportSendStrategy.h.
Referenced by add_delayed_notification(), get_packet_elems_from_queue(), send(), and TransportSendStrategy().
|
private |
Configuration - max transport packet size (bytes)
Definition at line 348 of file TransportSendStrategy.h.
Referenced by send(), space_available(), and TransportSendStrategy().
This mode determines how send() calls will be handled.
Definition at line 390 of file TransportSendStrategy.h.
Referenced by add_delayed_notification(), clear(), direct_send(), do_remove_sample(), isDirectMode(), mode(), perform_work(), resume_send(), send(), send_stop(), suspend_send(), and terminate_send().
|
private |
This remembers the mode that was in effect before sending was suspended. It is used after sending is resumed, once the connection has been re-established.
Definition at line 395 of file TransportSendStrategy.h.
Referenced by clear(), direct_send(), resume_send(), and suspend_send().
|
private |
Configuration - optimum transport packet size (bytes)
Definition at line 345 of file TransportSendStrategy.h.
Referenced by get_packet_elems_from_queue(), send(), and TransportSendStrategy().
|
private |
Current (head of chain) block containing unsent bytes for the current transport packet.
Definition at line 372 of file TransportSendStrategy.h.
Referenced by adjust_packet_after_send(), clear(), do_remove_sample(), prepare_packet(), resume_send(), send_packet(), and stop().
|
private |
Used during backpressure situations to hold samples that have not yet become part of a transport packet and are completely unsent. Also used as a bucket for elements that still have to become part of a packet.
Definition at line 355 of file TransportSendStrategy.h.
Referenced by clear(), do_remove_sample(), get_packet_elems_from_queue(), perform_work(), resume_send(), send(), and stop().
|
private |
Definition at line 423 of file TransportSendStrategy.h.
Referenced by do_remove_sample().
|
private |
Cached allocator for TransportReplaceElement.
Definition at line 422 of file TransportSendStrategy.h.
Referenced by do_remove_sample().
|
private |
Definition at line 431 of file TransportSendStrategy.h.
Referenced by remove_all_msgs(), send_buffer(), send_packet(), and start().
|
private |
Counter that, when greater than zero, indicates that we still expect to receive a send_stop() event. Incremented once for each call to our send_start() method, and decremented once for each call to our send_stop() method. We only care about the transitions of the start_counter_ value from 0 to 1, and from 1 to 0. This accommodates the case where more than one TransportClient is sending to us at the same time. We use this counter to enable a "composite" send_start() and send_stop().
Definition at line 387 of file TransportSendStrategy.h.
Referenced by clear(), resume_send(), send_start(), and send_stop().
|
private |
The thread synch object.
Definition at line 415 of file TransportSendStrategy.h.
Referenced by perform_work(), resume_send(), send(), send_stop(), start(), stop(), synch(), and TransportSendStrategy().
|
private |
Definition at line 425 of file TransportSendStrategy.h.
Referenced by direct_send(), and send_delayed_notifications().
|
static |
Put the maximum UDP payload size here so that it can be shared by all UDP-based transports. This is the worst-case (conservative) value for UDP/IPv4. If there are no IP options, or if IPv6 is used, it could actually be a little larger.
Definition at line 159 of file TransportSendStrategy.h.
Referenced by OpenDDS::DCPS::RtpsUdpSendStrategy::send_single_i().