OpenDDS  Snapshot(2023/04/28-20:55)
TcpReceiveStrategy.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "TcpReceiveStrategy.h"
9 #include "TcpSendStrategy.h"
10 #include "TcpTransport.h"
11 #include "TcpDataLink.h"
12 #include "TcpConnection.h"
13 #include <dds/DCPS/LogAddr.h>
14 
15 #include <sstream>
16 
17 #if !defined (__ACE_INLINE__)
18 #include "TcpReceiveStrategy.inl"
19 #endif /* __ACE_INLINE__ */
20 
22 
// Constructor.
// NOTE(review): listing line 23, which carries the qualified constructor
// name, is absent from this listing; only the parameter list is visible.
//
// link - data link this strategy receives for; kept by reference in link_.
//        Its impl()->config() is forwarded to the TransportReceiveStrategy<>
//        base class.
// task - reactor task handle, stored in reactor_task_.
24  TcpDataLink& link, const ReactorTask_rch& task)
25  : TransportReceiveStrategy<>(link.impl()->config())
26  , link_(link)
27  , reactor_task_(task)
28 {
29  DBG_ENTRY_LVL("TcpReceiveStrategy","TcpReceiveStrategy",6);
30 }
31 
// Destructor body.
// NOTE(review): listing line 32 (the declarator) is absent from this listing.
// The body only invokes the DBG_ENTRY_LVL macro; no explicit cleanup is
// written here.
33 {
34  DBG_ENTRY_LVL("TcpReceiveStrategy","~TcpReceiveStrategy",6);
35 }
36 
// receive_bytes: scatter-read from the link's current connection.
// NOTE(review): listing line 38 (the qualified function name) is absent from
// this listing.
//
// iov, n         - passed unchanged to recvv() on the connection's peer.
// remote_address - unused (parameter name is commented out).
// fd             - unused (parameter name is commented out).
// stop           - unused (parameter name is commented out).
//
// Returns 0 when the link has no connection; otherwise returns whatever
// connection->peer().recvv(iov, n) returns.
37 ssize_t
39  iovec iov[],
40  int n,
41  ACE_INET_Addr& /*remote_address*/,
42  ACE_HANDLE /*fd*/,
43  bool& /*stop*/)
44 {
45  DBG_ENTRY_LVL("TcpReceiveStrategy", "receive_bytes", 6);
46 
47  // We don't do anything to the remote_address for the Tcp case.
48  TcpConnection_rch connection = link_.get_connection();
// No connection on the link: report zero bytes instead of dereferencing null.
49  if (!connection) {
50  return 0;
51  }
52 
53  return connection->peer().recvv(iov, n);
54 }
55 
// deliver_sample: route one received sample according to
// sample.header_.message_id_.
// NOTE(review): listing lines 57-58 (function name and parameter list) are
// absent from this listing.
//
//   GRACEFUL_DISCONNECT -> sets gracefully_disconnected_ to true
//   REQUEST_ACK         -> see note inside the branch
//   SAMPLE_ACK          -> link_.ack_received(sample)
//   anything else       -> link_.data_received(sample)
56 void
59 {
60  DBG_ENTRY_LVL("TcpReceiveStrategy","deliver_sample",6);
61 
62  if (sample.header_.message_id_ == GRACEFUL_DISCONNECT) {
63  VDBG((LM_DEBUG, "(%P|%t) DBG: received GRACEFUL_DISCONNECT\n"));
64  this->gracefully_disconnected_ = true;
65  }
66  else if (sample.header_.message_id_ == REQUEST_ACK) {
67  VDBG((LM_DEBUG, "(%P|%t) DBG: received REQUEST_ACK\n"));
// NOTE(review): listing line 68 is absent here, so only the debug trace is
// visible in this branch. Presumably the missing line hands the sample to the
// link (request_ack_received appears in the symbol index) - confirm against
// the real file.
69  }
70  else if (sample.header_.message_id_ == SAMPLE_ACK) {
71  VDBG((LM_DEBUG, "(%P|%t) DBG: received SAMPLE_ACK\n"));
72  link_.ack_received(sample);
73  }
74  else {
75  link_.data_received(sample);
76  }
77 }
78 
// start_i: register the link's connection so it can start receiving.
// NOTE(review): listing lines 80, 89, 97, 99 and 101 are absent from this
// listing (function name, the start of the debug-log call, and most of the
// registration call), so only fragments of those statements are visible.
//
// Returns -1 from the visible error path, 0 otherwise.
79 int
81 {
82  DBG_ENTRY_LVL("TcpReceiveStrategy","start_i",6);
83 
84  TcpConnection_rch connection = link_.get_connection();
85 
// Only at DCPS_debug_level > 9: stream link_ into a string and log it together
// with the connection's remote address.
// NOTE(review): connection is dereferenced here (and passed via .in() below)
// without the null check that receive_bytes() and relink() perform - confirm
// that a connection is guaranteed to exist when start_i runs.
86  if (DCPS_debug_level > 9) {
87  std::stringstream buffer;
88  buffer << link_;
90  ACE_TEXT("(%P|%t) TcpReceiveStrategy::start_i() - ")
91  ACE_TEXT("link:\n%C connected to %C ")
92  ACE_TEXT("registering with reactor to receive.\n"),
93  buffer.str().c_str(),
94  LogAddr(connection->get_remote_address()).c_str()));
95  }
96 
// Registration of connection.in() follows; the failure branch logs an error
// that names "register_handler" and yields -1.
98  (connection.in(),
100  // Take back the "copy" we made.
102  "(%P|%t) ERROR: TcpReceiveStrategy::start_i TcpConnection can't register with "
103  "reactor %@ %p\n", connection.in(), ACE_TEXT("register_handler")),
104  -1);
105  }
106 
107  return 0;
108 }
109 
110 // This is called by the datalink object to associate with the "new" connection object.
111 // The "old" connection object is unregistered with the reactor and the "new" connection
112 // object is registered for receiving.
// reset: switch reactor registration from old_connection to new_connection.
// NOTE(review): listing lines 114, 119, 121-122, 125, 129, 131 and 133 are
// absent from this listing (the signature and most of the reactor calls), so
// the exact calls and masks cannot be read from here.
//
// Returns -1 from the visible error path when registering new_connection
// fails, 0 otherwise.
113 int
115 {
116  DBG_ENTRY_LVL("TcpReceiveStrategy","reset",6);
117  // Unregister the old handle
// The old connection is only touched when old_connection is non-null.
118  if (old_connection) {
120  (old_connection,
123  }
124 
126 
127  // Give the reactor its own "copy" of the reference to the connection object.
128 
130  (new_connection,
132  // Take back the "copy" we made.
134  "(%P|%t) ERROR: TcpReceiveStrategy::reset TcpConnection can't register with "
135  "reactor\n"),
136  -1);
137  }
138 
139  return 0;
140 }
141 
// stop_i: subclass stop hook.
// NOTE(review): listing lines 143 (function name) and 147 (the only statement
// besides the entry macro) are absent from this listing, so what is actually
// stopped cannot be read from here.
142 void
144 {
145  DBG_ENTRY_LVL("TcpReceiveStrategy","stop_i",6);
146 
148 }
149 
// relink: forward a relink request to the link's current connection.
// NOTE(review): listing line 151 (function name and parameter list) is absent
// from this listing; do_suspend is the parameter used below.
//
// Does nothing when the link has no connection; otherwise calls
// connection->relink_from_recv(do_suspend).
150 void
152 {
153  DBG_ENTRY_LVL("TcpReceiveStrategy","relink",6);
154  TcpConnection_rch connection = link_.get_connection();
// Null guard: only delegate when a connection is present.
155  if (connection)
156  connection->relink_from_recv(do_suspend);
157 }
158 
DataSampleHeader header_
The demarshalled sample header.
#define ACE_DEBUG(X)
char message_id_
The enum MessageId.
void request_ack_received(const ReceivedDataSample &sample)
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690
virtual ssize_t receive_bytes(iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)
Only our subclass knows how to do this.
int ssize_t
TcpConnection_rch get_connection()
Definition: TcpDataLink.inl:22
virtual void stop_i()
Let the subclass stop.
virtual int start_i()
Let the subclass start.
void ack_received(const ReceivedDataSample &sample)
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
int remove_handler(ACE_HANDLE handle, ACE_Reactor_Mask masks)
LM_DEBUG
#define VDBG(DBG_ARGS)
Holds a data sample received by the transport.
ACE_TEXT("TCP_Factory")
bool gracefully_disconnected_
Flag indicates if the GRACEFUL_DISCONNECT message is received.
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
virtual void relink(bool do_suspend=true)
#define DBG_ENTRY_LVL(CNAME, MNAME, DBG_LVL)
Definition: EntryExit.h:68
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
ACE_Reactor * get_reactor()
Definition: ReactorTask.inl:14
#define ACE_ERROR_RETURN(X, Y)
TcpReceiveStrategy(TcpDataLink &link, const ReactorTask_rch &task)
LM_ERROR
virtual void deliver_sample(ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
Called when there is a ReceivedDataSample to be delivered.