OpenDDS::DCPS::UdpDataLink Class Reference

#include <UdpDataLink.h>

Inherits OpenDDS::DCPS::DataLink.

Public Member Functions

 UdpDataLink (UdpTransport *transport, Priority priority, bool active)
void configure (UdpInst *config, TransportReactorTask *reactor_task)
void send_strategy (UdpSendStrategy *send_strategy)
void receive_strategy (UdpReceiveStrategy *recv_strategy)
bool active () const
UdpInst * config ()
TransportReactorTask * reactor_task ()
ACE_Reactor * get_reactor ()
ACE_INET_Addr & remote_address ()
ACE_SOCK_Dgram & socket ()
bool open (const ACE_INET_Addr &remote_address)
void control_received (ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)

Protected Member Functions

virtual void stop_i ()

Protected Attributes

bool active_
UdpInst * config_
TransportReactorTask * reactor_task_
UdpSendStrategy_rch send_strategy_
 The transport send strategy object for this DataLink.
UdpReceiveStrategy_rch recv_strategy_

Private Attributes

ACE_INET_Addr remote_address_
ACE_SOCK_Dgram socket_

Detailed Description

Definition at line 31 of file UdpDataLink.h.


Constructor & Destructor Documentation

OpenDDS::DCPS::UdpDataLink::UdpDataLink ( UdpTransport *  transport,
Priority  priority,
bool  active 
)

Definition at line 25 of file UdpDataLink.cpp.

00028   : DataLink(transport,
00029              priority,
00030              false, // is_loopback,
00031              active),// is_active
00032     active_(active),
00033     config_(0),
00034     reactor_task_(0)
00035 {
00036 }


Member Function Documentation

ACE_INLINE bool OpenDDS::DCPS::UdpDataLink::active (  )  const

Definition at line 32 of file UdpDataLink.inl.

References active_.

00033 {
00034   return this->active_;
00035 }

ACE_INLINE UdpInst * OpenDDS::DCPS::UdpDataLink::config (  ) 

Definition at line 38 of file UdpDataLink.inl.

References config_.

Referenced by configure().

00039 {
00040   return this->config_;
00041 }

ACE_INLINE void OpenDDS::DCPS::UdpDataLink::configure ( UdpInst *  config,
TransportReactorTask *  reactor_task
)

Definition at line 12 of file UdpDataLink.inl.

References config(), config_, reactor_task(), and reactor_task_.

00014 {
00015   this->config_ = config;
00016   this->reactor_task_ = reactor_task;
00017 }

void OpenDDS::DCPS::UdpDataLink::control_received ( ReceivedDataSample &  sample,
const ACE_INET_Addr &  remote_address 
)

Definition at line 262 of file UdpDataLink.cpp.

References OpenDDS::DCPS::DataLink::impl(), and OpenDDS::DCPS::ReceivedDataSample::sample_.

Referenced by OpenDDS::DCPS::UdpReceiveStrategy::deliver_sample().

00264 {
00265   TransportImpl_rch impl = this->impl();
00266   RcHandle<UdpTransport> ut = static_rchandle_cast<UdpTransport>(impl);
00267   // At this time, the TRANSPORT_CONTROL messages in Udp are only used for
00268   // the connection handshaking, so receiving one is an indication of the
00269   // passive_connection event.  In the future the submessage_id_ could be used
00270   // to allow different types of messages here.
00271   ut->passive_connection(remote_address, sample.sample_);
00272 }
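
The comment in the listing above notes that TRANSPORT_CONTROL messages currently serve only the connection handshake, and that a submessage id could later distinguish other control message types. A minimal standalone sketch of that kind of dispatch follows. `SubmessageId`, `ControlSample`, and `ControlDispatcher` are hypothetical names invented for illustration; they are not OpenDDS types, and the real method forwards to `UdpTransport::passive_connection()` unconditionally.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-ins for illustration only; not OpenDDS types.
enum class SubmessageId { Handshake, Unknown };

struct ControlSample {
  SubmessageId submessage_id;
  std::string payload;  // serialized peer information
};

class ControlDispatcher {
public:
  using HandshakeHandler =
      std::function<void(const std::string& remote, const std::string& payload)>;

  explicit ControlDispatcher(HandshakeHandler on_handshake)
    : on_handshake_(std::move(on_handshake)) {
    assert(on_handshake_);  // an empty handler would throw when invoked
  }

  // Returns true if the control sample was recognized and forwarded.
  bool control_received(const ControlSample& sample,
                        const std::string& remote) const {
    switch (sample.submessage_id) {
    case SubmessageId::Handshake:
      // Plays the role of passive_connection() in the listing above.
      on_handshake_(remote, sample.payload);
      return true;
    default:
      return false;  // unrecognized control type: ignore it
    }
  }

private:
  HandshakeHandler on_handshake_;
};
```

With a single recognized id the switch behaves like the current code; further cases could be added without changing the caller.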

ACE_INLINE ACE_Reactor * OpenDDS::DCPS::UdpDataLink::get_reactor (  ) 

Definition at line 50 of file UdpDataLink.inl.

References OpenDDS::DCPS::TransportReactorTask::get_reactor(), and reactor_task_.

Referenced by OpenDDS::DCPS::UdpReceiveStrategy::start_i(), and OpenDDS::DCPS::UdpReceiveStrategy::stop_i().

00051 {
00052   if (this->reactor_task_ == 0) return 0;
00053   return this->reactor_task_->get_reactor();
00054 }
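
Because get_reactor() returns 0 until configure() has supplied a reactor task, callers need to check the result before using it. The sketch below shows that guard from the caller's side. `Reactor`, `ReactorTask`, `Link`, and `register_handler` are hypothetical stand-ins, not the ACE or OpenDDS classes.

```cpp
// Hypothetical stand-ins for illustration only; not the ACE/OpenDDS types.
struct Reactor {
  int registered = 0;  // number of handlers registered so far
};

struct ReactorTask {
  Reactor reactor;
  Reactor* get_reactor() { return &reactor; }
};

class Link {
public:
  void configure(ReactorTask* task) { reactor_task_ = task; }

  // Mirrors the guard in the listing: no task configured means no reactor.
  Reactor* get_reactor() {
    if (reactor_task_ == nullptr) return nullptr;
    return reactor_task_->get_reactor();
  }

private:
  ReactorTask* reactor_task_ = nullptr;
};

// A caller that registers with the reactor only when one is available.
bool register_handler(Link& link) {
  Reactor* reactor = link.get_reactor();
  if (reactor == nullptr) return false;  // link has not been configured yet
  ++reactor->registered;
  return true;
}
```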

bool OpenDDS::DCPS::UdpDataLink::open ( const ACE_INET_Addr &  remote_address  ) 

Definition at line 39 of file UdpDataLink.cpp.

References config_, OpenDDS::DCPS::TransportLocator::data, OpenDDS::DCPS::get_fully_qualified_hostname(), OpenDDS::DCPS::DataLink::impl(), OpenDDS::DCPS::DataLink::is_loopback_, OpenDDS::DCPS::TransportHeader::length_, OpenDDS::DCPS::UdpInst::local_address(), OpenDDS::DCPS::UdpInst::local_address_set_port(), OpenDDS::DCPS::TransportHeader::max_marshaled_size(), OpenDDS::DCPS::DataSampleHeader::max_marshaled_size(), OpenDDS::DCPS::MAX_SEND_BLOCKS, OpenDDS::DCPS::TransportSendStrategy::mb_to_iov(), OpenDDS::DCPS::DataSampleHeader::message_id_, OpenDDS::DCPS::DataSampleHeader::message_length_, OpenDDS::DCPS::open_appropriate_socket_type(), OpenDDS::DCPS::UdpInst::rcv_buffer_size_, remote_address_, OpenDDS::DCPS::UdpInst::send_buffer_size_, OpenDDS::DCPS::DataLink::set_dscp_codepoint(), socket(), socket_, OpenDDS::DCPS::DataLink::start(), stop_i(), OpenDDS::DCPS::TRANSPORT_CONTROL, OpenDDS::DCPS::DataLink::transport_priority(), VDBG, and VDBG_LVL.

00040 {
00041   this->remote_address_ = remote_address;
00042   this->is_loopback_ = this->remote_address_ == this->config_->local_address();
00043 
00044   ACE_INET_Addr local_address;
00045   if (this->active_) {
00046     if (local_address.get_type() != remote_address.get_type()) {
00047       local_address.set(0, "", 0, remote_address.get_type());
00048     }
00049   } else {
00050     local_address = this->config_->local_address();
00051   }
00052 
00053   if (!open_appropriate_socket_type(this->socket_, local_address)) {
00054     ACE_ERROR_RETURN((LM_ERROR,
00055       ACE_TEXT("(%P|%t) ERROR: ")
00056       ACE_TEXT("UdpDataLink::open: open_appropriate_socket_type failed\n")),
00057       false);
00058   }
00059 
00060   VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: listening on %C:%hu\n",
00061         local_address.get_host_addr(), local_address.get_port_number()));
00062 
00063   // If listening on "any" host/port, need to record the actual port number
00064   // selected by the OS, as well as our actual hostname, into the config_
00065   // object's local_address_ for use in UdpTransport::connection_info_i().
00066   if (!this->active_ && this->config_->local_address().is_any()) {
00067     ACE_INET_Addr address;
00068     if (this->socket_.get_local_addr(address) != 0) {
00069       ACE_ERROR_RETURN((LM_ERROR,
00070                         ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
00071                         ACE_TEXT("cannot get local addr\n")), false);
00072     }
00073     const unsigned short port = address.get_port_number();
00074     const std::string hostname = get_fully_qualified_hostname();
00075     VDBG_LVL((LM_DEBUG,
00076               ACE_TEXT("(%P|%t) UdpDataLink::open listening on host %C:%hu\n"),
00077               hostname.c_str(), port), 2);
00078     this->config_->local_address(port, hostname.c_str());
00079 
00080   // Similar case to the "if" case above, but with a bound host/IP but no port
00081   } else if (!this->active_ &&
00082              0 == this->config_->local_address().get_port_number()) {
00083     ACE_INET_Addr address;
00084     if (this->socket_.get_local_addr(address) != 0) {
00085       ACE_ERROR_RETURN((LM_ERROR,
00086                         ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open - %p"),
00087                         ACE_TEXT("cannot get local addr\n")), false);
00088     }
00089     const unsigned short port = address.get_port_number();
00090     VDBG_LVL((LM_DEBUG,
00091               ACE_TEXT("(%P|%t) UdpDataLink::open listening on port %hu\n"),
00092               port), 2);
00093     this->config_->local_address_set_port(port);
00094   }
00095 
00096   if (this->config_->send_buffer_size_ > 0) {
00097     int snd_size = this->config_->send_buffer_size_;
00098     if (this->socket_.set_option(SOL_SOCKET,
00099                                 SO_SNDBUF,
00100                                 (void *) &snd_size,
00101                                 sizeof(snd_size)) < 0
00102         && errno != ENOTSUP) {
00103       ACE_ERROR_RETURN((LM_ERROR,
00104                         ACE_TEXT("(%P|%t) ERROR: ")
00105                         ACE_TEXT("UdpDataLink::open: failed to set the send buffer size to %d errno %m\n"),
00106                         snd_size),
00107                        false);
00108     }
00109   }
00110 
00111   if (this->config_->rcv_buffer_size_ > 0) {
00112     int rcv_size = this->config_->rcv_buffer_size_;
00113     if (this->socket_.set_option(SOL_SOCKET,
00114                                 SO_RCVBUF,
00115                                 (void *) &rcv_size,
00116                                 sizeof(int)) < 0
00117         && errno != ENOTSUP) {
00118       ACE_ERROR_RETURN((LM_ERROR,
00119                         ACE_TEXT("(%P|%t) ERROR: ")
00120                         ACE_TEXT("UdpDataLink::open: failed to set the receive buffer size to %d errno %m \n"),
00121                         rcv_size),
00122                        false);
00123     }
00124   }
00125 
00126 #ifdef ACE_WIN32
00127   // By default Winsock will cause reads to fail with "connection reset"
00128   // when UDP sends result in ICMP "port unreachable" messages.
00129   // The transport framework is not set up for this since returning <= 0
00130   // from our receive_bytes causes the framework to close down the datalink
00131   // which in this case is used to receive from multiple peers.
00132   BOOL recv_udp_connreset = FALSE;
00133   socket_.control(SIO_UDP_CONNRESET, &recv_udp_connreset);
00134 #endif
00135 
00136   if (this->active_) {
00137     // Set the DiffServ codepoint according to the priority value.
00138     DirectPriorityMapper mapper(this->transport_priority());
00139     this->set_dscp_codepoint(mapper.codepoint(), this->socket_);
00140 
00141 
00142     // For the active side, send the blob and wait for a 1 byte ack.
00143     VDBG((LM_DEBUG, "(%P|%t) UdpDataLink::open: active connect to %C:%hu\n",
00144       remote_address.get_host_addr(), remote_address.get_port_number()));
00145 
00146     TransportLocator info;
00147     this->impl()->connection_info_i(info);
00148     ACE_Message_Block* data_block;
00149     ACE_NEW_RETURN(data_block,
00150                    ACE_Message_Block(info.data.length()+sizeof(Priority),
00151                                      ACE_Message_Block::MB_DATA,
00152                                      0, //cont
00153                                      0, //data
00154                                      0, //allocator_strategy
00155                                      0, //locking_strategy
00156                                      ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00157                                      ACE_Time_Value::zero,
00158                                      ACE_Time_Value::max_time,
00159                                      0,
00160                                      0),
00161                    0);
00162 
00163     Serializer serializer(data_block);
00164     serializer << this->transport_priority();
00165     serializer.write_octet_array(info.data.get_buffer(),
00166                                  info.data.length());
00167 
00168     DataSampleHeader sample_header;
00169     sample_header.message_id_ = TRANSPORT_CONTROL;
00170     sample_header.message_length_ =
00171       static_cast<ACE_UINT32>(data_block->length());
00172     ACE_Message_Block* sample_header_block;
00173     ACE_NEW_RETURN(sample_header_block,
00174                    ACE_Message_Block(DataSampleHeader::max_marshaled_size(),
00175                                      ACE_Message_Block::MB_DATA,
00176                                      0, //cont
00177                                      0, //data
00178                                      0, //allocator_strategy
00179                                      0, //locking_strategy
00180                                      ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00181                                      ACE_Time_Value::zero,
00182                                      ACE_Time_Value::max_time,
00183                                      0,
00184                                      0),
00185                    0);
00186     *sample_header_block << sample_header;
00187     sample_header_block->cont(data_block);
00188 
00189     ACE_Message_Block* transport_header_block;
00190     TransportHeader transport_header;
00191     ACE_NEW_RETURN(transport_header_block,
00192                    ACE_Message_Block(TransportHeader::max_marshaled_size(),
00193                                      ACE_Message_Block::MB_DATA,
00194                                      0,
00195                                      0,
00196                                      0,
00197                                      0,
00198                                      ACE_DEFAULT_MESSAGE_BLOCK_PRIORITY,
00199                                      ACE_Time_Value::zero,
00200                                      ACE_Time_Value::max_time,
00201                                      0,
00202                                      0),
00203                    0);
00204 
00205     transport_header.length_ =
00206       static_cast<ACE_UINT32>(data_block->length() +
00207                               sample_header_block->length());
00208     *transport_header_block << transport_header;
00209     transport_header_block->cont(sample_header_block);
00210 
00211     iovec iov[MAX_SEND_BLOCKS];
00212     const int num_blocks =
00213       TransportSendStrategy::mb_to_iov(*transport_header_block, iov);
00214     const ssize_t sent = socket().send(iov, num_blocks, remote_address);
00215     transport_header_block->release();
00216     if (sent < 0) {
00217       ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00218                                   ACE_TEXT("failed to send handshake %m\n")),
00219                        false);
00220     }
00221 
00222     // Need to wait for the 1 byte ack from the passive side before returning
00223     // the link (and indicating success).
00224     const size_t size = 32;
00225     char buff[size];
00226     // Default this timeout to 30.  We may want to make this settable
00227     // or use another settable timeout value here.
00228     ACE_Time_Value tv(30);
00229     const ssize_t recvd = socket().recv(buff, size, this->remote_address_, 0, &tv);
00230     if (recvd == 1) {
00231       // Expected value
00232       VDBG_LVL((LM_DEBUG,
00233                 ACE_TEXT("(%P|%t) UdpDataLink::open received handshake ack\n")),
00234                2);
00235     } else if (recvd < 0) {
00236       // Not a handshake ack, something is wrong
00237       ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00238                                   ACE_TEXT("failed to receive handshake ack %p\n"),
00239                         ACE_TEXT("recv")), false);
00240     } else {
00241       ACE_ERROR_RETURN((LM_ERROR, ACE_TEXT("(%P|%t) ERROR: UdpDataLink::open: ")
00242                                   ACE_TEXT("failed to receive handshake ack ")
00243                                   ACE_TEXT("recv returned %b\n"), recvd),
00244                        false);
00245     }
00246   }
00247 
00248   if (start(static_rchandle_cast<TransportSendStrategy>(this->send_strategy_),
00249             static_rchandle_cast<TransportStrategy>(this->recv_strategy_))
00250       != 0) {
00251     stop_i();
00252     ACE_ERROR_RETURN((LM_ERROR,
00253                       ACE_TEXT("(%P|%t) ERROR: ")
00254                       ACE_TEXT("UdpDataLink::open: start failed!\n")),
00255                      false);
00256   }
00257 
00258   return true;
00259 }
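
The handshake in open() can be sketched with plain POSIX sockets: the passive side binds to an OS-chosen port and reads the port back, the active side sends a blob, and the passive side answers with a 1-byte ack that the active side waits for with a timeout. This is a simplified illustration, not the OpenDDS implementation: it uses raw sockets on the loopback address instead of ACE_SOCK_Dgram, treats the handshake payload as an opaque blob rather than a serialized TransportHeader and DataSampleHeader, and all four function names are hypothetical.

```cpp
#include <arpa/inet.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
#include <cstddef>
#include <cstdint>

// Passive side: bind a UDP socket to loopback on port 0 and report the port
// the OS actually assigned (the role of get_local_addr() in the listing).
int open_passive(std::uint16_t& port_out) {
  const int fd = ::socket(AF_INET, SOCK_DGRAM, 0);
  if (fd < 0) return -1;
  sockaddr_in addr{};
  addr.sin_family = AF_INET;
  addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  addr.sin_port = 0;  // 0 asks the OS to choose a free port
  socklen_t len = sizeof(addr);
  if (::bind(fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr)) != 0 ||
      ::getsockname(fd, reinterpret_cast<sockaddr*>(&addr), &len) != 0) {
    ::close(fd);
    return -1;
  }
  port_out = ntohs(addr.sin_port);
  return fd;
}

// Bounds how long a blocking receive on fd may wait.
bool set_recv_timeout(int fd, int seconds) {
  timeval tv{};
  tv.tv_sec = seconds;
  tv.tv_usec = 0;
  return ::setsockopt(fd, SOL_SOCKET, SO_RCVTIMEO, &tv, sizeof(tv)) == 0;
}

// Active side, step 1: send the handshake blob to the passive port.
bool send_handshake(int fd, std::uint16_t port, const char* blob, std::size_t size) {
  sockaddr_in dest{};
  dest.sin_family = AF_INET;
  dest.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
  dest.sin_port = htons(port);
  const ssize_t sent = ::sendto(fd, blob, size, 0,
                                reinterpret_cast<const sockaddr*>(&dest), sizeof(dest));
  return sent == static_cast<ssize_t>(size);
}

// Passive side: read one handshake datagram and reply with a 1-byte ack.
bool answer_handshake(int fd, int timeout_sec) {
  if (!set_recv_timeout(fd, timeout_sec)) return false;
  char buf[512];
  sockaddr_in peer{};
  socklen_t len = sizeof(peer);
  const ssize_t n = ::recvfrom(fd, buf, sizeof(buf), 0,
                               reinterpret_cast<sockaddr*>(&peer), &len);
  if (n <= 0) return false;
  const char ack = 1;
  return ::sendto(fd, &ack, 1, 0, reinterpret_cast<sockaddr*>(&peer), len) == 1;
}

// Active side, step 2: succeed only if exactly one byte arrives in time.
bool wait_for_ack(int fd, int timeout_sec) {
  if (!set_recv_timeout(fd, timeout_sec)) return false;
  char buf[32];
  return ::recv(fd, buf, sizeof(buf), 0) == 1;
}
```

As in the listing, anything other than a 1-byte reply counts as failure, whether the receive timed out, failed, or returned a datagram of a different size. On loopback the three steps can run in sequence in one thread, since the handshake datagram waits in the passive socket's receive buffer until it is read.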

ACE_INLINE TransportReactorTask * OpenDDS::DCPS::UdpDataLink::reactor_task (  ) 

Definition at line 44 of file UdpDataLink.inl.

References reactor_task_.

Referenced by configure().

00045 {
00046   return this->reactor_task_;
00047 }

ACE_INLINE void OpenDDS::DCPS::UdpDataLink::receive_strategy ( UdpReceiveStrategy *  recv_strategy  ) 

Definition at line 26 of file UdpDataLink.inl.

References recv_strategy_.

00027 {
00028   this->recv_strategy_ = recv_strategy;
00029 }

ACE_INLINE ACE_INET_Addr & OpenDDS::DCPS::UdpDataLink::remote_address (  ) 

Definition at line 57 of file UdpDataLink.inl.

References remote_address_.

00058 {
00059   return this->remote_address_;
00060 }

ACE_INLINE void OpenDDS::DCPS::UdpDataLink::send_strategy ( UdpSendStrategy *  send_strategy  ) 

Definition at line 20 of file UdpDataLink.inl.

References send_strategy_.

00021 {
00022   this->send_strategy_ = send_strategy;
00023 }

ACE_INLINE ACE_SOCK_Dgram & OpenDDS::DCPS::UdpDataLink::socket (  ) 

Definition at line 63 of file UdpDataLink.inl.

References socket_.

Referenced by OpenDDS::DCPS::UdpReceiveStrategy::get_handle(), open(), OpenDDS::DCPS::UdpReceiveStrategy::receive_bytes(), OpenDDS::DCPS::UdpSendStrategy::send_bytes_i(), and OpenDDS::DCPS::UdpReceiveStrategy::start_i().

00064 {
00065   return this->socket_;
00066 }

void OpenDDS::DCPS::UdpDataLink::stop_i (  )  [protected, virtual]

Announces the "stop" event to the subclass. The event occurs when the DataLink, while handling a release_reservations() call, determines that it has just released the last remaining reservation on this link. It also occurs when the TransportImpl is shut down: DataLink calls stop_i() from its transport_shutdown() method to handle that case. UdpDataLink responds by closing its socket.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 275 of file UdpDataLink.cpp.

References socket_.

Referenced by open().

00276 {
00277   this->socket_.close();
00278 }
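
The two triggers described for stop_i() (releasing the last reservation, and transport shutdown) can be sketched as a small base class that invokes a virtual hook. `LinkSketch` and `UdpLinkSketch` are hypothetical names and do not reproduce the OpenDDS DataLink; in particular, the `stopped_` guard that prevents a second call is an assumption of this sketch, not something the source confirms.

```cpp
#include <cassert>

// Hypothetical stand-in for illustration only; not the OpenDDS DataLink.
class LinkSketch {
public:
  virtual ~LinkSketch() = default;

  void make_reservation() { ++reservations_; }

  // Releasing the last remaining reservation triggers the "stop" event.
  void release_reservation() {
    assert(reservations_ > 0);
    if (--reservations_ == 0) stop();
  }

  // Transport shutdown triggers the "stop" event regardless of reservations.
  void transport_shutdown() {
    reservations_ = 0;
    stop();
  }

protected:
  virtual void stop_i() = 0;  // subclass hook, as in the real class

private:
  // Assumption of this sketch: deliver the event at most once.
  void stop() {
    if (stopped_) return;
    stopped_ = true;
    stop_i();
  }

  int reservations_ = 0;
  bool stopped_ = false;
};

class UdpLinkSketch : public LinkSketch {
public:
  bool socket_open = true;

protected:
  // The real UdpDataLink::stop_i() closes socket_ at this point.
  void stop_i() override { socket_open = false; }
};
```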


Member Data Documentation

bool OpenDDS::DCPS::UdpDataLink::active_ [protected]

Definition at line 61 of file UdpDataLink.h.

Referenced by active().

UdpInst* OpenDDS::DCPS::UdpDataLink::config_ [protected]

Definition at line 63 of file UdpDataLink.h.

Referenced by config(), configure(), and open().

TransportReactorTask* OpenDDS::DCPS::UdpDataLink::reactor_task_ [protected]

Definition at line 64 of file UdpDataLink.h.

Referenced by configure(), get_reactor(), and reactor_task().

UdpReceiveStrategy_rch OpenDDS::DCPS::UdpDataLink::recv_strategy_ [protected]

Definition at line 67 of file UdpDataLink.h.

Referenced by receive_strategy().

ACE_INET_Addr OpenDDS::DCPS::UdpDataLink::remote_address_ [private]

Definition at line 72 of file UdpDataLink.h.

Referenced by open(), and remote_address().

UdpSendStrategy_rch OpenDDS::DCPS::UdpDataLink::send_strategy_ [protected]

The transport send strategy object for this DataLink.

Reimplemented from OpenDDS::DCPS::DataLink.

Definition at line 66 of file UdpDataLink.h.

Referenced by send_strategy().

ACE_SOCK_Dgram OpenDDS::DCPS::UdpDataLink::socket_ [private]

Definition at line 74 of file UdpDataLink.h.

Referenced by open(), socket(), and stop_i().


The documentation for this class was generated from the following files:

UdpDataLink.h
UdpDataLink.inl
UdpDataLink.cpp

Generated on Fri Feb 12 20:06:39 2016 for OpenDDS by doxygen 1.4.7