OpenDDS  Snapshot(2023/04/28-20:55)
UdpDataLink.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "UdpDataLink.h"
9 #include "UdpTransport.h"
10 #include "UdpInst.h"
11 #include "UdpSendStrategy.h"
12 #include "UdpReceiveStrategy.h"
13 
14 #include <dds/DCPS/LogAddr.h>
17 
18 #include <ace/Default_Constants.h>
19 #include <ace/Log_Msg.h>
20 
21 #ifndef __ACE_INLINE__
22 # include "UdpDataLink.inl"
23 #endif /* __ACE_INLINE__ */
24 
26 
27 namespace OpenDDS {
28 namespace DCPS {
29 
31  Priority priority,
32  const ReactorTask_rch& reactor_task,
33  bool active)
34  : DataLink(transport,
35  priority,
36  false, // is_loopback,
37  active),// is_active
38  active_(active),
39  reactor_task_(reactor_task),
40  send_strategy_(make_rch<UdpSendStrategy>(this)),
41  recv_strategy_(make_rch<UdpReceiveStrategy>(this))
42 {
43 }
44 
45 bool
47 {
49 
51  if (!cfg) {
52  return false;
53  }
54 
55  this->is_loopback_ = this->remote_address_ == cfg->local_address();
56 
57  ACE_INET_Addr local_address;
58  if (this->active_) {
59  if (local_address.get_type() != remote_address.get_type()) {
60  local_address.set(0, "", 0, remote_address.get_type());
61  }
62  } else {
63  local_address = cfg->local_address();
64  }
65 
66  if (!open_appropriate_socket_type(this->socket_, local_address)) {
68  ACE_TEXT("(%P|%t) ERROR: ")
69  ACE_TEXT("UdpDataLink::open: open_appropriate_socket_type failed\n")),
70  false);
71  }
72 
73  VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: listening on %C\n",
74  LogAddr(local_address).c_str()));
75 
76  // If listening on "any" host/port, need to record the actual port number
77  // selected by the OS, as well as our actual hostname, into the config_
78  // object's local_address_ for use in UdpTransport::connection_info_i().
79  if (!this->active_ && cfg->local_address().is_any()) {
80  ACE_INET_Addr address;
81  if (this->socket_.get_local_addr(address) != 0) {
83  ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
84  ACE_TEXT("cannot get local addr\n")), false);
85  }
86  const unsigned short port = address.get_port_number();
87  const std::string hostname = get_fully_qualified_hostname();
89  ACE_TEXT("(%P|%t) UdpDataLink::open listening on host %C:%hu\n"),
90  hostname.c_str(), port), 2);
91 
92  cfg->local_address(port, hostname.c_str());
93 
94  // Similar case to the "if" case above, but with a bound host/IP but no port
95  } else if (!this->active_ &&
96  0 == cfg->local_address().get_port_number()) {
97  ACE_INET_Addr address;
98  if (this->socket_.get_local_addr(address) != 0) {
100  ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
101  ACE_TEXT("cannot get local addr\n")), false);
102  }
103  const unsigned short port = address.get_port_number();
105  ACE_TEXT("(%P|%t) UdpDataLink::open listening on port %hu\n"),
106  port), 2);
107  cfg->local_address_set_port(port);
108  }
109 
110  if (cfg->send_buffer_size_ > 0) {
111  int snd_size = cfg->send_buffer_size_;
112  if (this->socket_.set_option(SOL_SOCKET,
113  SO_SNDBUF,
114  (void *) &snd_size,
115  sizeof(snd_size)) < 0
116  && errno != ENOTSUP) {
118  ACE_TEXT("(%P|%t) ERROR: ")
119  ACE_TEXT("UdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
120  snd_size),
121  false);
122  }
123  }
124 
125  if (cfg->rcv_buffer_size_ > 0) {
126  int rcv_size = cfg->rcv_buffer_size_;
127  if (this->socket_.set_option(SOL_SOCKET,
128  SO_RCVBUF,
129  (void *) &rcv_size,
130  sizeof(int)) < 0
131  && errno != ENOTSUP) {
133  ACE_TEXT("(%P|%t) ERROR: ")
134  ACE_TEXT("UdpDataLink::open: failed to set the receive buffer size to %d errno %m\n"),
135  rcv_size),
136  false);
137  }
138  }
139 
140 #ifdef ACE_WIN32
141  // By default Winsock will cause reads to fail with "connection reset"
142  // when UDP sends result in ICMP "port unreachable" messages.
143  // The transport framework is not set up for this since returning <= 0
144  // from our receive_bytes causes the framework to close down the datalink
145  // which in this case is used to receive from multiple peers.
146  BOOL recv_udp_connreset = FALSE;
147  socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
148 #endif
149 
150  if (this->active_) {
151  // Set the DiffServ codepoint according to the priority value.
153  this->set_dscp_codepoint(mapper.codepoint(), this->socket_);
154 
155 
156  // For the active side, send the blob and wait for a 1 byte ack.
157  VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: active connect to %C\n",
158  LogAddr(remote_address).c_str()));
159 
161  OPENDDS_TEST_AND_CALL(TransportImpl_rch, impl(), connection_info_i(info, CONNINFO_ALL));
162  ACE_Message_Block* data_block;
163  ACE_NEW_RETURN(data_block,
164  ACE_Message_Block(info.data.length()+sizeof(Priority),
166  0, //cont
167  0, //data
168  0, //allocator_strategy
169  0, //locking_strategy
173  0,
174  0),
175  0);
176 
177  Serializer serializer(data_block, Encoding::KIND_UNALIGNED_CDR);
178  serializer << this->transport_priority();
179  serializer.write_octet_array(info.data.get_buffer(),
180  info.data.length());
181 
182  DataSampleHeader sample_header;
183  sample_header.message_id_ = TRANSPORT_CONTROL;
184  sample_header.message_length_ =
185  static_cast<ACE_UINT32>(data_block->length());
186  ACE_Message_Block* sample_header_block;
187  ACE_NEW_RETURN(sample_header_block,
190  0, //cont
191  0, //data
192  0, //allocator_strategy
193  0, //locking_strategy
197  0,
198  0),
199  0);
200  *sample_header_block << sample_header;
201  sample_header_block->cont(data_block);
202 
203  ACE_Message_Block* transport_header_block;
204  TransportHeader transport_header;
205  ACE_NEW_RETURN(transport_header_block,
208  0,
209  0,
210  0,
211  0,
215  0,
216  0),
217  0);
218 
219  transport_header.length_ =
220  static_cast<ACE_UINT32>(data_block->length() +
221  sample_header_block->length());
222  *transport_header_block << transport_header;
223  transport_header_block->cont(sample_header_block);
224 
225  iovec iov[MAX_SEND_BLOCKS];
226  const int num_blocks =
227  TransportSendStrategy::mb_to_iov(*transport_header_block, iov);
228  const ssize_t sent = socket().send(iov, num_blocks, remote_address);
229  transport_header_block->release();
230  if (sent < 0) {
231  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
232  ACE_TEXT("failed to send handshake %m\n")),
233  false);
234  }
235 
236  // Need to wait for the 1 byte ack from the passive side before returning
237  // the link (and indicating success).
238  const size_t size = 32;
239  char buff[size];
240  // Default this timeout to 30. We may want to make this settable
241  // or use another settable timeout value here.
242  const TimeDuration tv(30);
243  const ssize_t recvd = socket().recv(buff, size, this->remote_address_, 0, &tv.value());
244  if (recvd == 1) {
245  // Expected value
247  ACE_TEXT("(%P|%t) UdpDataLink::open received handshake ack\n")),
248  2);
249  } else if (recvd < 0) {
250  // Not a handshake ack, something is wrong
251  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
252  ACE_TEXT("failed to receive handshake ack %p\n"),
253  ACE_TEXT("recv")), false);
254  } else {
255  ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
256  ACE_TEXT("failed to receive handshake ack ")
257  ACE_TEXT("recv returned %b\n"), recvd),
258  false);
259  }
260  }
261 
262  if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
263  static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
264  != 0) {
265  stop_i();
267  ACE_TEXT("(%P|%t) ERROR: ")
268  ACE_TEXT("UdpDataLink::open: start failed!\n")),
269  false);
270  }
271 
272  return true;
273 }
274 
275 void
278 {
279  // At this time, the TRANSPORT_CONTROL messages in Udp are only used for
280  // the connection handshaking, so receiving one is an indication of the
281  // passive_connection event. In the future the submessage_id_ could be used
282  // to allow different types of messages here.
283  OPENDDS_TEST_AND_CALL(UdpTransport_rch, dynamic_rchandle_cast<UdpTransport>(impl()), passive_connection(remote_address, sample));
284 }
285 
286 void
288 {
289  this->socket_.close();
290 }
291 
292 } // namespace DCPS
293 } // namespace OpenDDS
294 
UdpSendStrategy_rch send_strategy_
Definition: UdpDataLink.h:63
TransportImpl_rch impl() const
Definition: DataLink.cpp:105
static const ACE_Time_Value max_time
#define ENOTSUP
char message_id_
The enum MessageId.
ssize_t recv(void *buf, size_t n, ACE_Addr &addr, int flags=0) const
size_t length(void) const
static int mb_to_iov(const ACE_Message_Block &msg, iovec *iov)
bool open(const ACE_INET_Addr &remote_address)
Definition: UdpDataLink.cpp:46
ACE_INET_Addr remote_address_
Definition: UdpDataLink.h:69
virtual short codepoint() const
Access the mapped DiffServ codepoint value.
UdpTransport_rch UdpTransport_rch
Definition: UdpDataLink.h:32
RcHandle< T > make_rch()
Definition: RcHandle_T.h:256
void control_received(ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
static const ConnectionInfoFlags CONNINFO_ALL
const ACE_Time_Value & value() const
ACE_SOCK_Dgram socket_
Definition: UdpDataLink.h:71
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
int ssize_t
int set_option(int level, int option, void *optval, int optlen) const
int get_type(void) const
#define SOL_SOCKET
int start(const TransportSendStrategy_rch &send_strategy, const TransportStrategy_rch &receive_strategy, bool invoke_all=true)
Definition: DataLink.inl:212
#define SO_SNDBUF
LM_DEBUG
ACE_INET_Addr & remote_address()
Definition: UdpDataLink.inl:34
Class to serialize and deserialize data for DDS.
Definition: Serializer.h:369
static size_t get_max_serialized_size()
Similar to IDL compiler generated methods.
#define VDBG(DBG_ARGS)
Priority & transport_priority()
Definition: DataLink.inl:21
int hostname(char name[], size_t maxnamelen)
Holds a data sample received by the transport.
void set_dscp_codepoint(int cp, ACE_SOCK &socket)
Definition: DataLink.cpp:1115
virtual ACE_Message_Block * release(void)
int control(int cmd, void *) const
ACE_Message_Block * cont(void) const
int get_local_addr(ACE_Addr &) const
Defines class that represents a transport packet header.
UdpDataLink(const UdpTransport_rch &transport, Priority priority, const ReactorTask_rch &reactor_task, bool active)
Definition: UdpDataLink.cpp:30
int set(const ACE_INET_Addr &)
ACE_TEXT("TCP_Factory")
map TRANSPORT_PRIORITY values directly.
ACE_SOCK_Dgram & socket()
Definition: UdpDataLink.inl:40
#define OPENDDS_TEST_AND_CALL(TYPE, TEST, CALL)
Definition: Definitions.h:75
bool open_appropriate_socket_type(ACE_SOCK_Dgram &socket, const ACE_INET_Addr &local_address, int *proto_family)
ACE_CDR::Long Priority
u_short get_port_number(void) const
UdpReceiveStrategy_rch recv_strategy_
Definition: UdpDataLink.h:64
static const ACE_Time_Value zero
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
String get_fully_qualified_hostname(ACE_INET_Addr *addr)
#define ACE_NEW_RETURN(POINTER, CONSTRUCTOR, RET_VAL)
bool is_loopback_
Is remote attached to same transport ?
Definition: DataLink.h:461
#define SO_RCVBUF
#define ACE_ERROR_RETURN(X, Y)
RcHandle< T > dynamic_rchandle_cast(const RcHandle< U > &h)
Definition: RcHandle_T.h:214
#define ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY
ssize_t send(const void *buf, size_t n, const ACE_Addr &addr, int flags=0) const
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
int close(void)