#include <UdpDataLink.h>
Public Member Functions | |
UdpDataLink (UdpTransport &transport, Priority priority, TransportReactorTask *reactor_task, bool active) | |
bool | active () const |
TransportReactorTask * | reactor_task () |
ACE_Reactor * | get_reactor () |
ACE_INET_Addr & | remote_address () |
ACE_SOCK_Dgram & | socket () |
bool | open (const ACE_INET_Addr &remote_address) |
void | control_received (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address) |
Protected Member Functions | |
virtual void | stop_i () |
Protected Attributes | |
bool | active_ |
TransportReactorTask * | reactor_task_ |
UdpSendStrategy_rch | send_strategy_ |
The transport send strategy object for this DataLink. | |
UdpReceiveStrategy_rch | recv_strategy_ |
Private Attributes | |
ACE_INET_Addr | remote_address_ |
ACE_SOCK_Dgram | socket_ |
Definition at line 34 of file UdpDataLink.h.
OpenDDS::DCPS::UdpDataLink::UdpDataLink | ( | UdpTransport & | transport, | |
Priority | priority, | |||
TransportReactorTask * | reactor_task, | |||
bool | active | |||
) |
Definition at line 30 of file UdpDataLink.cpp.
00034 : DataLink(transport, 00035 priority, 00036 false, // is_loopback, 00037 active),// is_active 00038 active_(active), 00039 reactor_task_(reactor_task), 00040 send_strategy_(make_rch<UdpSendStrategy>(this)), 00041 recv_strategy_(make_rch<UdpReceiveStrategy>(this)) 00042 { 00043 }
ACE_INLINE bool OpenDDS::DCPS::UdpDataLink::active | ( | void | ) | const |
Definition at line 14 of file UdpDataLink.inl.
References active_.
00015 { 00016 return this->active_; 00017 }
void OpenDDS::DCPS::UdpDataLink::control_received | ( | ReceivedDataSample & | sample, | |
const ACE_INET_Addr & | remote_address | |||
) |
Definition at line 273 of file UdpDataLink.cpp.
References OpenDDS::DCPS::DataLink::impl(), and OpenDDS::DCPS::ReceivedDataSample::sample_.
Referenced by OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample().
00275 { 00276 // At this time, the TRANSPORT_CONTROL messages in Udp are only used for 00277 // the connection handshaking, so receiving one is an indication of the 00278 // passive_connection event. In the future the submessage_id_ could be used 00279 // to allow different types of messages here. 00280 static_cast<UdpTransport&>(this->impl()).passive_connection(remote_address, sample.sample_); 00281 }
ACE_INLINE ACE_Reactor * OpenDDS::DCPS::UdpDataLink::get_reactor | ( | void | ) |
Definition at line 27 of file UdpDataLink.inl.
References OpenDDS::DCPS::TransportReactorTask::get_reactor(), and reactor_task_.
Referenced by OpenDDS::DCPS::UdpReceiveStrategy::start_i(), and OpenDDS::DCPS::UdpReceiveStrategy::stop_i().
00028 { 00029 if (this->reactor_task_ == 0) return 0; 00030 return this->reactor_task_->get_reactor(); 00031 }
bool OpenDDS::DCPS::UdpDataLink::open | ( | const ACE_INET_Addr & | remote_address | ) |
Definition at line 46 of file UdpDataLink.cpp.
References ACE_NEW_RETURN(), ACE_TEXT(), active_, OpenDDS::DCPS::DirectPriorityMapper::codepoint(), OpenDDS::DCPS::TransportImpl::connection_info_i(), ACE_Message_Block::cont(), ACE_IPC_SAP::control(), OpenDDS::DCPS::TransportLocator::data, OpenDDS::DCPS::get_fully_qualified_hostname(), ACE_INET_Addr::get_host_addr(), ACE_SOCK::get_local_addr(), ACE_INET_Addr::get_port_number(), ACE_Addr::get_type(), hostname(), OpenDDS::DCPS::DataLink::impl(), ACE_INET_Addr::is_any(), OpenDDS::DCPS::DataLink::is_loopback_, ACE_Message_Block::length(), OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::UdpInst::local_address(), OpenDDS::DCPS::UdpInst::local_address_set_port(), OpenDDS::DCPS::TransportHeader::max_marshaled_size(), OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), OpenDDS::DCPS::MAX_SEND_BLOCKS, ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::TransportSendStrategy::mb_to_iov(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::open_appropriate_socket_type(), OpenDDS::DCPS::UdpInst::rcv_buffer_size_, ACE_SOCK_Dgram::recv(), recv_strategy_, ACE_Message_Block::release(), remote_address_, ACE_SOCK_Dgram::send(), OpenDDS::DCPS::UdpInst::send_buffer_size_, send_strategy_, ACE_INET_Addr::set(), OpenDDS::DCPS::DataLink::set_dscp_codepoint(), ACE_SOCK::set_option(), size, socket(), socket_, OpenDDS::DCPS::DataLink::start(), stop_i(), OpenDDS::DCPS::TRANSPORT_CONTROL, OpenDDS::DCPS::DataLink::transport_priority(), VDBG, VDBG_LVL, and ACE_Time_Value::zero.
00047 { 00048 this->remote_address_ = remote_address; 00049 00050 UdpInst& config = static_cast<UdpTransport&>(this->impl()).config(); 00051 00052 this->is_loopback_ = this->remote_address_ == config.local_address(); 00053 00054 ACE_INET_Addr local_address; 00055 if (this->active_) { 00056 if (local_address.get_type() != remote_address.get_type()) { 00057 local_address.set(0, "", 0, remote_address.get_type()); 00058 } 00059 } else { 00060 local_address = config.local_address(); 00061 } 00062 00063 if (!open_appropriate_socket_type(this->socket_, local_address)) { 00064 ACE_ERROR_RETURN((LM_ERROR, 00065 ACE_TEXT("(%P|%t) ERROR: ") 00066 ACE_TEXT("UdpDataLink::open: open_appropriate_socket_type failed\n")), 00067 false); 00068 } 00069 00070 VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: listening on %C:%hu\n", 00071 local_address.get_host_addr(), local_address.get_port_number())); 00072 00073 // If listening on "any" host/port, need to record the actual port number 00074 // selected by the OS, as well as our actual hostname, into the config_ 00075 // object's local_address_ for use in UdpTransport::connection_info_i(). 
00076 if (!this->active_ && config.local_address().is_any()) { 00077 ACE_INET_Addr address; 00078 if (this->socket_.get_local_addr(address) != 0) { 00079 ACE_ERROR_RETURN((LM_ERROR, 00080 ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"), 00081 ACE_TEXT("cannot get local addr\n")), false); 00082 } 00083 const unsigned short port = address.get_port_number(); 00084 const std::string hostname = get_fully_qualified_hostname(); 00085 VDBG_LVL((LM_DEBUG, 00086 ACE_TEXT("(%P|%t) UdpDataLink::open listening on host %C:%hu\n"), 00087 hostname.c_str(), port), 2); 00088 00089 config.local_address(port, hostname.c_str()); 00090 00091 // Similar case to the "if" case above, but with a bound host/IP but no port 00092 } else if (!this->active_ && 00093 0 == config.local_address().get_port_number()) { 00094 ACE_INET_Addr address; 00095 if (this->socket_.get_local_addr(address) != 0) { 00096 ACE_ERROR_RETURN((LM_ERROR, 00097 ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"), 00098 ACE_TEXT("cannot get local addr\n")), false); 00099 } 00100 const unsigned short port = address.get_port_number(); 00101 VDBG_LVL((LM_DEBUG, 00102 ACE_TEXT("(%P|%t) UdpDataLink::open listening on port %hu\n"), 00103 port), 2); 00104 config.local_address_set_port(port); 00105 } 00106 00107 if (config.send_buffer_size_ > 0) { 00108 int snd_size = config.send_buffer_size_; 00109 if (this->socket_.set_option(SOL_SOCKET, 00110 SO_SNDBUF, 00111 (void *) &snd_size, 00112 sizeof(snd_size)) < 0 00113 && errno != ENOTSUP) { 00114 ACE_ERROR_RETURN((LM_ERROR, 00115 ACE_TEXT("(%P|%t) ERROR: ") 00116 ACE_TEXT("UdpDataLink::open: failed to set the send buffer size to %d errno %m\n"), 00117 snd_size), 00118 false); 00119 } 00120 } 00121 00122 if (config.rcv_buffer_size_ > 0) { 00123 int rcv_size = config.rcv_buffer_size_; 00124 if (this->socket_.set_option(SOL_SOCKET, 00125 SO_RCVBUF, 00126 (void *) &rcv_size, 00127 sizeof(int)) < 0 00128 && errno != ENOTSUP) { 00129 ACE_ERROR_RETURN((LM_ERROR, 00130 ACE_TEXT("(%P|%t) 
ERROR: ") 00131 ACE_TEXT("UdpDataLink::open: failed to set the receive buffer size to %d errno %m \n"), 00132 rcv_size), 00133 false); 00134 } 00135 } 00136 00137 #ifdef ACE_WIN32 00138 // By default Winsock will cause reads to fail with "connection reset" 00139 // when UDP sends result in ICMP "port unreachable" messages. 00140 // The transport framework is not set up for this since returning <= 0 00141 // from our receive_bytes causes the framework to close down the datalink 00142 // which in this case is used to receive from multiple peers. 00143 BOOL recv_udp_connreset = FALSE; 00144 socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset); 00145 #endif 00146 00147 if (this->active_) { 00148 // Set the DiffServ codepoint according to the priority value. 00149 DirectPriorityMapper mapper(this->transport_priority()); 00150 this->set_dscp_codepoint(mapper.codepoint(), this->socket_); 00151 00152 00153 // For the active side, send the blob and wait for a 1 byte ack. 00154 VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: active connect to %C:%hu\n", 00155 remote_address.get_host_addr(), remote_address.get_port_number())); 00156 00157 TransportLocator info; 00158 this->impl().connection_info_i(info); 00159 ACE_Message_Block* data_block; 00160 ACE_NEW_RETURN(data_block, 00161 ACE_Message_Block(info.data.length()+sizeof(Priority), 00162 ACE_Message_Block::MB_DATA, 00163 0, //cont 00164 0, //data 00165 0, //allocator_strategy 00166 0, //locking_strategy 00167 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00168 ACE_Time_Value::zero, 00169 ACE_Time_Value::max_time, 00170 0, 00171 0), 00172 0); 00173 00174 Serializer serializer(data_block); 00175 serializer << this->transport_priority(); 00176 serializer.write_octet_array(info.data.get_buffer(), 00177 info.data.length()); 00178 00179 DataSampleHeader sample_header; 00180 sample_header.message_id_ = TRANSPORT_CONTROL; 00181 sample_header.message_length_ = 00182 static_cast<ACE_UINT32>(data_block->length()); 00183 ACE_Message_Block* 
sample_header_block; 00184 ACE_NEW_RETURN(sample_header_block, 00185 ACE_Message_Block(DataSampleHeader::max_marshaled_size(), 00186 ACE_Message_Block::MB_DATA, 00187 0, //cont 00188 0, //data 00189 0, //allocator_strategy 00190 0, //locking_strategy 00191 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00192 ACE_Time_Value::zero, 00193 ACE_Time_Value::max_time, 00194 0, 00195 0), 00196 0); 00197 *sample_header_block << sample_header; 00198 sample_header_block->cont(data_block); 00199 00200 ACE_Message_Block* transport_header_block; 00201 TransportHeader transport_header; 00202 ACE_NEW_RETURN(transport_header_block, 00203 ACE_Message_Block(TransportHeader::max_marshaled_size(), 00204 ACE_Message_Block::MB_DATA, 00205 0, 00206 0, 00207 0, 00208 0, 00209 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00210 ACE_Time_Value::zero, 00211 ACE_Time_Value::max_time, 00212 0, 00213 0), 00214 0); 00215 00216 transport_header.length_ = 00217 static_cast<ACE_UINT32>(data_block->length() + 00218 sample_header_block->length()); 00219 *transport_header_block << transport_header; 00220 transport_header_block->cont(sample_header_block); 00221 00222 iovec iov[MAX_SEND_BLOCKS]; 00223 const int num_blocks = 00224 TransportSendStrategy::mb_to_iov(*transport_header_block, iov); 00225 const ssize_t sent = socket().send(iov, num_blocks, remote_address); 00226 transport_header_block->release(); 00227 if (sent < 0) { 00228 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ") 00229 ACE_TEXT("failed to send handshake %m\n")), 00230 false); 00231 } 00232 00233 // Need to wait for the 1 byte ack from the passive side before returning 00234 // the link (and indicating success). 00235 const size_t size = 32; 00236 char buff[size]; 00237 // Default this timeout to 30. We may want to make this settable 00238 // or use another settable timeout value here. 
00239 ACE_Time_Value tv(30); 00240 const ssize_t recvd = socket().recv(buff, size, this->remote_address_, 0, &tv); 00241 if (recvd == 1) { 00242 // Expected value 00243 VDBG_LVL((LM_DEBUG, 00244 ACE_TEXT("(%P|%t) UdpDataLink::open received handshake ack\n")), 00245 2); 00246 } else if (recvd < 0) { 00247 // Not a handshake ack, something is wrong 00248 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ") 00249 ACE_TEXT("failed to receive handshake ack %p\n"), 00250 ACE_TEXT("recv")), false); 00251 } else { 00252 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ") 00253 ACE_TEXT("failed to receive handshake ack ") 00254 ACE_TEXT("recv returned %b\n"), recvd), 00255 false); 00256 } 00257 } 00258 00259 if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_), 00260 static_rchandle_cast<TransportStrategy>(this->recv_strategy_)) 00261 != 0) { 00262 stop_i(); 00263 ACE_ERROR_RETURN((LM_ERROR, 00264 ACE_TEXT("(%P|%t) ERROR: ") 00265 ACE_TEXT("UdpDataLink::open: start failed!\n")), 00266 false); 00267 } 00268 00269 return true; 00270 }
ACE_INLINE TransportReactorTask * OpenDDS::DCPS::UdpDataLink::reactor_task | ( | ) |
Definition at line 21 of file UdpDataLink.inl.
References reactor_task_.
00022 { 00023 return this->reactor_task_; 00024 }
ACE_INLINE ACE_INET_Addr & OpenDDS::DCPS::UdpDataLink::remote_address | ( | ) |
Definition at line 34 of file UdpDataLink.inl.
References remote_address_.
Referenced by OpenDDS::DCPS::UdpSendStrategy::send_bytes_i().
00035 { 00036 return this->remote_address_; 00037 }
ACE_INLINE ACE_SOCK_Dgram & OpenDDS::DCPS::UdpDataLink::socket | ( | void | ) |
Definition at line 40 of file UdpDataLink.inl.
References socket_.
Referenced by OpenDDS::DCPS::UdpReceiveStrategy::get_handle(), open(), OpenDDS::DCPS::UdpReceiveStrategy::receive_bytes(), OpenDDS::DCPS::UdpSendStrategy::send_bytes_i(), and OpenDDS::DCPS::UdpReceiveStrategy::start_i().
00041 { 00042 return this->socket_; 00043 }
void OpenDDS::DCPS::UdpDataLink::stop_i | ( | ) | [protected, virtual] |
This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 284 of file UdpDataLink.cpp.
References ACE_SOCK::close(), and socket_.
Referenced by open().
bool OpenDDS::DCPS::UdpDataLink::active_ [protected] |
Definition at line 58 of file UdpDataLink.h.
Referenced by active(), and open().
TransportReactorTask * OpenDDS::DCPS::UdpDataLink::reactor_task_ [protected] |
Definition at line 60 of file UdpDataLink.h.
Referenced by get_reactor(), and reactor_task().
UdpReceiveStrategy_rch OpenDDS::DCPS::UdpDataLink::recv_strategy_ [protected] |
Definition at line 63 of file UdpDataLink.h.
Referenced by open().
ACE_INET_Addr OpenDDS::DCPS::UdpDataLink::remote_address_ [private] |
Definition at line 68 of file UdpDataLink.h.
Referenced by open(), and remote_address().
UdpSendStrategy_rch OpenDDS::DCPS::UdpDataLink::send_strategy_ [protected] |
The transport send strategy object for this DataLink.
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 62 of file UdpDataLink.h.
Referenced by open().
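ACE_SOCK_Dgram OpenDDS::DCPS::UdpDataLink::socket_ [private] |
Definition at line 70 of file UdpDataLink.h.
Referenced by open(), socket(), and stop_i().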