OpenDDS  Snapshot(2023/04/07-19:43)
Public Member Functions | Protected Member Functions | Private Attributes | List of all members
OpenDDS::DCPS::TcpSendStrategy Class Reference

#include <TcpSendStrategy.h>

Inheritance diagram for OpenDDS::DCPS::TcpSendStrategy:
Inheritance graph
[legend]
Collaboration diagram for OpenDDS::DCPS::TcpSendStrategy:
Collaboration graph
[legend]

Public Member Functions

 TcpSendStrategy (std::size_t id, TcpDataLink &link, TcpSynchResource *synch_resource, const ReactorTask_rch &task, Priority priority)
 
virtual ~TcpSendStrategy ()
 
int reset (bool reset_mode=false)
 
virtual void schedule_output ()
 Enable or disable output processing by the reactor according to mode. More...
 
virtual void terminate_send_if_suspended ()
 
- Public Member Functions inherited from OpenDDS::DCPS::TransportSendStrategy
virtual ~TransportSendStrategy ()
 
void send_buffer (TransportSendBuffer *send_buffer)
 Assigns an optional send buffer. More...
 
int start ()
 
void stop ()
 
void send_start ()
 
void send (TransportQueueElement *element, bool relink=true)
 
void send_stop (GUID_t repoId)
 
RemoveResult remove_sample (const DataSampleElement *sample)
 
void remove_all_msgs (const GUID_t &pub_id)
 
virtual WorkOutcome perform_work ()
 
void suspend_send ()
 
void resume_send ()
 
void terminate_send (bool graceful_disconnecting=false)
 Remove all samples in the backpressure queue and packet queue. More...
 
virtual bool start_i ()
 Let the subclass start. More...
 
void link_released (bool flag)
 
bool isDirectMode ()
 
void deliver_ack_request (TransportQueueElement *element)
 
bool fragmentation_helper (TransportQueueElement *original_element, TqeVector &elements_to_send)
 
void clear (SendMode new_mode, SendMode old_mode=MODE_NOT_SET)
 
SendMode mode () const
 Access the current sending mode. More...
 
- Public Member Functions inherited from OpenDDS::DCPS::ThreadSynchWorker
virtual ~ThreadSynchWorker ()
 
std::size_t id () const
 DataLink reference value for diagnostics. More...
 
- Public Member Functions inherited from OpenDDS::DCPS::RcObject
virtual ~RcObject ()
 
virtual void _add_ref ()
 
virtual void _remove_ref ()
 
long ref_count () const
 
WeakObject * _get_weak_object () const
 

Protected Member Functions

virtual ssize_t send_bytes (const iovec iov[], int n, int &bp)
 
virtual ACE_HANDLE get_handle ()
 
virtual ssize_t send_bytes_i (const iovec iov[], int n)
 
virtual void relink (bool do_suspend=true)
 
virtual void stop_i ()
 Let the subclass stop. More...
 
virtual void add_delayed_notification (TransportQueueElement *element)
 
- Protected Member Functions inherited from OpenDDS::DCPS::TransportSendStrategy
 TransportSendStrategy (std::size_t id, const TransportImpl_rch &transport, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy)
 
virtual ssize_t non_blocking_send (const iovec iov[], int n, int &bp)
 
virtual void prepare_header_i ()
 Specific implementation processing of prepared packet header. More...
 
virtual void prepare_packet_i ()
 Specific implementation processing of prepared packet. More...
 
TransportQueueElement * current_packet_first_element () const
 
virtual size_t max_message_size () const
 
void set_graceful_disconnecting (bool flag)
 Set graceful disconnecting flag. More...
 
bool send_delayed_notifications (const TransportQueueElement::MatchCriteria *match=0)
 
virtual Security::SecurityConfig_rch security_config () const
 
virtual RemoveResult do_remove_sample (const GUID_t &pub_id, const TransportQueueElement::MatchCriteria &criteria, bool remove_all=false)
 Implement framework chain visitations to remove a sample. More...
 
ThreadSynch * synch () const
 
void set_header_source (ACE_INT64 source)
 
- Protected Member Functions inherited from OpenDDS::DCPS::ThreadSynchWorker
 ThreadSynchWorker (std::size_t id=0)
 
- Protected Member Functions inherited from OpenDDS::DCPS::RcObject
 RcObject ()
 

Private Attributes

TcpDataLink & link_
 
ReactorTask_rch reactor_task_
 

Additional Inherited Members

- Public Types inherited from OpenDDS::DCPS::TransportSendStrategy
enum  SendMode {
  MODE_NOT_SET, MODE_DIRECT, MODE_QUEUE, MODE_SUSPEND,
  MODE_TERMINATED
}
 
typedef BasicQueue< TransportQueueElement > QueueType
 
- Public Types inherited from OpenDDS::DCPS::ThreadSynchWorker
enum  WorkOutcome { WORK_OUTCOME_MORE_TO_DO, WORK_OUTCOME_NO_MORE_TO_DO, WORK_OUTCOME_CLOGGED_RESOURCE, WORK_OUTCOME_BROKEN_RESOURCE }
 
- Static Public Member Functions inherited from OpenDDS::DCPS::TransportSendStrategy
static int mb_to_iov (const ACE_Message_Block &msg, iovec *iov)
 
- Static Public Attributes inherited from OpenDDS::DCPS::TransportSendStrategy
static const size_t UDP_MAX_MESSAGE_SIZE = 65466
 

Detailed Description

Definition at line 25 of file TcpSendStrategy.h.

Constructor & Destructor Documentation

◆ TcpSendStrategy()

OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL OpenDDS::DCPS::TcpSendStrategy::TcpSendStrategy ( std::size_t  id,
TcpDataLink & link,
TcpSynchResource * synch_resource,
const ReactorTask_rch & task,
Priority  priority 
)

Definition at line 21 of file TcpSendStrategy.cpp.

References DBG_ENTRY_LVL.

27  : TransportSendStrategy(id, link.impl(),
28  synch_resource, priority,
29  make_rch<ReactorSynchStrategy>(this,task->get_reactor()))
30  , link_(link)
31  , reactor_task_(task)
32 {
33  DBG_ENTRY_LVL("TcpSendStrategy","TcpSendStrategy",6);
34 
35 }
TransportSendStrategy(std::size_t id, const TransportImpl_rch &transport, ThreadSynchResource *synch_resource, Priority priority, const ThreadSynchStrategy_rch &thread_sync_strategy)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ ~TcpSendStrategy()

OpenDDS::DCPS::TcpSendStrategy::~TcpSendStrategy ( )
virtual

Definition at line 37 of file TcpSendStrategy.cpp.

References DBG_ENTRY_LVL.

38 {
39  DBG_ENTRY_LVL("TcpSendStrategy","~TcpSendStrategy",6);
40 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

Member Function Documentation

◆ add_delayed_notification()

void OpenDDS::DCPS::TcpSendStrategy::add_delayed_notification ( TransportQueueElement * element)
protectedvirtual

Reimplemented from OpenDDS::DCPS::TransportSendStrategy.

Definition at line 131 of file TcpSendStrategy.cpp.

References OpenDDS::DCPS::TransportSendStrategy::add_delayed_notification(), and OpenDDS::DCPS::TransportQueueElement::is_request_ack().

132 {
133  if (!element->is_request_ack()) {
134  // only add the notification when we are not sending REQUEST_ACK message
135  TransportSendStrategy::add_delayed_notification(element);
136  }
137 }
virtual void add_delayed_notification(TransportQueueElement *element)

◆ get_handle()

ACE_HANDLE OpenDDS::DCPS::TcpSendStrategy::get_handle ( void  )
protectedvirtual

Reimplemented from OpenDDS::DCPS::TransportSendStrategy.

Definition at line 89 of file TcpSendStrategy.cpp.

References OpenDDS::DCPS::TcpDataLink::get_connection(), and link_.

Referenced by schedule_output().

90 {
91  TcpConnection_rch connection = link_.get_connection();
92 
93  if (!connection)
94  return ACE_INVALID_HANDLE;
95 
96  return connection->peer().get_handle();
97 }
RcHandle< TcpConnection > TcpConnection_rch
TcpConnection_rch get_connection()
Definition: TcpDataLink.inl:22

◆ relink()

void OpenDDS::DCPS::TcpSendStrategy::relink ( bool  do_suspend = true)
protectedvirtual

Delegate to the connection object to re-establish the connection.

Reimplemented from OpenDDS::DCPS::TransportSendStrategy.

Definition at line 114 of file TcpSendStrategy.cpp.

References DBG_ENTRY_LVL, OpenDDS::DCPS::TcpDataLink::get_connection(), and link_.

115 {
116  DBG_ENTRY_LVL("TcpSendStrategy","relink",6);
117  TcpConnection_rch connection = link_.get_connection();
118 
119  if (connection) {
120  connection->relink_from_send(do_suspend);
121  }
122 }
RcHandle< TcpConnection > TcpConnection_rch
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
TcpConnection_rch get_connection()
Definition: TcpDataLink.inl:22

◆ reset()

int OpenDDS::DCPS::TcpSendStrategy::reset ( bool  reset_mode = false)

This is called by the datalink object to associate with the "new" connection object. The "old" connection object is unregistered with the reactor and the "new" connection object is registered for sending. The implementation of this method is borrowed from the ReceiveStrategy.

Definition at line 66 of file TcpSendStrategy.cpp.

References OpenDDS::DCPS::TransportSendStrategy::clear(), DBG_ENTRY_LVL, OpenDDS::DCPS::TransportSendStrategy::MODE_DIRECT, and OpenDDS::DCPS::TransportSendStrategy::set_graceful_disconnecting().

Referenced by OpenDDS::DCPS::TcpDataLink::reuse_existing_connection().

67 {
68  DBG_ENTRY_LVL("TcpSendStrategy","reset",6);
69  //For the case of a send_strategy being reused for a new connection (not reconnect)
70  //need to reset the state
71  if (reset_mode) {
72  //Need to make sure that the send mode is set back to MODE_DIRECT in case
73  //it was terminated prior to being reused/reset.
74  this->clear(MODE_DIRECT);
75  //reset graceful_disconnecting_ to initial state
76  this->set_graceful_disconnecting(false);
77  }
78  return 0;
79 }
void clear(SendMode new_mode, SendMode old_mode=MODE_NOT_SET)
void set_graceful_disconnecting(bool flag)
Set graceful disconnecting flag.
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ schedule_output()

void OpenDDS::DCPS::TcpSendStrategy::schedule_output ( )
virtual

Enable or disable output processing by the reactor according to mode.

Reimplemented from OpenDDS::DCPS::ThreadSynchWorker.

Definition at line 43 of file TcpSendStrategy.cpp.

References ACE_DEBUG, ACE_TEXT(), DBG_ENTRY_LVL, OpenDDS::DCPS::DCPS_debug_level, get_handle(), LM_DEBUG, OpenDDS::DCPS::TransportSendStrategy::mode(), OpenDDS::DCPS::TransportSendStrategy::MODE_DIRECT, OpenDDS::DCPS::TransportSendStrategy::MODE_QUEUE, OpenDDS::DCPS::TransportSendStrategy::MODE_SUSPEND, OpenDDS::DCPS::TransportSendStrategy::synch(), and OpenDDS::DCPS::ThreadSynch::work_available().

44 {
45  DBG_ENTRY_LVL("TcpSendStrategy","schedule_output",6);
46 
47  // Notify the reactor to adjust its processing policy according to mode_.
48  synch()->work_available();
49 
50  if (DCPS_debug_level > 4) {
51  const char* action = "";
52  if( mode() == MODE_DIRECT) {
53  action = "canceling";
54  } else if( (mode() == MODE_QUEUE)
55  || (mode() == MODE_SUSPEND)) {
56  action = "starting";
57  }
58  ACE_DEBUG((LM_DEBUG,
59  ACE_TEXT("(%P|%t) TcpSendStrategy::schedule_output() [%d] - ")
60  ACE_TEXT("%C data queueing for handle %d.\n"),
61  id(),action,get_handle()));
62  }
63 }
#define ACE_DEBUG(X)
SendMode mode() const
Access the current sending mode.
virtual ACE_HANDLE get_handle()
virtual void work_available()=0
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_TEXT("TCP_Factory")
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ send_bytes()

ssize_t OpenDDS::DCPS::TcpSendStrategy::send_bytes ( const iovec  iov[],
int  n,
int &  bp 
)
protectedvirtual

Reimplemented from OpenDDS::DCPS::TransportSendStrategy.

Definition at line 82 of file TcpSendStrategy.cpp.

References DBG_ENTRY_LVL, and OpenDDS::DCPS::TransportSendStrategy::non_blocking_send().

83 {
84  DBG_ENTRY_LVL("TcpSendStrategy","send_bytes",6);
85  return this->non_blocking_send(iov, n, bp);
86 }
virtual ssize_t non_blocking_send(const iovec iov[], int n, int &bp)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ send_bytes_i()

ssize_t OpenDDS::DCPS::TcpSendStrategy::send_bytes_i ( const iovec  iov[],
int  n 
)
protectedvirtual

Implements OpenDDS::DCPS::TransportSendStrategy.

Definition at line 100 of file TcpSendStrategy.cpp.

References ACE_DEBUG, OpenDDS::DCPS::DCPS_debug_level, OpenDDS::DCPS::TcpDataLink::get_connection(), link_, and LM_DEBUG.

101 {
102  TcpConnection_rch connection = link_.get_connection();
103 
104  if (!connection)
105  return -1;
106  ssize_t result = connection->peer().sendv(iov, n);
107  if (DCPS_debug_level > 4)
108  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpSendStrategy::send_bytes_i sent %d bytes\n", result));
109 
110  return result;
111 }
#define ACE_DEBUG(X)
int ssize_t
RcHandle< TcpConnection > TcpConnection_rch
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
TcpConnection_rch get_connection()
Definition: TcpDataLink.inl:22

◆ stop_i()

void OpenDDS::DCPS::TcpSendStrategy::stop_i ( )
protectedvirtual

Let the subclass stop.

Implements OpenDDS::DCPS::TransportSendStrategy.

Definition at line 125 of file TcpSendStrategy.cpp.

References DBG_ENTRY_LVL.

126 {
127  DBG_ENTRY_LVL("TcpSendStrategy","stop_i",6);
128 }
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68

◆ terminate_send_if_suspended()

void OpenDDS::DCPS::TcpSendStrategy::terminate_send_if_suspended ( )
virtual

Member Data Documentation

◆ link_

TcpDataLink& OpenDDS::DCPS::TcpSendStrategy::link_
private

Definition at line 58 of file TcpSendStrategy.h.

Referenced by get_handle(), relink(), and send_bytes_i().

◆ reactor_task_

ReactorTask_rch OpenDDS::DCPS::TcpSendStrategy::reactor_task_
private

Definition at line 59 of file TcpSendStrategy.h.


The documentation for this class was generated from the following files: