OpenDDS  Snapshot(2023/04/28-20:55)
MulticastReceiveStrategy.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
9 #include "MulticastDataLink.h"
10 
11 #include "ace/Reactor.h"
12 
14 
15 namespace OpenDDS {
16 namespace DCPS {
17 
19  : TransportReceiveStrategy<>(link->config())
20  , link_(link)
21 {
22 }
23 
24 ACE_HANDLE
26 {
28  return socket.get_handle();
29 }
30 
31 int
33 {
34  ThreadStatusManager::Event ev(TheServiceParticipant->get_thread_status_manager());
35 
36  const int result = this->handle_dds_input(fd);
37  if (result >= 0 && this->pdu_remaining()) {
38  VDBG_LVL((LM_DEBUG, "(%P|%t) MulticastReceiveStrategy[%@]::handle_input "
39  "resetting with %B bytes remaining\n", this, this->pdu_remaining()), 4);
40  this->reset();
41  }
42  return result;
43 }
44 
45 ssize_t
47  int n,
48  ACE_INET_Addr& remote_address,
49  ACE_HANDLE /*fd*/,
50  bool& /*stop*/)
51 {
53  return socket.recv(iov, n, remote_address);
54 }
55 
56 bool
58 {
59  return this->link_->check_header(header);
60 }
61 
62 bool
64 {
65  return this->link_->check_header(header);
66 }
67 
68 void
70  const ACE_INET_Addr& /*remote_address*/)
71 {
72  this->link_->sample_received(sample);
73 }
74 
75 bool
77 {
78  return this->link_->reassemble(data, received_header());
79 }
80 
81 int
83 {
85  if (reactor == 0) {
87  ACE_TEXT("(%P|%t) ERROR: ")
88  ACE_TEXT("MulticastReceiveStrategy::start_i: ")
89  ACE_TEXT("NULL reactor reference!\n")),
90  -1);
91  }
92 
93  if (reactor->register_handler(this, ACE_Event_Handler::READ_MASK) != 0) {
95  ACE_TEXT("(%P|%t) ERROR: ")
96  ACE_TEXT("MulticastReceiveStrategy::start_i: ")
97  ACE_TEXT("failed to register handler for DataLink!\n")),
98  -1);
99  }
100 
101  return 0;
102 }
103 
104 void
106 {
107  ACE_Reactor* reactor = this->link_->get_reactor();
108  if (reactor == 0) {
110  ACE_TEXT("(%P|%t) ERROR: ")
111  ACE_TEXT("MulticastReceiveStrategy::stop_i: ")
112  ACE_TEXT("NULL reactor reference!\n")));
113  return;
114  }
115 
117 }
118 
119 } // namespace DCPS
120 } // namespace OpenDDS
121 
#define ACE_ERROR(X)
virtual bool check_header(const TransportHeader &header)
Check the transport header for suitability.
virtual ssize_t receive_bytes(iovec iov[], int n, ACE_INET_Addr &remote_address, ACE_HANDLE fd, bool &stop)
Only our subclass knows how to do this.
virtual int start_i()
Let the subclass start.
ssize_t recv(void *buf, size_t n, ACE_Addr &addr, int flags=0) const
ACE_SOCK_Dgram_Mcast & socket()
int ssize_t
ACE_HANDLE socket(int protocol_family, int type, int proto)
int register_handler(ACE_Event_Handler *event_handler, ACE_Reactor_Mask mask)
int remove_handler(ACE_HANDLE handle, ACE_Reactor_Mask masks)
LM_DEBUG
virtual void deliver_sample(ReceivedDataSample &sample, const ACE_INET_Addr &remote_address)
Called when there is a ReceivedDataSample to be delivered.
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
Holds a data sample received by the transport.
Defines class that represents a transport packet header.
virtual bool reassemble(ReceivedDataSample &data)
ACE_HANDLE get_handle(void) const
virtual ACE_Reactor * reactor(void) const
ACE_TEXT("TCP_Factory")
bool check_header(const TransportHeader &header)
#define VDBG_LVL(DBG_ARGS, LEVEL)
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
#define ACE_ERROR_RETURN(X, Y)
#define TheServiceParticipant
bool reassemble(ReceivedDataSample &data, const TransportHeader &header)
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
virtual void stop_i()
Let the subclass stop.
void sample_received(ReceivedDataSample &sample)