OpenDDS  Snapshot(2023/04/28-20:55)
UdpReceiveStrategy.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
8 #include "UdpReceiveStrategy.h"
9 #include "UdpDataLink.h"
10 
11 #include <dds/DCPS/LogAddr.h>
13 
14 #include "ace/Reactor.h"
15 
17 
18 namespace OpenDDS {
19 namespace DCPS {
20 
// Constructor (member-initializer list and empty body).
// Passes link->impl()->config() to the TransportReceiveStrategy<> base class and
// stores the link pointer in link_.
// NOTE(review): listing line 21 (the constructor signature) and listing line 24
// (one more line between the link_ initializer and the opening brace) are
// absent from this extract -- restore both from the upstream file; what line 24
// initializes cannot be determined from here.
22  : TransportReceiveStrategy<>(link->impl()->config())
23  , link_(link)
25 {
26 }
27 
28 ACE_HANDLE
30 {
31  ACE_SOCK_Dgram& socket = this->link_->socket();
32  return socket.get_handle();
33 }
34 
35 int
37 {
38  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
39 
40  return this->handle_dds_input(fd);
41 }
42 
43 ssize_t
45  int n,
46  ACE_INET_Addr& remote_address,
47  ACE_HANDLE /*fd*/,
48  bool& /*stop*/)
49 {
50  const ssize_t ret = this->link_->socket().recv(iov, n, remote_address);
51  remote_address_ = remote_address;
52  return ret;
53 }
54 
55 void
57  const ACE_INET_Addr& remote_address)
58 {
59  switch (sample.header_.message_id_) {
60 
61  case TRANSPORT_CONTROL:
62  this->link_->control_received(sample, remote_address);
63  break;
64 
65  default:
66  this->link_->data_received(sample);
67  break;
68  }
69 }
70 
71 int
73 {
75  if (reactor == 0) {
77  ACE_TEXT("(%P|%t) ERROR: ")
78  ACE_TEXT("UdpReceiveStrategy::start_i: ")
79  ACE_TEXT("NULL reactor reference!\n")),
80  -1);
81  }
82 
83  if (reactor->register_handler(this, ACE_Event_Handler::READ_MASK) != 0) {
85  ACE_TEXT("(%P|%t) ERROR: ")
86  ACE_TEXT("UdpReceiveStrategy::start_i: ")
87  ACE_TEXT("failed to register handler for DataLink!\n")),
88  -1);
89  }
90 
91  if (Transport_debug_level > 5) {
92  ACE_INET_Addr addr;
93  link_->socket().get_local_addr(addr);
94  ACE_DEBUG((LM_DEBUG, "(%P|%t) UdpReceiveStrategy::start_i: listening on %C\n",
95  LogAddr(addr).c_str()));
96  }
97  return 0;
98 }
99 
100 void
102 {
103  ACE_Reactor* reactor = this->link_->get_reactor();
104  if (reactor == 0) {
106  ACE_TEXT("(%P|%t) ERROR: ")
107  ACE_TEXT("UdpReceiveStrategy::stop_i: ")
108  ACE_TEXT("NULL reactor reference!\n")));
109  return;
110  }
111 
113 }
114 
// check_header: per the cross-reference list the signature is
// `bool check_header(const TransportHeader& header)`.
// What the visible lines do: lazily create the TransportReassembly held in
// info.first, report a gap when the incoming sequence number differs from the
// expected one, then set the expected number to header.sequence_ + 1.
// Every visible path returns true.
// NOTE(review): listing lines 116 (signature), 118 (declaration of `info` --
// presumably a ReassemblyInfo, i.e. pair<TransportReassembly_rch, SequenceNumber>),
// 124 (second half of the `if` condition) and 125 (opening of the logging
// macro that the ACE_TEXT lines belong to) are absent from this extract;
// restore them from the upstream file before compiling.
115 bool
117 {
// First use of this entry: give it a reassembly helper.
119  if (!info.first) {
120  info.first = make_rch<TransportReassembly>();
121  }
122 
// info.second holds the sequence number that was expected next.
123  if (header.sequence_ != info.second &&
126  ACE_TEXT("(%P|%t) WARNING: UdpReceiveStrategy::check_header ")
127  ACE_TEXT("expected %q received %q\n"),
128  info.second.getValue(), header.sequence_.getValue()), 2);
// Mark everything from the expected number up to the one just before the
// received number as unavailable.
129  FragmentRange range(info.second.getValue(), header.sequence_.previous().getValue());
130  info.first->data_unavailable(range);
131  }
132 
// Next expected sequence number is the one after the header just seen.
133  info.second = header.sequence_;
134  ++info.second;
135  return true;
136 }
137 
// reassemble: per the cross-reference list the signature is
// `bool reassemble(ReceivedDataSample& data)`.
// What the visible lines do: make sure info.first holds a TransportReassembly,
// then return the result of its reassemble() for the header's sequence number
// and first_fragment_ flag together with `data`.
// NOTE(review): listing lines 139 (signature), 141 (declaration of `info`) and
// 145 (declaration of `header`) are absent from this extract; where `info` and
// `header` come from cannot be determined from here -- restore from upstream.
138 bool
140 {
// First use of this entry: give it a reassembly helper.
142  if (!info.first) {
143  info.first = make_rch<TransportReassembly>();
144  }
146  return info.first->reassemble(header.sequence_, header.first_fragment_, data);
147 }
148 
149 } // namespace DCPS
150 } // namespace OpenDDS
151 
DataSampleHeader header_
The demarshalled sample header.
#define ACE_DEBUG(X)
#define ACE_ERROR(X)
char message_id_
The enum MessageId.
ssize_t recv(void *buf, size_t n, ACE_Addr &addr, int flags=0) const
virtual bool reassemble(ReceivedDataSample &data)
SequenceNumber previous() const
int data_received(ReceivedDataSample &sample, const GUID_t &readerId=GUID_UNKNOWN)
Definition: DataLink.cpp:690
void control_received(ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
int ssize_t
ACE_Reactor * get_reactor()
Definition: UdpDataLink.inl:27
virtual int handle_input(ACE_HANDLE fd)
ACE_HANDLE socket(int protocol_family, int type, int proto)
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
OpenDDS_Dcps_Export unsigned int Transport_debug_level
Transport Logging verbosity level.
Definition: debug.cpp:25
int remove_handler(ACE_HANDLE handle, ACE_Reactor_Mask masks)
LM_DEBUG
std::pair< TransportReassembly_rch, SequenceNumber > ReassemblyInfo
virtual bool check_header(const TransportHeader &header)
Check the transport header for suitability.
std::pair< FragmentNumber, FragmentNumber > FragmentRange
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
Holds a data sample received by the transport.
int get_local_addr(ACE_Addr &) const
Defines class that represents a transport packet header.
ACE_HANDLE get_handle(void) const
LM_WARNING
virtual int start_i()
Let the subclass start.
virtual ACE_Reactor * reactor(void) const
ACE_TEXT("TCP_Factory")
virtual ACE_HANDLE get_handle() const
virtual void deliver_sample(ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
Called when there is a ReceivedDataSample to be delivered.
ACE_SOCK_Dgram & socket()
Definition: UdpDataLink.inl:40
const SequenceNumber_t SEQUENCENUMBER_UNKNOWN
Definition: MessageTypes.h:50
Sequence number abstraction. Only allows positive 64 bit values.
static SequenceNumber SEQUENCENUMBER_UNKNOWN()
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
#define ACE_ERROR_RETURN(X, Y)
#define TheServiceParticipant
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual void stop_i()
Let the subclass stop.
virtual ssize_t receive_bytes(iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)
Only our subclass knows how to do this.