UdpReceiveStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "UdpReceiveStrategy.h"
00009 #include "UdpDataLink.h"
00010 
00011 #include "ace/Reactor.h"
00012 
00013 namespace OpenDDS {
00014 namespace DCPS {
00015 
00016 UdpReceiveStrategy::UdpReceiveStrategy(UdpDataLink* link)
00017   : link_(link)
00018   , expected_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00019 {
00020 }
00021 
00022 ACE_HANDLE
00023 UdpReceiveStrategy::get_handle() const
00024 {
00025   ACE_SOCK_Dgram& socket = this->link_->socket();
00026   return socket.get_handle();
00027 }
00028 
00029 int
00030 UdpReceiveStrategy::handle_input(ACE_HANDLE fd)
00031 {
00032   return this->handle_dds_input(fd);
00033 }
00034 
00035 ssize_t
00036 UdpReceiveStrategy::receive_bytes(iovec iov[],
00037                                   int n,
00038                                   ACE_INET_Addr& remote_address,
00039                                   ACE_HANDLE /*fd*/)
00040 {
00041   const ssize_t ret = this->link_->socket().recv(iov, n, remote_address);
00042   remote_address_ = remote_address;
00043   return ret;
00044 }
00045 
00046 void
00047 UdpReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00048                                    const ACE_INET_Addr& remote_address)
00049 {
00050   switch (sample.header_.message_id_) {
00051 
00052   case TRANSPORT_CONTROL:
00053     this->link_->control_received(sample, remote_address);
00054     break;
00055 
00056   default:
00057     this->link_->data_received(sample);
00058     break;
00059   }
00060 }
00061 
00062 int
00063 UdpReceiveStrategy::start_i()
00064 {
00065   ACE_Reactor* reactor = this->link_->get_reactor();
00066   if (reactor == 0) {
00067     ACE_ERROR_RETURN((LM_ERROR,
00068                       ACE_TEXT("(%P|%t) ERROR: ")
00069                       ACE_TEXT("UdpReceiveStrategy::start_i: ")
00070                       ACE_TEXT("NULL reactor reference!\n")),
00071                      -1);
00072   }
00073 
00074   if (reactor->register_handler(this, ACE_Event_Handler::READ_MASK) != 0) {
00075     ACE_ERROR_RETURN((LM_ERROR,
00076                       ACE_TEXT("(%P|%t) ERROR: ")
00077                       ACE_TEXT("UdpReceiveStrategy::start_i: ")
00078                       ACE_TEXT("failed to register handler for DataLink!\n")),
00079                      -1);
00080   }
00081 
00082   if (Transport_debug_level > 5) {
00083     ACE_INET_Addr addr;
00084     link_->socket().get_local_addr(addr);
00085     ACE_DEBUG((LM_DEBUG, "(%P|%t) UdpReceiveStrategy::start_i: "
00086                "listening on %C:%hu\n",
00087                addr.get_host_addr(), addr.get_port_number()));
00088   }
00089   return 0;
00090 }
00091 
00092 void
00093 UdpReceiveStrategy::stop_i()
00094 {
00095   ACE_Reactor* reactor = this->link_->get_reactor();
00096   if (reactor == 0) {
00097     ACE_ERROR((LM_ERROR,
00098                ACE_TEXT("(%P|%t) ERROR: ")
00099                ACE_TEXT("UdpReceiveStrategy::stop_i: ")
00100                ACE_TEXT("NULL reactor reference!\n")));
00101     return;
00102   }
00103 
00104   reactor->remove_handler(this, ACE_Event_Handler::READ_MASK);
00105 }
00106 
00107 bool
00108 UdpReceiveStrategy::check_header(const TransportHeader& header)
00109 {
00110   ReassemblyInfo& info = reassembly_[remote_address_];
00111 
00112   if (header.sequence_ != info.second &&
00113       expected_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00114     VDBG_LVL((LM_WARNING,
00115                ACE_TEXT("(%P|%t) WARNING: UdpReceiveStrategy::check_header ")
00116                ACE_TEXT("expected %q received %q\n"),
00117                info.second.getValue(), header.sequence_.getValue()), 2);
00118     SequenceRange range(info.second, header.sequence_.previous());
00119     info.first.data_unavailable(range);
00120   }
00121 
00122   info.second = header.sequence_;
00123   ++info.second;
00124   return true;
00125 }
00126 
00127 bool
00128 UdpReceiveStrategy::reassemble(ReceivedDataSample& data)
00129 {
00130   ReassemblyInfo& info = reassembly_[remote_address_];
00131   const TransportHeader& header = received_header();
00132   return info.first.reassemble(header.sequence_, header.first_fragment_, data);
00133 }
00134 
00135 
00136 } // namespace DCPS
00137 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:28 2016 for OpenDDS by  doxygen 1.4.7