#include <UdpDataLink.h>
Inheritance diagram for OpenDDS::DCPS::UdpDataLink:
Public Member Functions | |
UdpDataLink (UdpTransport *transport, Priority priority, bool active) | |
void | configure (UdpInst *config, TransportReactorTask *reactor_task) |
void | send_strategy (UdpSendStrategy *send_strategy) |
void | receive_strategy (UdpReceiveStrategy *recv_strategy) |
bool | active () const |
UdpInst * | config () |
TransportReactorTask * | reactor_task () |
ACE_Reactor * | get_reactor () |
ACE_INET_Addr & | remote_address () |
ACE_SOCK_Dgram & | socket () |
bool | open (const ACE_INET_Addr &remote_address) |
void | control_received (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address) |
Protected Member Functions | |
virtual void | stop_i () |
Protected Attributes | |
bool | active_ |
UdpInst * | config_ |
TransportReactorTask * | reactor_task_ |
UdpSendStrategy_rch | send_strategy_ |
The transport send strategy object for this DataLink. | |
UdpReceiveStrategy_rch | recv_strategy_ |
Private Attributes | |
ACE_INET_Addr | remote_address_ |
ACE_SOCK_Dgram | socket_ |
Definition at line 31 of file UdpDataLink.h.
OpenDDS::DCPS::UdpDataLink::UdpDataLink | ( | UdpTransport * | transport, | |
Priority | priority, | |||
bool | active | |||
) |
Definition at line 25 of file UdpDataLink.cpp.
00028 : DataLink(transport, 00029 priority, 00030 false, // is_loopback, 00031 active),// is_active 00032 active_(active), 00033 config_(0), 00034 reactor_task_(0) 00035 { 00036 }
ACE_INLINE bool OpenDDS::DCPS::UdpDataLink::active | ( | ) | const |
Definition at line 32 of file UdpDataLink.inl.
References active_.
00033 { 00034 return this->active_; 00035 }
ACE_INLINE UdpInst * OpenDDS::DCPS::UdpDataLink::config | ( | ) |
Definition at line 38 of file UdpDataLink.inl.
References config_.
Referenced by configure().
00039 { 00040 return this->config_; 00041 }
ACE_INLINE void OpenDDS::DCPS::UdpDataLink::configure | ( | UdpInst * | config, | |
TransportReactorTask * | reactor_task | |||
) |
Definition at line 12 of file UdpDataLink.inl.
References config(), config_, reactor_task(), and reactor_task_.
00014 { 00015 this->config_ = config; 00016 this->reactor_task_ = reactor_task; 00017 }
void OpenDDS::DCPS::UdpDataLink::control_received | ( | ReceivedDataSample & | sample, | |
const ACE_INET_Addr & | remote_address | |||
) |
Definition at line 262 of file UdpDataLink.cpp.
References OpenDDS::DCPS::DataLink::impl(), and OpenDDS::DCPS::ReceivedDataSample::sample_.
Referenced by OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample().
00264 { 00265 TransportImpl_rch impl = this->impl(); 00266 RcHandle<UdpTransport> ut = static_rchandle_cast<UdpTransport>(impl); 00267 // At this time, the TRANSPORT_CONTROL messages in Udp are only used for 00268 // the connection handshaking, so receiving one is an indication of the 00269 // passive_connection event. In the future the submessage_id_ could be used 00270 // to allow different types of messages here. 00271 ut->passive_connection(remote_address, sample.sample_); 00272 }
ACE_INLINE ACE_Reactor * OpenDDS::DCPS::UdpDataLink::get_reactor | ( | ) |
Definition at line 50 of file UdpDataLink.inl.
References OpenDDS::DCPS::TransportReactorTask::get_reactor(), and reactor_task_.
Referenced by OpenDDS::DCPS::UdpReceiveStrategy::start_i(), and OpenDDS::DCPS::UdpReceiveStrategy::stop_i().
00051 { 00052 if (this->reactor_task_ == 0) return 0; 00053 return this->reactor_task_->get_reactor(); 00054 }
bool OpenDDS::DCPS::UdpDataLink::open | ( | const ACE_INET_Addr & | remote_address | ) |
Definition at line 39 of file UdpDataLink.cpp.
References config_, OpenDDS::DCPS::TransportLocator::data, OpenDDS::DCPS::get_fully_qualified_hostname(), OpenDDS::DCPS::DataLink::impl(), OpenDDS::DCPS::DataLink::is_loopback_, OpenDDS::DCPS::TransportHeader::length_, OpenDDS::DCPS::UdpInst::local_address(), OpenDDS::DCPS::UdpInst::local_address_set_port(), OpenDDS::DCPS::TransportHeader::max_marshaled_size(), OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), OpenDDS::DCPS::MAX_SEND_BLOCKS, OpenDDS::DCPS::TransportSendStrategy::mb_to_iov(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::open_appropriate_socket_type(), OpenDDS::DCPS::UdpInst::rcv_buffer_size_, remote_address_, OpenDDS::DCPS::UdpInst::send_buffer_size_, OpenDDS::DCPS::DataLink::set_dscp_codepoint(), socket(), socket_, OpenDDS::DCPS::DataLink::start(), stop_i(), OpenDDS::DCPS::TRANSPORT_CONTROL, OpenDDS::DCPS::DataLink::transport_priority(), VDBG, and VDBG_LVL.
00040 { 00041 this->remote_address_ = remote_address; 00042 this->is_loopback_ = this->remote_address_ == this->config_->local_address(); 00043 00044 ACE_INET_Addr local_address; 00045 if (this->active_) { 00046 if (local_address.get_type() != remote_address.get_type()) { 00047 local_address.set(0, "", 0, remote_address.get_type()); 00048 } 00049 } else { 00050 local_address = this->config_->local_address(); 00051 } 00052 00053 if (!open_appropriate_socket_type(this->socket_, local_address)) { 00054 ACE_ERROR_RETURN((LM_ERROR, 00055 ACE_TEXT("(%P|%t) ERROR: ") 00056 ACE_TEXT("UdpDataLink::open: open_appropriate_socket_type failed\n")), 00057 false); 00058 } 00059 00060 VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: listening on %C:%hu\n", 00061 local_address.get_host_addr(), local_address.get_port_number())); 00062 00063 // If listening on "any" host/port, need to record the actual port number 00064 // selected by the OS, as well as our actual hostname, into the config_ 00065 // object's local_address_ for use in UdpTransport::connection_info_i(). 00066 if (!this->active_ && this->config_->local_address().is_any()) { 00067 ACE_INET_Addr address; 00068 if (this->socket_.get_local_addr(address) != 0) { 00069 ACE_ERROR_RETURN((LM_ERROR, 00070 ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"), 00071 ACE_TEXT("cannot get local addr\n")), false); 00072 } 00073 const unsigned short port = address.get_port_number(); 00074 const std::string hostname = get_fully_qualified_hostname(); 00075 VDBG_LVL((LM_DEBUG, 00076 ACE_TEXT("(%P|%t) UdpDataLink::open listening on host %C:%hu\n"), 00077 hostname.c_str(), port), 2); 00078 this->config_->local_address(port, hostname.c_str()); 00079 00080 // Similar case to the "if" case above, but with a bound host/IP but no port 00081 } else if (!this->active_ && 00082 0 == this->config_->local_address().get_port_number()) { 00083 ACE_INET_Addr address; 00084 if (this->socket_.get_local_addr(address) != 0) { 00085 ACE_ERROR_RETURN((LM_ERROR, 00086 ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"), 00087 ACE_TEXT("cannot get local addr\n")), false); 00088 } 00089 const unsigned short port = address.get_port_number(); 00090 VDBG_LVL((LM_DEBUG, 00091 ACE_TEXT("(%P|%t) UdpDataLink::open listening on port %hu\n"), 00092 port), 2); 00093 this->config_->local_address_set_port(port); 00094 } 00095 00096 if (this->config_->send_buffer_size_ > 0) { 00097 int snd_size = this->config_->send_buffer_size_; 00098 if (this->socket_.set_option(SOL_SOCKET, 00099 SO_SNDBUF, 00100 (void *) &snd_size, 00101 sizeof(snd_size)) < 0 00102 && errno != ENOTSUP) { 00103 ACE_ERROR_RETURN((LM_ERROR, 00104 ACE_TEXT("(%P|%t) ERROR: ") 00105 ACE_TEXT("UdpDataLink::open: failed to set the send buffer size to %d errno %m\n"), 00106 snd_size), 00107 false); 00108 } 00109 } 00110 00111 if (this->config_->send_buffer_size_ > 0) { 00112 int rcv_size = this->config_->rcv_buffer_size_; 00113 if (this->socket_.set_option(SOL_SOCKET, 00114 SO_RCVBUF, 00115 (void *) &rcv_size, 00116 sizeof(int)) < 0 00117 && errno != ENOTSUP) { 00118 ACE_ERROR_RETURN((LM_ERROR, 00119 ACE_TEXT("(%P|%t) ERROR: ") 00120 ACE_TEXT("UdpDataLink::open: failed to set the receive buffer size to %d errno %m \n"), 00121 rcv_size), 00122 false); 00123 } 00124 } 00125 00126 #ifdef ACE_WIN32 00127 // By default Winsock will cause reads to fail with "connection reset" 00128 // when UDP sends result in ICMP "port unreachable" messages. 00129 // The transport framework is not set up for this since returning <= 0 00130 // from our receive_bytes causes the framework to close down the datalink 00131 // which in this case is used to receive from multiple peers. 00132 BOOL recv_udp_connreset = FALSE; 00133 socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset); 00134 #endif 00135 00136 if (this->active_) { 00137 // Set the DiffServ codepoint according to the priority value. 00138 DirectPriorityMapper mapper(this->transport_priority()); 00139 this->set_dscp_codepoint(mapper.codepoint(), this->socket_); 00140 00141 00142 // For the active side, send the blob and wait for a 1 byte ack. 00143 VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: active connect to %C:%hu\n", 00144 remote_address.get_host_addr(), remote_address.get_port_number())); 00145 00146 TransportLocator info; 00147 this->impl()->connection_info_i(info); 00148 ACE_Message_Block* data_block; 00149 ACE_NEW_RETURN(data_block, 00150 ACE_Message_Block(info.data.length()+sizeof(Priority), 00151 ACE_Message_Block::MB_DATA, 00152 0, //cont 00153 0, //data 00154 0, //allocator_strategy 00155 0, //locking_strategy 00156 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00157 ACE_Time_Value::zero, 00158 ACE_Time_Value::max_time, 00159 0, 00160 0), 00161 0); 00162 00163 Serializer serializer(data_block); 00164 serializer << this->transport_priority(); 00165 serializer.write_octet_array(info.data.get_buffer(), 00166 info.data.length()); 00167 00168 DataSampleHeader sample_header; 00169 sample_header.message_id_ = TRANSPORT_CONTROL; 00170 sample_header.message_length_ = 00171 static_cast<ACE_UINT32>(data_block->length()); 00172 ACE_Message_Block* sample_header_block; 00173 ACE_NEW_RETURN(sample_header_block, 00174 ACE_Message_Block(DataSampleHeader::max_marshaled_size(), 00175 ACE_Message_Block::MB_DATA, 00176 0, //cont 00177 0, //data 00178 0, //allocator_strategy 00179 0, //locking_strategy 00180 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00181 ACE_Time_Value::zero, 00182 ACE_Time_Value::max_time, 00183 0, 00184 0), 00185 0); 00186 *sample_header_block << sample_header; 00187 sample_header_block->cont(data_block); 00188 00189 ACE_Message_Block* transport_header_block; 00190 TransportHeader transport_header; 00191 ACE_NEW_RETURN(transport_header_block, 00192 ACE_Message_Block(TransportHeader::max_marshaled_size(), 00193 ACE_Message_Block::MB_DATA, 00194 0, 00195 0, 00196 0, 00197 0, 00198 ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY, 00199 ACE_Time_Value::zero, 00200 ACE_Time_Value::max_time, 00201 0, 00202 0), 00203 0); 00204 00205 transport_header.length_ = 00206 static_cast<ACE_UINT32>(data_block->length() + 00207 sample_header_block->length()); 00208 *transport_header_block << transport_header; 00209 transport_header_block->cont(sample_header_block); 00210 00211 iovec iov[MAX_SEND_BLOCKS]; 00212 const int num_blocks = 00213 TransportSendStrategy::mb_to_iov(*transport_header_block, iov); 00214 const ssize_t sent = socket().send(iov, num_blocks, remote_address); 00215 transport_header_block->release(); 00216 if (sent < 0) { 00217 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ") 00218 ACE_TEXT("failed to send handshake %m\n")), 00219 false); 00220 } 00221 00222 // Need to wait for the 1 byte ack from the passive side before returning 00223 // the link (and indicating success). 00224 const size_t size = 32; 00225 char buff[size]; 00226 // Default this timeout to 30. We may want to make this settable 00227 // or use another settable timeout value here. 00228 ACE_Time_Value tv(30); 00229 const ssize_t recvd = socket().recv(buff, size, this->remote_address_, 0, &tv); 00230 if (recvd == 1) { 00231 // Expected value 00232 VDBG_LVL((LM_DEBUG, 00233 ACE_TEXT("(%P|%t) UdpDataLink::open received handshake ack\n")), 00234 2); 00235 } else if (recvd < 0) { 00236 // Not a handshake ack, something is wrong 00237 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ") 00238 ACE_TEXT("failed to receive handshake ack %p\n"), 00239 ACE_TEXT("recv")), false); 00240 } else { 00241 ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ") 00242 ACE_TEXT("failed to receive handshake ack ") 00243 ACE_TEXT("recv returned %b\n"), recvd), 00244 false); 00245 } 00246 } 00247 00248 if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_), 00249 static_rchandle_cast<TransportStrategy>(this->recv_strategy_)) 00250 != 0) { 00251 stop_i(); 00252 ACE_ERROR_RETURN((LM_ERROR, 00253 ACE_TEXT("(%P|%t) ERROR: ") 00254 ACE_TEXT("UdpDataLink::open: start failed!\n")), 00255 false); 00256 } 00257 00258 return true; 00259 }
ACE_INLINE TransportReactorTask * OpenDDS::DCPS::UdpDataLink::reactor_task | ( | ) |
Definition at line 44 of file UdpDataLink.inl.
References reactor_task_.
Referenced by configure().
00045 { 00046 return this->reactor_task_; 00047 }
ACE_INLINE void OpenDDS::DCPS::UdpDataLink::receive_strategy | ( | UdpReceiveStrategy * | recv_strategy | ) |
Definition at line 26 of file UdpDataLink.inl.
References recv_strategy_.
00027 { 00028 this->recv_strategy_ = recv_strategy; 00029 }
ACE_INLINE ACE_INET_Addr & OpenDDS::DCPS::UdpDataLink::remote_address | ( | ) |
Definition at line 57 of file UdpDataLink.inl.
References remote_address_.
00058 { 00059 return this->remote_address_; 00060 }
ACE_INLINE void OpenDDS::DCPS::UdpDataLink::send_strategy | ( | UdpSendStrategy * | send_strategy | ) |
Definition at line 20 of file UdpDataLink.inl.
References send_strategy_.
00021 { 00022 this->send_strategy_ = send_strategy; 00023 }
ACE_INLINE ACE_SOCK_Dgram & OpenDDS::DCPS::UdpDataLink::socket | ( | ) |
Definition at line 63 of file UdpDataLink.inl.
References socket_.
Referenced by OpenDDS::DCPS::UdpReceiveStrategy::get_handle(), open(), OpenDDS::DCPS::UdpReceiveStrategy::receive_bytes(), OpenDDS::DCPS::UdpSendStrategy::send_bytes_i(), and OpenDDS::DCPS::UdpReceiveStrategy::start_i().
00064 { 00065 return this->socket_; 00066 }
void OpenDDS::DCPS::UdpDataLink::stop_i | ( | ) | [protected, virtual] |
This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 275 of file UdpDataLink.cpp.
References socket_.
Referenced by open().
00276 { 00277 this->socket_.close(); 00278 }
bool OpenDDS::DCPS::UdpDataLink::active_ [protected] |
UdpInst* OpenDDS::DCPS::UdpDataLink::config_ [protected] |
Definition at line 64 of file UdpDataLink.h.
Referenced by configure(), get_reactor(), and reactor_task().
ACE_INET_Addr OpenDDS::DCPS::UdpDataLink::remote_address_ [private] |
The transport send strategy object for this DataLink.
Reimplemented from OpenDDS::DCPS::DataLink.
Definition at line 66 of file UdpDataLink.h.
Referenced by send_strategy().
ACE_SOCK_Dgram OpenDDS::DCPS::UdpDataLink::socket_ [private] |