UdpDataLink.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "UdpDataLink.h"
00009 #include "UdpTransport.h"
00010 #include "UdpInst.h"
00011 
00012 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00013 #include "dds/DCPS/transport/framework/DirectPriorityMapper.h"
00014 
00015 #include "ace/Default_Constants.h"
00016 #include "ace/Log_Msg.h"
00017 
00018 #ifndef __ACE_INLINE__
00019 # include "UdpDataLink.inl"
00020 #endif  /* __ACE_INLINE__ */
00021 
00022 namespace OpenDDS {
00023 namespace DCPS {
00024 
00025 UdpDataLink::UdpDataLink(UdpTransport* transport,
00026                          Priority   priority,
00027                          bool          active)
00028   : DataLink(transport,
00029              priority,
00030              false, // is_loopback,
00031              active),// is_active
00032     active_(active),
00033     config_(0),
00034     reactor_task_(0)
00035 {
00036 }
00037 
00038 bool
00039 UdpDataLink::open(const ACE_INET_Addr& remote_address)
00040 {
00041   this->remote_address_ = remote_address;
00042   this->is_loopback_ = this->remote_address_ == this->config_->local_address();
00043 
00044   ACE_INET_Addr local_address;
00045   if (this->active_) {
00046     if (local_address.get_type() != remote_address.get_type()) {
00047       local_address.set(0, "", 0, remote_address.get_type());
00048     }
00049   } else {
00050     local_address = this->config_->local_address();
00051   }
00052 
00053   if (!open_appropriate_socket_type(this->socket_, local_address)) {
00054     ACE_ERROR_RETURN((LM_ERROR,
00055       ACE_TEXT("(%P|%t) ERROR: ")
00056       ACE_TEXT("UdpDataLink::open: open_appropriate_socket_type failed\n")),
00057       false);
00058   }
00059 
00060   VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: listening on %C:%hu\n",
00061         local_address.get_host_addr(), local_address.get_port_number()));
00062 
00063   // If listening on "any" host/port, need to record the actual port number
00064   // selected by the OS, as well as our actual hostname, into the config_
00065   // object's local_address_ for use in UdpTransport::connection_info_i().
00066   if (!this->active_ && this->config_->local_address().is_any()) {
00067     ACE_INET_Addr address;
00068     if (this->socket_.get_local_addr(address) != 0) {
00069       ACE_ERROR_RETURN((LM_ERROR,
00070                         ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
00071                         ACE_TEXT("cannot get local addr\n")), false);
00072     }
00073     const unsigned short port = address.get_port_number();
00074     const std::string hostname = get_fully_qualified_hostname();
00075     VDBG_LVL((LM_DEBUG,
00076               ACE_TEXT("(%P|%t) UdpDataLink::open listening on host %C:%hu\n"),
00077               hostname.c_str(), port), 2);
00078     this->config_->local_address(port, hostname.c_str());
00079 
00080   // Similar case to the "if" case above, but with a bound host/IP but no port
00081   } else if (!this->active_ &&
00082              0 == this->config_->local_address().get_port_number()) {
00083     ACE_INET_Addr address;
00084     if (this->socket_.get_local_addr(address) != 0) {
00085       ACE_ERROR_RETURN((LM_ERROR,
00086                         ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
00087                         ACE_TEXT("cannot get local addr\n")), false);
00088     }
00089     const unsigned short port = address.get_port_number();
00090     VDBG_LVL((LM_DEBUG,
00091               ACE_TEXT("(%P|%t) UdpDataLink::open listening on port %hu\n"),
00092               port), 2);
00093     this->config_->local_address_set_port(port);
00094   }
00095 
00096   if (this->config_->send_buffer_size_ > 0) {
00097     int snd_size = this->config_->send_buffer_size_;
00098     if (this->socket_.set_option(SOL_SOCKET,
00099                                 SO_SNDBUF,
00100                                 (void *) &snd_size,
00101                                 sizeof(snd_size)) < 0
00102         && errno != ENOTSUP) {
00103       ACE_ERROR_RETURN((LM_ERROR,
00104                         ACE_TEXT("(%P|%t) ERROR: ")
00105                         ACE_TEXT("UdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
00106                         snd_size),
00107                        false);
00108     }
00109   }
00110 
00111   if (this->config_->send_buffer_size_ > 0) {
00112     int rcv_size = this->config_->rcv_buffer_size_;
00113     if (this->socket_.set_option(SOL_SOCKET,
00114                                 SO_RCVBUF,
00115                                 (void *) &rcv_size,
00116                                 sizeof(int)) < 0
00117         && errno != ENOTSUP) {
00118       ACE_ERROR_RETURN((LM_ERROR,
00119                         ACE_TEXT("(%P|%t) ERROR: ")
00120                         ACE_TEXT("UdpDataLink::open: failed to set the receive buffer size to %d errno %m \n"),
00121                         rcv_size),
00122                        false);
00123     }
00124   }
00125 
00126 #ifdef ACE_WIN32
00127   // By default Winsock will cause reads to fail with "connection reset"
00128   // when UDP sends result in ICMP "port unreachable" messages.
00129   // The transport framework is not set up for this since returning <= 0
00130   // from our receive_bytes causes the framework to close down the datalink
00131   // which in this case is used to receive from multiple peers.
00132   BOOL recv_udp_connreset = FALSE;
00133   socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
00134 #endif
00135 
00136   if (this->active_) {
00137     // Set the DiffServ codepoint according to the priority value.
00138     DirectPriorityMapper mapper(this->transport_priority());
00139     this->set_dscp_codepoint(mapper.codepoint(), this->socket_);
00140 
00141 
00142     // For the active side, send the blob and wait for a 1 byte ack.
00143     VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: active connect to %C:%hu\n",
00144       remote_address.get_host_addr(), remote_address.get_port_number()));
00145 
00146     TransportLocator info;
00147     this->impl()->connection_info_i(info);
00148     ACE_Message_Block* data_block;
00149     ACE_NEW_RETURN(data_block,
00150                    ACE_Message_Block(info.data.length()+sizeof(Priority),
00151                                      ACE_Message_Block::MB_DATA,
00152                                      0, //cont
00153                                      0, //data
00154                                      0, //allocator_strategy
00155                                      0, //locking_strategy
00156                                      ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00157                                      ACE_Time_Value::zero,
00158                                      ACE_Time_Value::max_time,
00159                                      0,
00160                                      0),
00161                    0);
00162 
00163     Serializer serializer(data_block);
00164     serializer << this->transport_priority();
00165     serializer.write_octet_array(info.data.get_buffer(),
00166                                  info.data.length());
00167 
00168     DataSampleHeader sample_header;
00169     sample_header.message_id_ = TRANSPORT_CONTROL;
00170     sample_header.message_length_ =
00171       static_cast<ACE_UINT32>(data_block->length());
00172     ACE_Message_Block* sample_header_block;
00173     ACE_NEW_RETURN(sample_header_block,
00174                    ACE_Message_Block(DataSampleHeader::max_marshaled_size(),
00175                                      ACE_Message_Block::MB_DATA,
00176                                      0, //cont
00177                                      0, //data
00178                                      0, //allocator_strategy
00179                                      0, //locking_strategy
00180                                      ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00181                                      ACE_Time_Value::zero,
00182                                      ACE_Time_Value::max_time,
00183                                      0,
00184                                      0),
00185                    0);
00186     *sample_header_block << sample_header;
00187     sample_header_block->cont(data_block);
00188 
00189     ACE_Message_Block* transport_header_block;
00190     TransportHeader transport_header;
00191     ACE_NEW_RETURN(transport_header_block,
00192                    ACE_Message_Block(TransportHeader::max_marshaled_size(),
00193                                      ACE_Message_Block::MB_DATA,
00194                                      0,
00195                                      0,
00196                                      0,
00197                                      0,
00198                                      ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00199                                      ACE_Time_Value::zero,
00200                                      ACE_Time_Value::max_time,
00201                                      0,
00202                                      0),
00203                    0);
00204 
00205     transport_header.length_ =
00206       static_cast<ACE_UINT32>(data_block->length() +
00207                               sample_header_block->length());
00208     *transport_header_block << transport_header;
00209     transport_header_block->cont(sample_header_block);
00210 
00211     iovec iov[MAX_SEND_BLOCKS];
00212     const int num_blocks =
00213       TransportSendStrategy::mb_to_iov(*transport_header_block, iov);
00214     const ssize_t sent = socket().send(iov, num_blocks, remote_address);
00215     transport_header_block->release();
00216     if (sent < 0) {
00217       ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00218                                   ACE_TEXT("failed to send handshake %m\n")),
00219                        false);
00220     }
00221 
00222     // Need to wait for the 1 byte ack from the passive side before returning
00223     // the link (and indicating success).
00224     const size_t size = 32;
00225     char buff[size];
00226     // Default this timeout to 30.  We may want to make this settable
00227     // or use another settable timeout value here.
00228     ACE_Time_Value tv(30);
00229     const ssize_t recvd = socket().recv(buff, size, this->remote_address_, 0, &tv);
00230     if (recvd == 1) {
00231       // Expected value
00232       VDBG_LVL((LM_DEBUG,
00233                 ACE_TEXT("(%P|%t) UdpDataLink::open received handshake ack\n")),
00234                2);
00235     } else if (recvd < 0) {
00236       // Not a handshake ack, something is wrong
00237       ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00238                                   ACE_TEXT("failed to receive handshake ack %p\n"),
00239                         ACE_TEXT("recv")), false);
00240     } else {
00241       ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00242                                   ACE_TEXT("failed to receive handshake ack ")
00243                                   ACE_TEXT("recv returned %b\n"), recvd),
00244                        false);
00245     }
00246   }
00247 
00248   if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
00249             static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
00250       != 0) {
00251     stop_i();
00252     ACE_ERROR_RETURN((LM_ERROR,
00253                       ACE_TEXT("(%P|%t) ERROR: ")
00254                       ACE_TEXT("UdpDataLink::open: start failed!\n")),
00255                      false);
00256   }
00257 
00258   return true;
00259 }
00260 
00261 void
00262 UdpDataLink::control_received(ReceivedDataSample& sample,
00263                               const ACE_INET_Addr& remote_address)
00264 {
00265   TransportImpl_rch impl = this->impl();
00266   RcHandle<UdpTransport> ut = static_rchandle_cast<UdpTransport>(impl);
00267   // At this time, the TRANSPORT_CONTROL messages in Udp are only used for
00268   // the connection handshaking, so receiving one is an indication of the
00269   // passive_connection event.  In the future the submessage_id_ could be used
00270   // to allow different types of messages here.
00271   ut->passive_connection(remote_address, sample.sample_);
00272 }
00273 
00274 void
00275 UdpDataLink::stop_i()
00276 {
00277   this->socket_.close();
00278 }
00279 
00280 } // namespace DCPS
00281 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7