00001
00002
00003
00004
00005
00006
00007
00008 #include "UdpReceiveStrategy.h"
00009 #include "UdpDataLink.h"
00010
00011 #include "ace/Reactor.h"
00012
00013 namespace OpenDDS {
00014 namespace DCPS {
00015
00016 UdpReceiveStrategy::UdpReceiveStrategy(UdpDataLink* link)
00017 : link_(link)
00018 , expected_(SequenceNumber::SEQUENCENUMBER_UNKNOWN())
00019 {
00020 }
00021
00022 ACE_HANDLE
00023 UdpReceiveStrategy::get_handle() const
00024 {
00025 ACE_SOCK_Dgram& socket = this->link_->socket();
00026 return socket.get_handle();
00027 }
00028
00029 int
00030 UdpReceiveStrategy::handle_input(ACE_HANDLE fd)
00031 {
00032 return this->handle_dds_input(fd);
00033 }
00034
00035 ssize_t
00036 UdpReceiveStrategy::receive_bytes(iovec iov[],
00037 int n,
00038 ACE_INET_Addr& remote_address,
00039 ACE_HANDLE )
00040 {
00041 const ssize_t ret = this->link_->socket().recv(iov, n, remote_address);
00042 remote_address_ = remote_address;
00043 return ret;
00044 }
00045
00046 void
00047 UdpReceiveStrategy::deliver_sample(ReceivedDataSample& sample,
00048 const ACE_INET_Addr& remote_address)
00049 {
00050 switch (sample.header_.message_id_) {
00051
00052 case TRANSPORT_CONTROL:
00053 this->link_->control_received(sample, remote_address);
00054 break;
00055
00056 default:
00057 this->link_->data_received(sample);
00058 break;
00059 }
00060 }
00061
00062 int
00063 UdpReceiveStrategy::start_i()
00064 {
00065 ACE_Reactor* reactor = this->link_->get_reactor();
00066 if (reactor == 0) {
00067 ACE_ERROR_RETURN((LM_ERROR,
00068 ACE_TEXT("(%P|%t) ERROR: ")
00069 ACE_TEXT("UdpReceiveStrategy::start_i: ")
00070 ACE_TEXT("NULL reactor reference!\n")),
00071 -1);
00072 }
00073
00074 if (reactor->register_handler(this, ACE_Event_Handler::READ_MASK) != 0) {
00075 ACE_ERROR_RETURN((LM_ERROR,
00076 ACE_TEXT("(%P|%t) ERROR: ")
00077 ACE_TEXT("UdpReceiveStrategy::start_i: ")
00078 ACE_TEXT("failed to register handler for DataLink!\n")),
00079 -1);
00080 }
00081
00082 if (Transport_debug_level > 5) {
00083 ACE_INET_Addr addr;
00084 link_->socket().get_local_addr(addr);
00085 ACE_DEBUG((LM_DEBUG, "(%P|%t) UdpReceiveStrategy::start_i: "
00086 "listening on %C:%hu\n",
00087 addr.get_host_addr(), addr.get_port_number()));
00088 }
00089 return 0;
00090 }
00091
00092 void
00093 UdpReceiveStrategy::stop_i()
00094 {
00095 ACE_Reactor* reactor = this->link_->get_reactor();
00096 if (reactor == 0) {
00097 ACE_ERROR((LM_ERROR,
00098 ACE_TEXT("(%P|%t) ERROR: ")
00099 ACE_TEXT("UdpReceiveStrategy::stop_i: ")
00100 ACE_TEXT("NULL reactor reference!\n")));
00101 return;
00102 }
00103
00104 reactor->remove_handler(this, ACE_Event_Handler::READ_MASK);
00105 }
00106
00107 bool
00108 UdpReceiveStrategy::check_header(const TransportHeader& header)
00109 {
00110 ReassemblyInfo& info = reassembly_[remote_address_];
00111
00112 if (header.sequence_ != info.second &&
00113 expected_ != SequenceNumber::SEQUENCENUMBER_UNKNOWN()) {
00114 VDBG_LVL((LM_WARNING,
00115 ACE_TEXT("(%P|%t) WARNING: UdpReceiveStrategy::check_header ")
00116 ACE_TEXT("expected %q received %q\n"),
00117 info.second.getValue(), header.sequence_.getValue()), 2);
00118 SequenceRange range(info.second, header.sequence_.previous());
00119 info.first.data_unavailable(range);
00120 }
00121
00122 info.second = header.sequence_;
00123 ++info.second;
00124 return true;
00125 }
00126
00127 bool
00128 UdpReceiveStrategy::reassemble(ReceivedDataSample& data)
00129 {
00130 ReassemblyInfo& info = reassembly_[remote_address_];
00131 const TransportHeader& header = received_header();
00132 return info.first.reassemble(header.sequence_, header.first_fragment_, data);
00133 }
00134
00135
00136 }
00137 }