UdpReceiveStrategy.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "UdpReceiveStrategy.h"
00009 #include "UdpDataLink.h"
00010 
00011 #include "ace/Reactor.h"
00012 
00013 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00014 
00015 namespace OpenDDS {
00016 namespace DCPS {
00017 
00018 UdpReceiveStrategy::UdpReceiveStrategy(UdpDataLink* link)
00019   : link_(link)
00020   , expected_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00021 {
00022 }
00023 
00024 ACE_HANDLE
00025 UdpReceiveStrategy::get_handle() const
00026 {
00027   ACE_SOCK_Dgram& socket = this->link_->socket();
00028   return socket.get_handle();
00029 }
00030 
00031 int
00032 UdpReceiveStrategy::handle_input(ACE_HANDLE fd)
00033 {
00034   return this->handle_dds_input(fd);
00035 }
00036 
00037 ssize_t
00038 UdpReceiveStrategy::receive_bytes(iovec iov[],
00039                                   int n,
00040                                   ACE_INET_Addr& remote_address,
00041                                   ACE_HANDLE /*fd*/)
00042 {
00043   const ssize_t ret = this->link_->socket().recv(iov, n, remote_address);
00044   remote_address_ = remote_address;
00045   return ret;
00046 }
00047 
00048 void
00049 UdpReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00050                                    const ACE_INET_Addr& remote_address)
00051 {
00052   switch (sample.header_.message_id_) {
00053 
00054   case TRANSPORT_CONTROL:
00055     this->link_->control_received(sample, remote_address);
00056     break;
00057 
00058   default:
00059     this->link_->data_received(sample);
00060     break;
00061   }
00062 }
00063 
00064 int
00065 UdpReceiveStrategy::start_i()
00066 {
00067   ACE_Reactor* reactor = this->link_->get_reactor();
00068   if (reactor == 0) {
00069     ACE_ERROR_RETURN((LM_ERROR,
00070                       ACE_TEXT("(%P|%t) ERROR: ")
00071                       ACE_TEXT("UdpReceiveStrategy::start_i: ")
00072                       ACE_TEXT("NULL reactor reference!\n")),
00073                      -1);
00074   }
00075 
00076   if (reactor->register_handler(this, ACE_Event_Handler::READ_MASK) != 0) {
00077     ACE_ERROR_RETURN((LM_ERROR,
00078                       ACE_TEXT("(%P|%t) ERROR: ")
00079                       ACE_TEXT("UdpReceiveStrategy::start_i: ")
00080                       ACE_TEXT("failed to register handler for DataLink!\n")),
00081                      -1);
00082   }
00083 
00084   if (Transport_debug_level > 5) {
00085     ACE_INET_Addr addr;
00086     link_->socket().get_local_addr(addr);
00087     ACE_DEBUG((LM_DEBUG, "(%P|%t) UdpReceiveStrategy::start_i: "
00088                "listening on %C:%hu\n",
00089                addr.get_host_addr(), addr.get_port_number()));
00090   }
00091   return 0;
00092 }
00093 
00094 void
00095 UdpReceiveStrategy::stop_i()
00096 {
00097   ACE_Reactor* reactor = this->link_->get_reactor();
00098   if (reactor == 0) {
00099     ACE_ERROR((LM_ERROR,
00100                ACE_TEXT("(%P|%t) ERROR: ")
00101                ACE_TEXT("UdpReceiveStrategy::stop_i: ")
00102                ACE_TEXT("NULL reactor reference!\n")));
00103     return;
00104   }
00105 
00106   reactor->remove_handler(this, ACE_Event_Handler::READ_MASK);
00107 }
00108 
00109 bool
00110 UdpReceiveStrategy::check_header(const TransportHeader& header)
00111 {
00112   ReassemblyInfo& info = reassembly_[remote_address_];
00113 
00114   if (header.sequence_ != info.second &&
00115       expected_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00116     VDBG_LVL((LM_WARNING,
00117                ACE_TEXT("(%P|%t) WARNING: UdpReceiveStrategy::check_header ")
00118                ACE_TEXT("expected %q received %q\n"),
00119                info.second.getValue(), header.sequence_.getValue()), 2);
00120     SequenceRange range(info.second, header.sequence_.previous());
00121     info.first.data_unavailable(range);
00122   }
00123 
00124   info.second = header.sequence_;
00125   ++info.second;
00126   return true;
00127 }
00128 
00129 bool
00130 UdpReceiveStrategy::reassemble(ReceivedDataSample& data)
00131 {
00132   ReassemblyInfo& info = reassembly_[remote_address_];
00133   const TransportHeader& header = received_header();
00134   return info.first.reassemble(header.sequence_, header.first_fragment_, data);
00135 }
00136 
00137 } // namespace DCPS
00138 } // namespace OpenDDS
00139 
00140 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1