00001
00002
00003
00004
00005
00006
00007
00008 #include "UdpDataLink.h"
00009 #include "UdpTransport.h"
00010 #include "UdpInst.h"
00011 #include "UdpSendStrategy.h"
00012 #include "UdpReceiveStrategy.h"
00013
00014
00015 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00016 #include "dds/DCPS/transport/framework/DirectPriorityMapper.h"
00017
00018 #include "ace/Default_Constants.h"
00019 #include "ace/Log_Msg.h"
00020
00021 #ifndef __ACE_INLINE__
00022 # include "UdpDataLink.inl"
00023 #endif
00024
00025 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00026
00027 namespace OpenDDS {
00028 namespace DCPS {
00029
00030 UdpDataLink::UdpDataLink(UdpTransport& transport,
00031 Priority priority,
00032 TransportReactorTask* reactor_task,
00033 bool active)
00034 : DataLink(transport,
00035 priority,
00036 false,
00037 active),
00038 active_(active),
00039 reactor_task_(reactor_task),
00040 send_strategy_(make_rch<UdpSendStrategy>(this)),
00041 recv_strategy_(make_rch<UdpReceiveStrategy>(this))
00042 {
00043 }
00044
00045 bool
00046 UdpDataLink::open(const ACE_INET_Addr& remote_address)
00047 {
00048 this->remote_address_ = remote_address;
00049
00050 UdpInst& config = static_cast<UdpTransport&>(this->impl()).config();
00051
00052 this->is_loopback_ = this->remote_address_ == config.local_address();
00053
00054 ACE_INET_Addr local_address;
00055 if (this->active_) {
00056 if (local_address.get_type() != remote_address.get_type()) {
00057 local_address.set(0, "", 0, remote_address.get_type());
00058 }
00059 } else {
00060 local_address = config.local_address();
00061 }
00062
00063 if (!open_appropriate_socket_type(this->socket_, local_address)) {
00064 ACE_ERROR_RETURN((LM_ERROR,
00065 ACE_TEXT("(%P|%t) ERROR: ")
00066 ACE_TEXT("UdpDataLink::open: open_appropriate_socket_type failed\n")),
00067 false);
00068 }
00069
00070 VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: listening on %C:%hu\n",
00071 local_address.get_host_addr(), local_address.get_port_number()));
00072
00073
00074
00075
00076 if (!this->active_ && config.local_address().is_any()) {
00077 ACE_INET_Addr address;
00078 if (this->socket_.get_local_addr(address) != 0) {
00079 ACE_ERROR_RETURN((LM_ERROR,
00080 ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
00081 ACE_TEXT("cannot get local addr\n")), false);
00082 }
00083 const unsigned short port = address.get_port_number();
00084 const std::string hostname = get_fully_qualified_hostname();
00085 VDBG_LVL((LM_DEBUG,
00086 ACE_TEXT("(%P|%t) UdpDataLink::open listening on host %C:%hu\n"),
00087 hostname.c_str(), port), 2);
00088
00089 config.local_address(port, hostname.c_str());
00090
00091
00092 } else if (!this->active_ &&
00093 0 == config.local_address().get_port_number()) {
00094 ACE_INET_Addr address;
00095 if (this->socket_.get_local_addr(address) != 0) {
00096 ACE_ERROR_RETURN((LM_ERROR,
00097 ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
00098 ACE_TEXT("cannot get local addr\n")), false);
00099 }
00100 const unsigned short port = address.get_port_number();
00101 VDBG_LVL((LM_DEBUG,
00102 ACE_TEXT("(%P|%t) UdpDataLink::open listening on port %hu\n"),
00103 port), 2);
00104 config.local_address_set_port(port);
00105 }
00106
00107 if (config.send_buffer_size_ > 0) {
00108 int snd_size = config.send_buffer_size_;
00109 if (this->socket_.set_option(SOL_SOCKET,
00110 SO_SNDBUF,
00111 (void *) &snd_size,
00112 sizeof(snd_size)) < 0
00113 && errno != ENOTSUP) {
00114 ACE_ERROR_RETURN((LM_ERROR,
00115 ACE_TEXT("(%P|%t) ERROR: ")
00116 ACE_TEXT("UdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
00117 snd_size),
00118 false);
00119 }
00120 }
00121
00122 if (config.rcv_buffer_size_ > 0) {
00123 int rcv_size = config.rcv_buffer_size_;
00124 if (this->socket_.set_option(SOL_SOCKET,
00125 SO_RCVBUF,
00126 (void *) &rcv_size,
00127 sizeof(int)) < 0
00128 && errno != ENOTSUP) {
00129 ACE_ERROR_RETURN((LM_ERROR,
00130 ACE_TEXT("(%P|%t) ERROR: ")
00131 ACE_TEXT("UdpDataLink::open: failed to set the receive buffer size to %d errno %m \n"),
00132 rcv_size),
00133 false);
00134 }
00135 }
00136
00137 #ifdef ACE_WIN32
00138
00139
00140
00141
00142
00143 BOOL recv_udp_connreset = FALSE;
00144 socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
00145 #endif
00146
00147 if (this->active_) {
00148
00149 DirectPriorityMapper mapper(this->transport_priority());
00150 this->set_dscp_codepoint(mapper.codepoint(), this->socket_);
00151
00152
00153
00154 VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: active connect to %C:%hu\n",
00155 remote_address.get_host_addr(), remote_address.get_port_number()));
00156
00157 TransportLocator info;
00158 this->impl().connection_info_i(info);
00159 ACE_Message_Block* data_block;
00160 ACE_NEW_RETURN(data_block,
00161 ACE_Message_Block(info.data.length()+sizeof(Priority),
00162 ACE_Message_Block::MB_DATA,
00163 0,
00164 0,
00165 0,
00166 0,
00167 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00168 ACE_Time_Value::zero,
00169 ACE_Time_Value::max_time,
00170 0,
00171 0),
00172 0);
00173
00174 Serializer serializer(data_block);
00175 serializer << this->transport_priority();
00176 serializer.write_octet_array(info.data.get_buffer(),
00177 info.data.length());
00178
00179 DataSampleHeader sample_header;
00180 sample_header.message_id_ = TRANSPORT_CONTROL;
00181 sample_header.message_length_ =
00182 static_cast<ACE_UINT32>(data_block->length());
00183 ACE_Message_Block* sample_header_block;
00184 ACE_NEW_RETURN(sample_header_block,
00185 ACE_Message_Block(DataSampleHeader::max_marshaled_size(),
00186 ACE_Message_Block::MB_DATA,
00187 0,
00188 0,
00189 0,
00190 0,
00191 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00192 ACE_Time_Value::zero,
00193 ACE_Time_Value::max_time,
00194 0,
00195 0),
00196 0);
00197 *sample_header_block << sample_header;
00198 sample_header_block->cont(data_block);
00199
00200 ACE_Message_Block* transport_header_block;
00201 TransportHeader transport_header;
00202 ACE_NEW_RETURN(transport_header_block,
00203 ACE_Message_Block(TransportHeader::max_marshaled_size(),
00204 ACE_Message_Block::MB_DATA,
00205 0,
00206 0,
00207 0,
00208 0,
00209 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00210 ACE_Time_Value::zero,
00211 ACE_Time_Value::max_time,
00212 0,
00213 0),
00214 0);
00215
00216 transport_header.length_ =
00217 static_cast<ACE_UINT32>(data_block->length() +
00218 sample_header_block->length());
00219 *transport_header_block << transport_header;
00220 transport_header_block->cont(sample_header_block);
00221
00222 iovec iov[MAX_SEND_BLOCKS];
00223 const int num_blocks =
00224 TransportSendStrategy::mb_to_iov(*transport_header_block, iov);
00225 const ssize_t sent = socket().send(iov, num_blocks, remote_address);
00226 transport_header_block->release();
00227 if (sent < 0) {
00228 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00229 ACE_TEXT("failed to send handshake %m\n")),
00230 false);
00231 }
00232
00233
00234
00235 const size_t size = 32;
00236 char buff[size];
00237
00238
00239 ACE_Time_Value tv(30);
00240 const ssize_t recvd = socket().recv(buff, size, this->remote_address_, 0, &tv);
00241 if (recvd == 1) {
00242
00243 VDBG_LVL((LM_DEBUG,
00244 ACE_TEXT("(%P|%t) UdpDataLink::open received handshake ack\n")),
00245 2);
00246 } else if (recvd < 0) {
00247
00248 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00249 ACE_TEXT("failed to receive handshake ack %p\n"),
00250 ACE_TEXT("recv")), false);
00251 } else {
00252 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00253 ACE_TEXT("failed to receive handshake ack ")
00254 ACE_TEXT("recv returned %b\n"), recvd),
00255 false);
00256 }
00257 }
00258
00259 if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
00260 static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
00261 != 0) {
00262 stop_i();
00263 ACE_ERROR_RETURN((LM_ERROR,
00264 ACE_TEXT("(%P|%t) ERROR: ")
00265 ACE_TEXT("UdpDataLink::open: start failed!\n")),
00266 false);
00267 }
00268
00269 return true;
00270 }
00271
00272 void
00273 UdpDataLink::control_received(ReceivedDataSample& sample,
00274 const ACE_INET_Addr& remote_address)
00275 {
00276
00277
00278
00279
00280 static_cast<UdpTransport&>(this->impl()).passive_connection(remote_address, sample.sample_);
00281 }
00282
00283 void
00284 UdpDataLink::stop_i()
00285 {
00286 this->socket_.close();
00287 }
00288
00289 }
00290 }
00291
00292 OPENDDS_END_VERSIONED_NAMESPACE_DECL