UdpReceiveStrategy.cpp
Go to the documentation of this file.00001
00002
00003
00004
00005
00006
00007
00008 #include "UdpReceiveStrategy.h"
00009 #include "UdpDataLink.h"
00010
00011 #include "ace/Reactor.h"
00012
00013 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00014
00015 namespace OpenDDS {
00016 namespace DCPS {
00017
00018 UdpReceiveStrategy::UdpReceiveStrategy(UdpDataLink* link)
00019 : link_(link)
00020 , expected_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00021 {
00022 }
00023
00024 ACE_HANDLE
00025 UdpReceiveStrategy::get_handle() const
00026 {
00027 ACE_SOCK_Dgram& socket = this->link_->socket();
00028 return socket.get_handle();
00029 }
00030
00031 int
00032 UdpReceiveStrategy::handle_input(ACE_HANDLE fd)
00033 {
00034 return this->handle_dds_input(fd);
00035 }
00036
00037 ssize_t
00038 UdpReceiveStrategy::receive_bytes(iovec iov[],
00039 int n,
00040 ACE_INET_Addr& remote_address,
00041 ACE_HANDLE )
00042 {
00043 const ssize_t ret = this->link_->socket().recv(iov, n, remote_address);
00044 remote_address_ = remote_address;
00045 return ret;
00046 }
00047
00048 void
00049 UdpReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00050 const ACE_INET_Addr& remote_address)
00051 {
00052 switch (sample.header_.message_id_) {
00053
00054 case TRANSPORT_CONTROL:
00055 this->link_->control_received(sample, remote_address);
00056 break;
00057
00058 default:
00059 this->link_->data_received(sample);
00060 break;
00061 }
00062 }
00063
00064 int
00065 UdpReceiveStrategy::start_i()
00066 {
00067 ACE_Reactor* reactor = this->link_->get_reactor();
00068 if (reactor == 0) {
00069 ACE_ERROR_RETURN((LM_ERROR,
00070 ACE_TEXT("(%P|%t) ERROR: ")
00071 ACE_TEXT("UdpReceiveStrategy::start_i: ")
00072 ACE_TEXT("NULL reactor reference!\n")),
00073 -1);
00074 }
00075
00076 if (reactor->register_handler(this, ACE_Event_Handler::READ_MASK) != 0) {
00077 ACE_ERROR_RETURN((LM_ERROR,
00078 ACE_TEXT("(%P|%t) ERROR: ")
00079 ACE_TEXT("UdpReceiveStrategy::start_i: ")
00080 ACE_TEXT("failed to register handler for DataLink!\n")),
00081 -1);
00082 }
00083
00084 if (Transport_debug_level > 5) {
00085 ACE_INET_Addr addr;
00086 link_->socket().get_local_addr(addr);
00087 ACE_DEBUG((LM_DEBUG, "(%P|%t) UdpReceiveStrategy::start_i: "
00088 "listening on %C:%hu\n",
00089 addr.get_host_addr(), addr.get_port_number()));
00090 }
00091 return 0;
00092 }
00093
00094 void
00095 UdpReceiveStrategy::stop_i()
00096 {
00097 ACE_Reactor* reactor = this->link_->get_reactor();
00098 if (reactor == 0) {
00099 ACE_ERROR((LM_ERROR,
00100 ACE_TEXT("(%P|%t) ERROR: ")
00101 ACE_TEXT("UdpReceiveStrategy::stop_i: ")
00102 ACE_TEXT("NULL reactor reference!\n")));
00103 return;
00104 }
00105
00106 reactor->remove_handler(this, ACE_Event_Handler::READ_MASK);
00107 }
00108
00109 bool
00110 UdpReceiveStrategy::check_header(const TransportHeader& header)
00111 {
00112 ReassemblyInfo& info = reassembly_[remote_address_];
00113
00114 if (header.sequence_ != info.second &&
00115 expected_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00116 VDBG_LVL((LM_WARNING,
00117 ACE_TEXT("(%P|%t) WARNING: UdpReceiveStrategy::check_header ")
00118 ACE_TEXT("expected %q received %q\n"),
00119 info.second.getValue(), header.sequence_.getValue()), 2);
00120 SequenceRange range(info.second, header.sequence_.previous());
00121 info.first.data_unavailable(range);
00122 }
00123
00124 info.second = header.sequence_;
00125 ++info.second;
00126 return true;
00127 }
00128
00129 bool
00130 UdpReceiveStrategy::reassemble(ReceivedDataSample& data)
00131 {
00132 ReassemblyInfo& info = reassembly_[remote_address_];
00133 const TransportHeader& header = received_header();
00134 return info.first.reassemble(header.sequence_, header.first_fragment_, data);
00135 }
00136
00137 }
00138 }
00139
00140 OPENDDS_END_VERSIONED_NAMESPACE_DECL