UdpDataLink.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "UdpDataLink.h"
00009 #include "UdpTransport.h"
00010 #include "UdpInst.h"
00011 #include "UdpSendStrategy.h"
00012 #include "UdpReceiveStrategy.h"
00013 
00014 
00015 #include "dds/DCPS/transport/framework/NetworkAddress.h"
00016 #include "dds/DCPS/transport/framework/DirectPriorityMapper.h"
00017 
00018 #include "ace/Default_Constants.h"
00019 #include "ace/Log_Msg.h"
00020 
00021 #ifndef __ACE_INLINE__
00022 # include "UdpDataLink.inl"
00023 #endif  /* __ACE_INLINE__ */
00024 
00025 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00026 
00027 namespace OpenDDS {
00028 namespace DCPS {
00029 
00030 UdpDataLink::UdpDataLink(UdpTransport& transport,
00031                          Priority   priority,
00032                          TransportReactorTask* reactor_task,
00033                          bool       active)
00034   : DataLink(transport,
00035              priority,
00036              false, // is_loopback,
00037              active),// is_active
00038     active_(active),
00039     reactor_task_(reactor_task),
00040     send_strategy_(make_rch<UdpSendStrategy>(this)),
00041     recv_strategy_(make_rch<UdpReceiveStrategy>(this))
00042 {
00043 }
00044 
00045 bool
00046 UdpDataLink::open(const ACE_INET_Addr& remote_address)
00047 {
00048   this->remote_address_ = remote_address;
00049 
00050   UdpInst& config = static_cast<UdpTransport&>(this->impl()).config();
00051 
00052   this->is_loopback_ = this->remote_address_ == config.local_address();
00053 
00054   ACE_INET_Addr local_address;
00055   if (this->active_) {
00056     if (local_address.get_type() != remote_address.get_type()) {
00057       local_address.set(0, "", 0, remote_address.get_type());
00058     }
00059   } else {
00060     local_address = config.local_address();
00061   }
00062 
00063   if (!open_appropriate_socket_type(this->socket_, local_address)) {
00064     ACE_ERROR_RETURN((LM_ERROR,
00065       ACE_TEXT("(%P|%t) ERROR: ")
00066       ACE_TEXT("UdpDataLink::open: open_appropriate_socket_type failed\n")),
00067       false);
00068   }
00069 
00070   VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: listening on %C:%hu\n",
00071         local_address.get_host_addr(), local_address.get_port_number()));
00072 
00073   // If listening on "any" host/port, need to record the actual port number
00074   // selected by the OS, as well as our actual hostname, into the config_
00075   // object's local_address_ for use in UdpTransport::connection_info_i().
00076   if (!this->active_ && config.local_address().is_any()) {
00077     ACE_INET_Addr address;
00078     if (this->socket_.get_local_addr(address) != 0) {
00079       ACE_ERROR_RETURN((LM_ERROR,
00080                         ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
00081                         ACE_TEXT("cannot get local addr\n")), false);
00082     }
00083     const unsigned short port = address.get_port_number();
00084     const std::string hostname = get_fully_qualified_hostname();
00085     VDBG_LVL((LM_DEBUG,
00086               ACE_TEXT("(%P|%t) UdpDataLink::open listening on host %C:%hu\n"),
00087               hostname.c_str(), port), 2);
00088 
00089     config.local_address(port, hostname.c_str());
00090 
00091   // Similar case to the "if" case above, but with a bound host/IP but no port
00092   } else if (!this->active_ &&
00093              0 == config.local_address().get_port_number()) {
00094     ACE_INET_Addr address;
00095     if (this->socket_.get_local_addr(address) != 0) {
00096       ACE_ERROR_RETURN((LM_ERROR,
00097                         ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
00098                         ACE_TEXT("cannot get local addr\n")), false);
00099     }
00100     const unsigned short port = address.get_port_number();
00101     VDBG_LVL((LM_DEBUG,
00102               ACE_TEXT("(%P|%t) UdpDataLink::open listening on port %hu\n"),
00103               port), 2);
00104     config.local_address_set_port(port);
00105   }
00106 
00107   if (config.send_buffer_size_ > 0) {
00108     int snd_size = config.send_buffer_size_;
00109     if (this->socket_.set_option(SOL_SOCKET,
00110                                 SO_SNDBUF,
00111                                 (void *) &snd_size,
00112                                 sizeof(snd_size)) < 0
00113         && errno != ENOTSUP) {
00114       ACE_ERROR_RETURN((LM_ERROR,
00115                         ACE_TEXT("(%P|%t) ERROR: ")
00116                         ACE_TEXT("UdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
00117                         snd_size),
00118                        false);
00119     }
00120   }
00121 
00122   if (config.rcv_buffer_size_ > 0) {
00123     int rcv_size = config.rcv_buffer_size_;
00124     if (this->socket_.set_option(SOL_SOCKET,
00125                                 SO_RCVBUF,
00126                                 (void *) &rcv_size,
00127                                 sizeof(int)) < 0
00128         && errno != ENOTSUP) {
00129       ACE_ERROR_RETURN((LM_ERROR,
00130                         ACE_TEXT("(%P|%t) ERROR: ")
00131                         ACE_TEXT("UdpDataLink::open: failed to set the receive buffer size to %d errno %m \n"),
00132                         rcv_size),
00133                        false);
00134     }
00135   }
00136 
00137 #ifdef ACE_WIN32
00138   // By default Winsock will cause reads to fail with "connection reset"
00139   // when UDP sends result in ICMP "port unreachable" messages.
00140   // The transport framework is not set up for this since returning <= 0
00141   // from our receive_bytes causes the framework to close down the datalink
00142   // which in this case is used to receive from multiple peers.
00143   BOOL recv_udp_connreset = FALSE;
00144   socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
00145 #endif
00146 
00147   if (this->active_) {
00148     // Set the DiffServ codepoint according to the priority value.
00149     DirectPriorityMapper mapper(this->transport_priority());
00150     this->set_dscp_codepoint(mapper.codepoint(), this->socket_);
00151 
00152 
00153     // For the active side, send the blob and wait for a 1 byte ack.
00154     VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: active connect to %C:%hu\n",
00155       remote_address.get_host_addr(), remote_address.get_port_number()));
00156 
00157     TransportLocator info;
00158     this->impl().connection_info_i(info);
00159     ACE_Message_Block* data_block;
00160     ACE_NEW_RETURN(data_block,
00161                    ACE_Message_Block(info.data.length()+sizeof(Priority),
00162                                      ACE_Message_Block::MB_DATA,
00163                                      0, //cont
00164                                      0, //data
00165                                      0, //allocator_strategy
00166                                      0, //locking_strategy
00167                                      ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00168                                      ACE_Time_Value::zero,
00169                                      ACE_Time_Value::max_time,
00170                                      0,
00171                                      0),
00172                    0);
00173 
00174     Serializer serializer(data_block);
00175     serializer << this->transport_priority();
00176     serializer.write_octet_array(info.data.get_buffer(),
00177                                  info.data.length());
00178 
00179     DataSampleHeader sample_header;
00180     sample_header.message_id_ = TRANSPORT_CONTROL;
00181     sample_header.message_length_ =
00182       static_cast<ACE_UINT32>(data_block->length());
00183     ACE_Message_Block* sample_header_block;
00184     ACE_NEW_RETURN(sample_header_block,
00185                    ACE_Message_Block(DataSampleHeader::max_marshaled_size(),
00186                                      ACE_Message_Block::MB_DATA,
00187                                      0, //cont
00188                                      0, //data
00189                                      0, //allocator_strategy
00190                                      0, //locking_strategy
00191                                      ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00192                                      ACE_Time_Value::zero,
00193                                      ACE_Time_Value::max_time,
00194                                      0,
00195                                      0),
00196                    0);
00197     *sample_header_block << sample_header;
00198     sample_header_block->cont(data_block);
00199 
00200     ACE_Message_Block* transport_header_block;
00201     TransportHeader transport_header;
00202     ACE_NEW_RETURN(transport_header_block,
00203                    ACE_Message_Block(TransportHeader::max_marshaled_size(),
00204                                      ACE_Message_Block::MB_DATA,
00205                                      0,
00206                                      0,
00207                                      0,
00208                                      0,
00209                                      ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00210                                      ACE_Time_Value::zero,
00211                                      ACE_Time_Value::max_time,
00212                                      0,
00213                                      0),
00214                    0);
00215 
00216     transport_header.length_ =
00217       static_cast<ACE_UINT32>(data_block->length() +
00218                               sample_header_block->length());
00219     *transport_header_block << transport_header;
00220     transport_header_block->cont(sample_header_block);
00221 
00222     iovec iov[MAX_SEND_BLOCKS];
00223     const int num_blocks =
00224       TransportSendStrategy::mb_to_iov(*transport_header_block, iov);
00225     const ssize_t sent = socket().send(iov, num_blocks, remote_address);
00226     transport_header_block->release();
00227     if (sent < 0) {
00228       ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00229                                   ACE_TEXT("failed to send handshake %m\n")),
00230                        false);
00231     }
00232 
00233     // Need to wait for the 1 byte ack from the passive side before returning
00234     // the link (and indicating success).
00235     const size_t size = 32;
00236     char buff[size];
00237     // Default this timeout to 30.  We may want to make this settable
00238     // or use another settable timeout value here.
00239     ACE_Time_Value tv(30);
00240     const ssize_t recvd = socket().recv(buff, size, this->remote_address_, 0, &tv);
00241     if (recvd == 1) {
00242       // Expected value
00243       VDBG_LVL((LM_DEBUG,
00244                 ACE_TEXT("(%P|%t) UdpDataLink::open received handshake ack\n")),
00245                2);
00246     } else if (recvd < 0) {
00247       // Not a handshake ack, something is wrong
00248       ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00249                                   ACE_TEXT("failed to receive handshake ack %p\n"),
00250                         ACE_TEXT("recv")), false);
00251     } else {
00252       ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00253                                   ACE_TEXT("failed to receive handshake ack ")
00254                                   ACE_TEXT("recv returned %b\n"), recvd),
00255                        false);
00256     }
00257   }
00258 
00259   if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
00260             static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
00261       != 0) {
00262     stop_i();
00263     ACE_ERROR_RETURN((LM_ERROR,
00264                       ACE_TEXT("(%P|%t) ERROR: ")
00265                       ACE_TEXT("UdpDataLink::open: start failed!\n")),
00266                      false);
00267   }
00268 
00269   return true;
00270 }
00271 
00272 void
00273 UdpDataLink::control_received(ReceivedDataSample& sample,
00274                               const ACE_INET_Addr& remote_address)
00275 {
00276   // At this time, the TRANSPORT_CONTROL messages in Udp are only used for
00277   // the connection handshaking, so receiving one is an indication of the
00278   // passive_connection event.  In the future the submessage_id_ could be used
00279   // to allow different types of messages here.
00280   static_cast<UdpTransport&>(this->impl()).passive_connection(remote_address, sample.sample_);
00281 }
00282 
00283 void
00284 UdpDataLink::stop_i()
00285 {
00286   this->socket_.close();
00287 }
00288 
00289 } // namespace DCPS
00290 } // namespace OpenDDS
00291 
00292 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1