OpenDDS::DCPS::UdpDataLink Class Reference

#include <UdpDataLink.h>

Inherits OpenDDS::DCPS::DataLink.

Public Member Functions

UdpDataLink (UdpTransport &transport, Priority priority, TransportReactorTask *reactor_task, bool active)
bool active () const
TransportReactorTask * reactor_task ()
ACE_Reactor * get_reactor ()
ACE_INET_Addr & remote_address ()
ACE_SOCK_Dgram & socket ()
bool open (const ACE_INET_Addr &remote_address)
void control_received (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)

Protected Member Functions

virtual void stop_i ()

Protected Attributes

bool active_
TransportReactorTask * reactor_task_
UdpSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink.
UdpReceiveStrategy_rch recv_strategy_

Private Attributes

ACE_INET_Addr remote_address_
ACE_SOCK_Dgram socket_

Detailed Description

Definition at line 34 of file UdpDataLink.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::UdpDataLink::UdpDataLink ( UdpTransport & transport,
Priority  priority,
TransportReactorTask * reactor_task,
bool  active 
)

Definition at line 30 of file UdpDataLink.cpp.

00034   : DataLink(transport,
00035              priority,
00036              false, // is_loopback,
00037              active),// is_active
00038     active_(active),
00039     reactor_task_(reactor_task),
00040     send_strategy_(make_rch<UdpSendStrategy>(this)),
00041     recv_strategy_(make_rch<UdpReceiveStrategy>(this))
00042 {
00043 }


Member Function Documentation

ACE_INLINE bool OpenDDS::DCPS::UdpDataLink::active ( void   )  const

Definition at line 14 of file UdpDataLink.inl.

References active_.

00015 {
00016   return this->active_;
00017 }

void OpenDDS::DCPS::UdpDataLink::control_received ( ReceivedDataSample & sample,
const ACE_INET_Addr & remote_address 
)

Definition at line 273 of file UdpDataLink.cpp.

References OpenDDS::DCPS::DataLink::impl(), and OpenDDS::DCPS::ReceivedDataSample::sample_.

Referenced by OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample().

00275 {
00276   // At this time, the TRANSPORT_CONTROL messages in Udp are only used for
00277   // the connection handshaking, so receiving one is an indication of the
00278   // passive_connection event.  In the future the submessage_id_ could be used
00279   // to allow different types of messages here.
00280   static_cast<UdpTransport&>(this->impl()).passive_connection(remote_address, sample.sample_);
00281 }
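
As a rough illustration of the comment in the listing above, the sketch below models a receive path that routes handshake control samples to a passive-connection callback while ordinary samples take the data path. It is a standalone model, not OpenDDS code: MessageId, Endpoint, Sample and Router are hypothetical stand-ins, and routing on the message id is an assumption about how UdpReceiveStrategy::deliver_sample() decides to call control_received().

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-ins for the OpenDDS types; names are illustrative only.
enum class MessageId { SampleData, TransportControl };

struct Endpoint {
  std::string host;
  unsigned short port;
};

struct Sample {
  MessageId id;
  std::string payload;
};

// Routes control samples to the handshake callback and everything else to
// the data callback, mirroring the "control means passive connection" rule.
class Router {
public:
  using ControlFn = std::function<void(const Endpoint&, const std::string&)>;
  using DataFn = std::function<void(const Sample&)>;

  Router(ControlFn on_control, DataFn on_data)
    : on_control_(std::move(on_control)), on_data_(std::move(on_data)) {}

  void deliver(const Sample& sample, const Endpoint& from) const {
    if (sample.id == MessageId::TransportControl) {
      on_control_(from, sample.payload);  // handshake: report the new peer
    } else {
      on_data_(sample);                   // ordinary data path
    }
  }

private:
  ControlFn on_control_;
  DataFn on_data_;
};
```

If further control message types were added later, as the source comment anticipates, a dispatcher of this shape would presumably branch on an additional field rather than treating every control sample as a handshake.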

ACE_INLINE ACE_Reactor * OpenDDS::DCPS::UdpDataLink::get_reactor ( void   ) 

Definition at line 27 of file UdpDataLink.inl.

References OpenDDS::DCPS::TransportReactorTask::get_reactor(), and reactor_task_.

Referenced by OpenDDS::DCPS::UdpReceiveStrategy::start_i(), and OpenDDS::DCPS::UdpReceiveStrategy::stop_i().

00028 {
00029   if (this->reactor_task_ == 0) return 0;
00030   return this->reactor_task_->get_reactor();
00031 }

bool OpenDDS::DCPS::UdpDataLink::open ( const ACE_INET_Addr & remote_address  ) 

Definition at line 46 of file UdpDataLink.cpp.

References ACE_NEW_RETURN(), ACE_TEXT(), active_, OpenDDS::DCPS::DirectPriorityMapper::codepoint(), OpenDDS::DCPS::TransportImpl::connection_info_i(), ACE_Message_Block::cont(), ACE_IPC_SAP::control(), OpenDDS::DCPS::TransportLocator::data, OpenDDS::DCPS::get_fully_qualified_hostname(), ACE_INET_Addr::get_host_addr(), ACE_SOCK::get_local_addr(), ACE_INET_Addr::get_port_number(), ACE_Addr::get_type(), hostname(), OpenDDS::DCPS::DataLink::impl(), ACE_INET_Addr::is_any(), OpenDDS::DCPS::DataLink::is_loopback_, ACE_Message_Block::length(), OpenDDS::DCPS::TransportHeader::length_, LM_DEBUG, LM_ERROR, OpenDDS::DCPS::UdpInst::local_address(), OpenDDS::DCPS::UdpInst::local_address_set_port(), OpenDDS::DCPS::TransportHeader::max_marshaled_size(), OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), OpenDDS::DCPS::MAX_SEND_BLOCKS, ACE_Time_Value::max_time, ACE_Message_Block::MB_DATA, OpenDDS::DCPS::TransportSendStrategy::mb_to_iov(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::open_appropriate_socket_type(), OpenDDS::DCPS::UdpInst::rcv_buffer_size_, ACE_SOCK_Dgram::recv(), recv_strategy_, ACE_Message_Block::release(), remote_address_, ACE_SOCK_Dgram::send(), OpenDDS::DCPS::UdpInst::send_buffer_size_, send_strategy_, ACE_INET_Addr::set(), OpenDDS::DCPS::DataLink::set_dscp_codepoint(), ACE_SOCK::set_option(), size, socket(), socket_, OpenDDS::DCPS::DataLink::start(), stop_i(), OpenDDS::DCPS::TRANSPORT_CONTROL, OpenDDS::DCPS::DataLink::transport_priority(), VDBG, VDBG_LVL, and ACE_Time_Value::zero.

00047 {
00048   this->remote_address_ = remote_address;
00049 
00050   UdpInst& config = static_cast<UdpTransport&>(this->impl()).config();
00051 
00052   this->is_loopback_ = this->remote_address_ == config.local_address();
00053 
00054   ACE_INET_Addr local_address;
00055   if (this->active_) {
00056     if (local_address.get_type() != remote_address.get_type()) {
00057       local_address.set(0, "", 0, remote_address.get_type());
00058     }
00059   } else {
00060     local_address = config.local_address();
00061   }
00062 
00063   if (!open_appropriate_socket_type(this->socket_, local_address)) {
00064     ACE_ERROR_RETURN((LM_ERROR,
00065       ACE_TEXT("(%P|%t) ERROR: ")
00066       ACE_TEXT("UdpDataLink::open: open_appropriate_socket_type failed\n")),
00067       false);
00068   }
00069 
00070   VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: listening on %C:%hu\n",
00071         local_address.get_host_addr(), local_address.get_port_number()));
00072 
00073   // If listening on "any" host/port, need to record the actual port number
00074   // selected by the OS, as well as our actual hostname, into the config_
00075   // object's local_address_ for use in UdpTransport::connection_info_i().
00076   if (!this->active_ && config.local_address().is_any()) {
00077     ACE_INET_Addr address;
00078     if (this->socket_.get_local_addr(address) != 0) {
00079       ACE_ERROR_RETURN((LM_ERROR,
00080                         ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
00081                         ACE_TEXT("cannot get local addr\n")), false);
00082     }
00083     const unsigned short port = address.get_port_number();
00084     const std::string hostname = get_fully_qualified_hostname();
00085     VDBG_LVL((LM_DEBUG,
00086               ACE_TEXT("(%P|%t) UdpDataLink::open listening on host %C:%hu\n"),
00087               hostname.c_str(), port), 2);
00088 
00089     config.local_address(port, hostname.c_str());
00090 
00091   // Similar case to the "if" case above, but with a bound host/IP but no port
00092   } else if (!this->active_ &&
00093              0 == config.local_address().get_port_number()) {
00094     ACE_INET_Addr address;
00095     if (this->socket_.get_local_addr(address) != 0) {
00096       ACE_ERROR_RETURN((LM_ERROR,
00097                         ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
00098                         ACE_TEXT("cannot get local addr\n")), false);
00099     }
00100     const unsigned short port = address.get_port_number();
00101     VDBG_LVL((LM_DEBUG,
00102               ACE_TEXT("(%P|%t) UdpDataLink::open listening on port %hu\n"),
00103               port), 2);
00104     config.local_address_set_port(port);
00105   }
00106 
00107   if (config.send_buffer_size_ > 0) {
00108     int snd_size = config.send_buffer_size_;
00109     if (this->socket_.set_option(SOL_SOCKET,
00110                                 SO_SNDBUF,
00111                                 (void *) &snd_size,
00112                                 sizeof(snd_size)) < 0
00113         && errno != ENOTSUP) {
00114       ACE_ERROR_RETURN((LM_ERROR,
00115                         ACE_TEXT("(%P|%t) ERROR: ")
00116                         ACE_TEXT("UdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
00117                         snd_size),
00118                        false);
00119     }
00120   }
00121 
00122   if (config.rcv_buffer_size_ > 0) {
00123     int rcv_size = config.rcv_buffer_size_;
00124     if (this->socket_.set_option(SOL_SOCKET,
00125                                 SO_RCVBUF,
00126                                 (void *) &rcv_size,
00127                                 sizeof(int)) < 0
00128         && errno != ENOTSUP) {
00129       ACE_ERROR_RETURN((LM_ERROR,
00130                         ACE_TEXT("(%P|%t) ERROR: ")
00131                         ACE_TEXT("UdpDataLink::open: failed to set the receive buffer size to %d errno %m \n"),
00132                         rcv_size),
00133                        false);
00134     }
00135   }
00136 
00137 #ifdef ACE_WIN32
00138   // By default Winsock will cause reads to fail with "connection reset"
00139   // when UDP sends result in ICMP "port unreachable" messages.
00140   // The transport framework is not set up for this since returning <= 0
00141   // from our receive_bytes causes the framework to close down the datalink
00142   // which in this case is used to receive from multiple peers.
00143   BOOL recv_udp_connreset = FALSE;
00144   socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
00145 #endif
00146 
00147   if (this->active_) {
00148     // Set the DiffServ codepoint according to the priority value.
00149     DirectPriorityMapper mapper(this->transport_priority());
00150     this->set_dscp_codepoint(mapper.codepoint(), this->socket_);
00151 
00152 
00153     // For the active side, send the blob and wait for a 1 byte ack.
00154     VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: active connect to %C:%hu\n",
00155       remote_address.get_host_addr(), remote_address.get_port_number()));
00156 
00157     TransportLocator info;
00158     this->impl().connection_info_i(info);
00159     ACE_Message_Block* data_block;
00160     ACE_NEW_RETURN(data_block,
00161                    ACE_Message_Block(info.data.length()+sizeof(Priority),
00162                                      ACE_Message_Block::MB_DATA,
00163                                      0, //cont
00164                                      0, //data
00165                                      0, //allocator_strategy
00166                                      0, //locking_strategy
00167                                      ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00168                                      ACE_Time_Value::zero,
00169                                      ACE_Time_Value::max_time,
00170                                      0,
00171                                      0),
00172                    0);
00173 
00174     Serializer serializer(data_block);
00175     serializer << this->transport_priority();
00176     serializer.write_octet_array(info.data.get_buffer(),
00177                                  info.data.length());
00178 
00179     DataSampleHeader sample_header;
00180     sample_header.message_id_ = TRANSPORT_CONTROL;
00181     sample_header.message_length_ =
00182       static_cast<ACE_UINT32>(data_block->length());
00183     ACE_Message_Block* sample_header_block;
00184     ACE_NEW_RETURN(sample_header_block,
00185                    ACE_Message_Block(DataSampleHeader::max_marshaled_size(),
00186                                      ACE_Message_Block::MB_DATA,
00187                                      0, //cont
00188                                      0, //data
00189                                      0, //allocator_strategy
00190                                      0, //locking_strategy
00191                                      ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00192                                      ACE_Time_Value::zero,
00193                                      ACE_Time_Value::max_time,
00194                                      0,
00195                                      0),
00196                    0);
00197     *sample_header_block << sample_header;
00198     sample_header_block->cont(data_block);
00199 
00200     ACE_Message_Block* transport_header_block;
00201     TransportHeader transport_header;
00202     ACE_NEW_RETURN(transport_header_block,
00203                    ACE_Message_Block(TransportHeader::max_marshaled_size(),
00204                                      ACE_Message_Block::MB_DATA,
00205                                      0,
00206                                      0,
00207                                      0,
00208                                      0,
00209                                      ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00210                                      ACE_Time_Value::zero,
00211                                      ACE_Time_Value::max_time,
00212                                      0,
00213                                      0),
00214                    0);
00215 
00216     transport_header.length_ =
00217       static_cast<ACE_UINT32>(data_block->length() +
00218                               sample_header_block->length());
00219     *transport_header_block << transport_header;
00220     transport_header_block->cont(sample_header_block);
00221 
00222     iovec iov[MAX_SEND_BLOCKS];
00223     const int num_blocks =
00224       TransportSendStrategy::mb_to_iov(*transport_header_block, iov);
00225     const ssize_t sent = socket().send(iov, num_blocks, remote_address);
00226     transport_header_block->release();
00227     if (sent < 0) {
00228       ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00229                                   ACE_TEXT("failed to send handshake %m\n")),
00230                        false);
00231     }
00232 
00233     // Need to wait for the 1 byte ack from the passive side before returning
00234     // the link (and indicating success).
00235     const size_t size = 32;
00236     char buff[size];
00237     // Default this timeout to 30.  We may want to make this settable
00238     // or use another settable timeout value here.
00239     ACE_Time_Value tv(30);
00240     const ssize_t recvd = socket().recv(buff, size, this->remote_address_, 0, &tv);
00241     if (recvd == 1) {
00242       // Expected value
00243       VDBG_LVL((LM_DEBUG,
00244                 ACE_TEXT("(%P|%t) UdpDataLink::open received handshake ack\n")),
00245                2);
00246     } else if (recvd < 0) {
00247       // Not a handshake ack, something is wrong
00248       ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00249                                   ACE_TEXT("failed to receive handshake ack %p\n"),
00250                         ACE_TEXT("recv")), false);
00251     } else {
00252       ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00253                                   ACE_TEXT("failed to receive handshake ack ")
00254                                   ACE_TEXT("recv returned %b\n"), recvd),
00255                        false);
00256     }
00257   }
00258 
00259   if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
00260             static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
00261       != 0) {
00262     stop_i();
00263     ACE_ERROR_RETURN((LM_ERROR,
00264                       ACE_TEXT("(%P|%t) ERROR: ")
00265                       ACE_TEXT("UdpDataLink::open: start failed!\n")),
00266                      false);
00267   }
00268 
00269   return true;
00270 }
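
The active-side handshake in open() chains a transport header, a sample header and a payload, sends them as one datagram, and then accepts only a one-byte reply. The following is a minimal standalone sketch of that bookkeeping, not the OpenDDS implementation: Block, Span, gather, handshake_lengths, AckResult and classify_ack are hypothetical names, and skipping empty blocks in gather is a choice made for this sketch.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical minimal stand-in for a chained message block.
struct Block {
  std::vector<std::uint8_t> bytes;
  Block* cont = nullptr;  // next block in the chain, if any
};

// One scatter/gather entry, analogous in spirit to a POSIX iovec.
struct Span {
  const std::uint8_t* base;
  std::size_t len;
};

// Walks the chain and emits one entry per non-empty block.
inline std::vector<Span> gather(const Block& head) {
  std::vector<Span> out;
  for (const Block* b = &head; b != nullptr; b = b->cont) {
    if (!b->bytes.empty()) {
      out.push_back(Span{b->bytes.data(), b->bytes.size()});
    }
  }
  return out;
}

// Length fields for the handshake, following the arithmetic in open().
struct HandshakeLengths {
  std::size_t sample_message_length;  // payload only
  std::size_t transport_length;       // sample header + payload
};

inline HandshakeLengths handshake_lengths(const Block& sample_header,
                                          const Block& data) {
  return HandshakeLengths{data.bytes.size(),
                          data.bytes.size() + sample_header.bytes.size()};
}

// Hypothetical result type; the real code logs and returns false instead.
enum class AckResult { Ok, RecvError, UnexpectedLength };

// Exactly one byte is the ack, a negative value is a receive failure,
// and any other length is not a valid ack.
inline AckResult classify_ack(std::ptrdiff_t recvd) {
  if (recvd == 1) return AckResult::Ok;
  if (recvd < 0) return AckResult::RecvError;
  return AckResult::UnexpectedLength;
}
```

Keeping the length arithmetic and the reply classification as small pure functions makes both easy to check without a socket; the block sizes used to exercise them are arbitrary.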

ACE_INLINE TransportReactorTask * OpenDDS::DCPS::UdpDataLink::reactor_task (  ) 

Definition at line 21 of file UdpDataLink.inl.

References reactor_task_.

00022 {
00023   return this->reactor_task_;
00024 }

ACE_INLINE ACE_INET_Addr & OpenDDS::DCPS::UdpDataLink::remote_address (  ) 

Definition at line 34 of file UdpDataLink.inl.

References remote_address_.

Referenced by OpenDDS::DCPS::UdpSendStrategy::send_bytes_i().

00035 {
00036   return this->remote_address_;
00037 }

ACE_INLINE ACE_SOCK_Dgram & OpenDDS::DCPS::UdpDataLink::socket ( void   ) 

Definition at line 40 of file UdpDataLink.inl.

References socket_.

Referenced by OpenDDS::DCPS::UdpReceiveStrategy::get_handle(), open(), OpenDDS::DCPS::UdpReceiveStrategy::receive_bytes(), OpenDDS::DCPS::UdpSendStrategy::send_bytes_i(), and OpenDDS::DCPS::UdpReceiveStrategy::start_i().

00041 {
00042   return this->socket_;
00043 }

void OpenDDS::DCPS::UdpDataLink::stop_i (  )  [protected, virtual]

This announces the "stop" event to our subclass. The "stop" event will occur when this DataLink is handling a release_reservations() call and determines that it has just released all of the remaining reservations on this DataLink. The "stop" event will also occur when the TransportImpl is being shutdown() - we call stop_i() from our transport_shutdown() method to handle this case.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 284 of file UdpDataLink.cpp.

References ACE_SOCK::close(), and socket_.

Referenced by open().

00285 {
00286   this->socket_.close();
00287 }
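
The two triggers described for the "stop" event can be sketched with a small standalone model. This is an illustration under stated assumptions, not the DataLink implementation: LinkBase, FakeUdpLink, add_reservation and release_reservation are hypothetical names, and the socket is simulated with a flag.

```cpp
#include <cassert>

// Hypothetical base class modelling the two triggers described above.
class LinkBase {
public:
  virtual ~LinkBase() = default;

  void add_reservation() { ++reservations_; }

  // Releasing the last remaining reservation raises the "stop" event.
  void release_reservation() {
    if (reservations_ > 0 && --reservations_ == 0) stop_i();
  }

  // Shutting the transport down raises it as well.
  void transport_shutdown() { stop_i(); }

protected:
  virtual void stop_i() = 0;

private:
  int reservations_ = 0;
};

// The subclass reacts by closing its (here simulated) socket.
class FakeUdpLink : public LinkBase {
public:
  bool socket_open = true;
  int stop_calls = 0;

protected:
  void stop_i() override {
    socket_open = false;
    ++stop_calls;
  }
};
```

Because either trigger can fire, and both may fire for the same link, a stop_i() override in this style should tolerate being called more than once.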



Member Data Documentation

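bool OpenDDS::DCPS::UdpDataLink::active_ [protected]

Definition at line 58 of file UdpDataLink.h.

Referenced by active(), and open().

TransportReactorTask* OpenDDS::DCPS::UdpDataLink::reactor_task_ [protected]

Definition at line 60 of file UdpDataLink.h.

Referenced by get_reactor(), and reactor_task().

UdpReceiveStrategy_rch OpenDDS::DCPS::UdpDataLink::recv_strategy_ [protected]

Definition at line 63 of file UdpDataLink.h.

Referenced by open().

ACE_INET_Addr OpenDDS::DCPS::UdpDataLink::remote_address_ [private]

Definition at line 68 of file UdpDataLink.h.

Referenced by open(), and remote_address().

UdpSendStrategy_rch OpenDDS::DCPS::UdpDataLink::send_strategy_ [protected]

The transport send strategy object for this DataLink.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 62 of file UdpDataLink.h.

Referenced by open().

ACE_SOCK_Dgram OpenDDS::DCPS::UdpDataLink::socket_ [private]

Definition at line 70 of file UdpDataLink.h.

Referenced by open(), socket(), and stop_i().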


The documentation for this class was generated from the following files:

UdpDataLink.h
UdpDataLink.inl
UdpDataLink.cpp

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1