00001
00002
00003
00004
00005
00006
00007
00008 #include "UdpDataLink.h"
00009 #include "UdpTransport.h"
00010 #include "UdpInst.h"
00011
00012 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00013 #include "dds/DCPS/transport/framework/DirectPriorityMapper.h"
00014
00015 #include "ace/Default_Constants.h"
00016 #include "ace/Log_Msg.h"
00017
00018 #ifndef __ACE_INLINE__
00019 # include "UdpDataLink.inl"
00020 #endif
00021
00022 namespace OpenDDS {
00023 namespace DCPS {
00024
00025 UdpDataLink::UdpDataLink(UdpTransport* transport,
00026 Priority priority,
00027 bool active)
00028 : DataLink(transport,
00029 priority,
00030 false,
00031 active),
00032 active_(active),
00033 config_(0),
00034 reactor_task_(0)
00035 {
00036 }
00037
00038 bool
00039 UdpDataLink::open(const ACE_INET_Addr& remote_address)
00040 {
00041 this->remote_address_ = remote_address;
00042 this->is_loopback_ = this->remote_address_ == this->config_->local_address();
00043
00044 ACE_INET_Addr local_address;
00045 if (this->active_) {
00046 if (local_address.get_type() != remote_address.get_type()) {
00047 local_address.set(0, "", 0, remote_address.get_type());
00048 }
00049 } else {
00050 local_address = this->config_->local_address();
00051 }
00052
00053 if (!open_appropriate_socket_type(this->socket_, local_address)) {
00054 ACE_ERROR_RETURN((LM_ERROR,
00055 ACE_TEXT("(%P|%t) ERROR: ")
00056 ACE_TEXT("UdpDataLink::open: open_appropriate_socket_type failed\n")),
00057 false);
00058 }
00059
00060 VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: listening on %C:%hu\n",
00061 local_address.get_host_addr(), local_address.get_port_number()));
00062
00063
00064
00065
00066 if (!this->active_ && this->config_->local_address().is_any()) {
00067 ACE_INET_Addr address;
00068 if (this->socket_.get_local_addr(address) != 0) {
00069 ACE_ERROR_RETURN((LM_ERROR,
00070 ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
00071 ACE_TEXT("cannot get local addr\n")), false);
00072 }
00073 const unsigned short port = address.get_port_number();
00074 const std::string hostname = get_fully_qualified_hostname();
00075 VDBG_LVL((LM_DEBUG,
00076 ACE_TEXT("(%P|%t) UdpDataLink::open listening on host %C:%hu\n"),
00077 hostname.c_str(), port), 2);
00078 this->config_->local_address(port, hostname.c_str());
00079
00080
00081 } else if (!this->active_ &&
00082 0 == this->config_->local_address().get_port_number()) {
00083 ACE_INET_Addr address;
00084 if (this->socket_.get_local_addr(address) != 0) {
00085 ACE_ERROR_RETURN((LM_ERROR,
00086 ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
00087 ACE_TEXT("cannot get local addr\n")), false);
00088 }
00089 const unsigned short port = address.get_port_number();
00090 VDBG_LVL((LM_DEBUG,
00091 ACE_TEXT("(%P|%t) UdpDataLink::open listening on port %hu\n"),
00092 port), 2);
00093 this->config_->local_address_set_port(port);
00094 }
00095
00096 if (this->config_->send_buffer_size_ > 0) {
00097 int snd_size = this->config_->send_buffer_size_;
00098 if (this->socket_.set_option(SOL_SOCKET,
00099 SO_SNDBUF,
00100 (void *) &snd_size,
00101 sizeof(snd_size)) < 0
00102 && errno != ENOTSUP) {
00103 ACE_ERROR_RETURN((LM_ERROR,
00104 ACE_TEXT("(%P|%t) ERROR: ")
00105 ACE_TEXT("UdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
00106 snd_size),
00107 false);
00108 }
00109 }
00110
00111 if (this->config_->send_buffer_size_ > 0) {
00112 int rcv_size = this->config_->rcv_buffer_size_;
00113 if (this->socket_.set_option(SOL_SOCKET,
00114 SO_RCVBUF,
00115 (void *) &rcv_size,
00116 sizeof(int)) < 0
00117 && errno != ENOTSUP) {
00118 ACE_ERROR_RETURN((LM_ERROR,
00119 ACE_TEXT("(%P|%t) ERROR: ")
00120 ACE_TEXT("UdpDataLink::open: failed to set the receive buffer size to %d errno %m \n"),
00121 rcv_size),
00122 false);
00123 }
00124 }
00125
00126 #ifdef ACE_WIN32
00127
00128
00129
00130
00131
00132 BOOL recv_udp_connreset = FALSE;
00133 socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
00134 #endif
00135
00136 if (this->active_) {
00137
00138 DirectPriorityMapper mapper(this->transport_priority());
00139 this->set_dscp_codepoint(mapper.codepoint(), this->socket_);
00140
00141
00142
00143 VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: active connect to %C:%hu\n",
00144 remote_address.get_host_addr(), remote_address.get_port_number()));
00145
00146 TransportLocator info;
00147 this->impl()->connection_info_i(info);
00148 ACE_Message_Block* data_block;
00149 ACE_NEW_RETURN(data_block,
00150 ACE_Message_Block(info.data.length()+sizeof(Priority),
00151 ACE_Message_Block::MB_DATA,
00152 0,
00153 0,
00154 0,
00155 0,
00156 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00157 ACE_Time_Value::zero,
00158 ACE_Time_Value::max_time,
00159 0,
00160 0),
00161 0);
00162
00163 Serializer serializer(data_block);
00164 serializer << this->transport_priority();
00165 serializer.write_octet_array(info.data.get_buffer(),
00166 info.data.length());
00167
00168 DataSampleHeader sample_header;
00169 sample_header.message_id_ = TRANSPORT_CONTROL;
00170 sample_header.message_length_ =
00171 static_cast<ACE_UINT32>(data_block->length());
00172 ACE_Message_Block* sample_header_block;
00173 ACE_NEW_RETURN(sample_header_block,
00174 ACE_Message_Block(DataSampleHeader::max_marshaled_size(),
00175 ACE_Message_Block::MB_DATA,
00176 0,
00177 0,
00178 0,
00179 0,
00180 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00181 ACE_Time_Value::zero,
00182 ACE_Time_Value::max_time,
00183 0,
00184 0),
00185 0);
00186 *sample_header_block << sample_header;
00187 sample_header_block->cont(data_block);
00188
00189 ACE_Message_Block* transport_header_block;
00190 TransportHeader transport_header;
00191 ACE_NEW_RETURN(transport_header_block,
00192 ACE_Message_Block(TransportHeader::max_marshaled_size(),
00193 ACE_Message_Block::MB_DATA,
00194 0,
00195 0,
00196 0,
00197 0,
00198 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00199 ACE_Time_Value::zero,
00200 ACE_Time_Value::max_time,
00201 0,
00202 0),
00203 0);
00204
00205 transport_header.length_ =
00206 static_cast<ACE_UINT32>(data_block->length() +
00207 sample_header_block->length());
00208 *transport_header_block << transport_header;
00209 transport_header_block->cont(sample_header_block);
00210
00211 iovec iov[MAX_SEND_BLOCKS];
00212 const int num_blocks =
00213 TransportSendStrategy::mb_to_iov(*transport_header_block, iov);
00214 const ssize_t sent = socket().send(iov, num_blocks, remote_address);
00215 transport_header_block->release();
00216 if (sent < 0) {
00217 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00218 ACE_TEXT("failed to send handshake %m\n")),
00219 false);
00220 }
00221
00222
00223
00224 const size_t size = 32;
00225 char buff[size];
00226
00227
00228 ACE_Time_Value tv(30);
00229 const ssize_t recvd = socket().recv(buff, size, this->remote_address_, 0, &tv);
00230 if (recvd == 1) {
00231
00232 VDBG_LVL((LM_DEBUG,
00233 ACE_TEXT("(%P|%t) UdpDataLink::open received handshake ack\n")),
00234 2);
00235 } else if (recvd < 0) {
00236
00237 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00238 ACE_TEXT("failed to receive handshake ack %p\n"),
00239 ACE_TEXT("recv")), false);
00240 } else {
00241 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00242 ACE_TEXT("failed to receive handshake ack ")
00243 ACE_TEXT("recv returned %b\n"), recvd),
00244 false);
00245 }
00246 }
00247
00248 if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
00249 static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
00250 != 0) {
00251 stop_i();
00252 ACE_ERROR_RETURN((LM_ERROR,
00253 ACE_TEXT("(%P|%t) ERROR: ")
00254 ACE_TEXT("UdpDataLink::open: start failed!\n")),
00255 false);
00256 }
00257
00258 return true;
00259 }
00260
00261 void
00262 UdpDataLink::control_received(ReceivedDataSample& sample,
00263 const ACE_INET_Addr& remote_address)
00264 {
00265 TransportImpl_rch impl = this->impl();
00266 RcHandle<UdpTransport> ut = static_rchandle_cast<UdpTransport>(impl);
00267
00268
00269
00270
00271 ut->passive_connection(remote_address, sample.sample_);
00272 }
00273
00274 void
00275 UdpDataLink::stop_i()
00276 {
00277 this->socket_.close();
00278 }
00279
00280 }
00281 }