OpenDDS  Snapshot(2023/04/28-20:55)
TcpSendStrategy.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "TcpConnection.h"
9 #include "TcpSendStrategy.h"
10 #include "TcpTransport.h"
11 #include "TcpInst.h"
12 #include "TcpSynchResource.h"
13 #include "TcpDataLink.h"
16 #include "dds/DCPS/ReactorTask.h"
18 
20 
22  std::size_t id,
23  TcpDataLink& link,
24  TcpSynchResource* synch_resource,
25  const ReactorTask_rch& task,
26  Priority priority)
27  : TransportSendStrategy(id, link.impl(),
28  synch_resource, priority,
29  make_rch<ReactorSynchStrategy>(this,task->get_reactor()))
30  , link_(link)
31  , reactor_task_(task)
32 {
33  DBG_ENTRY_LVL("TcpSendStrategy","TcpSendStrategy",6);
34 
35 }
36 
38 {
39  DBG_ENTRY_LVL("TcpSendStrategy","~TcpSendStrategy",6);
40 }
41 
42 void
44 {
45  DBG_ENTRY_LVL("TcpSendStrategy","schedule_output",6);
46 
47  // Notify the reactor to adjust its processing policy according to mode_.
48  synch()->work_available();
49 
50  if (DCPS_debug_level > 4) {
51  const char* action = "";
52  if( mode() == MODE_DIRECT) {
53  action = "canceling";
54  } else if( (mode() == MODE_QUEUE)
55  || (mode() == MODE_SUSPEND)) {
56  action = "starting";
57  }
59  ACE_TEXT("(%P|%t) TcpSendStrategy::schedule_output() [%d] - ")
60  ACE_TEXT("%C data queueing for handle %d.\n"),
61  id(),action,get_handle()));
62  }
63 }
64 
65 int
67 {
68  DBG_ENTRY_LVL("TcpSendStrategy","reset",6);
69  //For the case of a send_strategy being reused for a new connection (not reconnect)
70  //need to reset the state
71  if (reset_mode) {
72  //Need to make sure that the send mode is set back to MODE_DIRECT in case
73  //it was terminated prior to being reused/reset.
74  this->clear(MODE_DIRECT);
75  //reset graceful_disconnecting_ to initial state
76  this->set_graceful_disconnecting(false);
77  }
78  return 0;
79 }
80 
81 ssize_t
82 OpenDDS::DCPS::TcpSendStrategy::send_bytes(const iovec iov[], int n, int& bp)
83 {
84  DBG_ENTRY_LVL("TcpSendStrategy","send_bytes",6);
85  return this->non_blocking_send(iov, n, bp);
86 }
87 
88 ACE_HANDLE
90 {
91  TcpConnection_rch connection = link_.get_connection();
92 
93  if (!connection)
94  return ACE_INVALID_HANDLE;
95 
96  return connection->peer().get_handle();
97 }
98 
99 ssize_t
101 {
102  TcpConnection_rch connection = link_.get_connection();
103 
104  if (!connection)
105  return -1;
106  ssize_t result = connection->peer().sendv(iov, n);
107  if (DCPS_debug_level > 4)
108  ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpSendStrategy::send_bytes_i sent %d bytes\n", result));
109 
110  return result;
111 }
112 
113 void
115 {
116  DBG_ENTRY_LVL("TcpSendStrategy","relink",6);
117  TcpConnection_rch connection = link_.get_connection();
118 
119  if (connection) {
120  connection->relink_from_send(do_suspend);
121  }
122 }
123 
124 void
126 {
127  DBG_ENTRY_LVL("TcpSendStrategy","stop_i",6);
128 }
129 
130 void
132 {
133  if (!element->is_request_ack()) {
134  // only add the notification when we are not sending REQUEST_ACK message
136  }
137 }
138 
139 void
141 {
142  DBG_ENTRY_LVL("TcpSendStrategy","terminate_send_if_suspended",6);
144 }
145 
#define ACE_DEBUG(X)
void set_graceful_disconnecting(bool flag)
Set graceful disconnecting flag.
SendMode mode() const
Access the current sending mode.
int reset(bool reset_mode=false)
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
int ssize_t
virtual ACE_HANDLE get_handle()
TcpConnection_rch get_connection()
Definition: TcpDataLink.inl:22
void clear(SendMode new_mode, SendMode old_mode=MODE_NOT_SET)
virtual ssize_t send_bytes(const iovec iov[], int n, int &bp)
virtual ssize_t send_bytes_i(const iovec iov[], int n)
LM_DEBUG
virtual void work_available()=0
virtual void schedule_output()
Enable or disable output processing by the reactor according to mode.
virtual void add_delayed_notification(TransportQueueElement *element)
virtual void add_delayed_notification(TransportQueueElement *element)
TcpSendStrategy(std::size_t id, TcpDataLink &link, TcpSynchResource *synch_resource, const ReactorTask_rch &task, Priority priority)
virtual ssize_t non_blocking_send(const iovec iov[], int n, int &bp)
ACE_TEXT("TCP_Factory")
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
ACE_CDR::Long Priority
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
strategy to construct ReactorSynch implementations of ThreadSynch.
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
virtual void stop_i()
Let the subclass stop.
virtual void relink(bool do_suspend=true)
Base wrapper class around a data/control sample to be sent.