OpenDDS  Snapshot(2023/04/28-20:55)
InfoRepoMulticastResponder.cpp
Go to the documentation of this file.
1 /*
2  *
3  *
4  * Distributed under the OpenDDS License.
5  * See: http://www.opendds.org/license.html
6  */
7 
9 #include "dds/DCPS/debug.h"
10 #include "dds/DCPS/Definitions.h"
11 #include <dds/DCPS/LogAddr.h>
12 
13 #include "tao/debug.h"
14 #include "tao/Object.h"
15 #include "tao/IORTable/IORTable.h"
16 
17 #include "ace/SOCK_Connector.h"
18 #include "ace/Log_Msg.h"
19 
20 #include <string>
21 
23 
24 namespace OpenDDS {
25 namespace Federator {
26 
28  : initialized_(false)
29 {
30 }
31 
33 {
34  if (
35  this->initialized_ &&
36  (this->mcast_dgram_.leave(this->mcast_addr_) == -1)) {
37  ACE_ERROR((LM_ERROR, "%p\n", ACE_TEXT("~InfoRepoMulticastResponder()")));
38  }
39 }
40 
41 ACE_HANDLE
43 {
44  return this->mcast_dgram_.get_handle();
45 }
46 
47 int
49  CORBA::ORB_ptr orb,
50  u_short port,
51  const char *mcast_addr)
52 {
53  if (this->initialized_) {
54  ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() already initialized\n"), -1);
55  }
56 
57  if (this->mcast_addr_.set(port, mcast_addr) == -1)
58  ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() %p\n", ACE_TEXT("set")), -1);
59 
60  return common_init(orb);
61 }
62 
63 int
65  CORBA::ORB_ptr orb,
66  const char *mcast_addr)
67 {
68  if (this->initialized_) {
69  ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() already initialized\n"), -1);
70  }
71 
72  // Look for a '@' in case a NIC is specified.
73  const char* tmpnic = ACE_OS::strchr(mcast_addr, '@');
74 
75  CORBA::String_var actual_mcast_addr;
76  CORBA::ULong length_addr;
77 
78  if (tmpnic != 0) {
79  // i.e. a NIC name has been specified
80  length_addr = static_cast<CORBA::ULong>(tmpnic - mcast_addr + 1);
81  actual_mcast_addr = CORBA::string_alloc(length_addr);
82 
83  ACE_OS::strncpy(actual_mcast_addr.inout(),
84  mcast_addr,
85  length_addr - 1);
86 
87  actual_mcast_addr[length_addr - 1] = '\0';
88 
89  /// Save for use later.
90  this->mcast_nic_ = tmpnic + 1;
91 
92  } else {
93  actual_mcast_addr =
94  CORBA::string_alloc(static_cast<CORBA::ULong>(ACE_OS::strlen(mcast_addr)));
95 
96  actual_mcast_addr = mcast_addr;
97  }
98 
99  if (this->mcast_addr_.set(actual_mcast_addr.in()) == -1)
101  "%p\n",
102  ACE_TEXT("set")),
103  -1);
104 
105  return common_init(orb);
106 }
107 
108 int
110  CORBA::ORB_ptr orb)
111 {
113 
114  if (this->response_addr_.set((u_short) 0) == -1)
116  "InfoRepoMulticastResponder::common_init() %p\n",
117  ACE_TEXT("set")),
118  -1);
119 
120  else if (this->response_.open(this->response_addr_) == -1) {
122  "%p\n",
123  ACE_TEXT("set")),
124  -1);
125  }
126 
127  // Use ACE_SOCK_Dgram_Mcast factory to subscribe to multicast group.
128 #ifdef ACE_HAS_MAC_OSX
131 #endif
132  if (this->mcast_nic_.length() != 0) {
133  if (this->mcast_dgram_.join(this->mcast_addr_,
134  1,
135  ACE_TEXT_CHAR_TO_TCHAR(this->mcast_nic_.c_str())) == -1)
136  ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::common_init() %p\n",
137  ACE_TEXT("subscribe")), -1);
138 
139  } else {
140  if (this->mcast_dgram_.join(this->mcast_addr_) == -1)
142  "InfoRepoMulticastResponder::common_init() %p\n",
143  ACE_TEXT("subscribe")),
144  -1);
145  }
146 
147  this->initialized_ = true;
148  return 0;
149 }
150 
151 int
153  const void *)
154 {
155  return 0;
156 }
157 
158 int
160 {
162  ACE_DEBUG((LM_DEBUG, "Entered InfoRepoMulticastResponder::handle_input\n"));
163 
164  // The length of the service name string that follows.
166  // Port to which to reply.
167  ACE_UINT16 remote_port;
168  // Name of the service for which the client is looking.
169  char object_key[BUFSIZ];
170 
171  ACE_INET_Addr remote_addr;
172 
173  // Take a peek at the header to find out the length of the service
174  // name string we should receive.
175  ssize_t n = this->mcast_dgram_.recv(&header,
176  sizeof(header),
177  remote_addr,
178  MSG_PEEK);
179 
180  if (n <= 0)
182  "InfoRepoMulticastResponder::handle_input - peek %d\n",
183  n),
184  0);
185 
186  else if (ACE_NTOHS(header) <= 0)
188  "InfoRepoMulticastResponder::handle_input() Header value < 1\n"),
189  0);
190 
191  // Receive full client multicast request.
192  const int iovcnt = 3;
193  iovec iov[iovcnt];
194 
195  iov[0].iov_base = (char *) &header;
196  iov[0].iov_len = sizeof(header);
197  iov[1].iov_base = (char *) &remote_port;
198  iov[1].iov_len = sizeof(ACE_UINT16);
199  iov[2].iov_base = (char *) object_key;
200  iov[2].iov_len = ACE_NTOHS(header);
201 
202  // Read the iovec.
203  n = this->mcast_dgram_.recv(iov,
204  iovcnt,
205  remote_addr);
206 
207  if (n <= 0)
209  "InfoRepoMulticastResponder::handle_input recv = %d\n",
210  n),
211  0);
212 
215  "(%P|%t) Received multicast from %C.\n"
216  "Service Name received : %s\n"
217  "Port received : %u\n",
218  DCPS::LogAddr(remote_addr, DCPS::LogAddr::HostPort).c_str(),
219  object_key,
220  ACE_NTOHS(remote_port)));
221  }
222 
223  // Grab the IOR table.
224  CORBA::Object_var table_object =
225  orb_->resolve_initial_references("IORTable");
226 
227  IORTable::Locator_var locator =
228  IORTable::Locator::_narrow(table_object.in());
229 
230  if (CORBA::is_nil(locator.in())) {
231  ACE_ERROR((LM_ERROR, ACE_TEXT("Nil IORTable\n")));
232  }
233 
234  std::string ior;
235 
236  {
237  CORBA::String_var ior_result;
238 
239  try {
240  ior_result = locator->locate(object_key);
241 
242  } catch (const IORTable::NotFound&) {
244  "InfoRepoMulticastResponder::handle_input() Object key not found\n"),
245  0);
246  }
247 
248  ior = ior_result;
249  }
250 
251  // Reply to the multicast message.
252  ACE_SOCK_Connector connector;
253  ACE_INET_Addr peer_addr;
254  ACE_SOCK_Stream stream;
255 
256  peer_addr.set(remote_addr);
257  peer_addr.set_port_number(ACE_NTOHS(remote_port));
258 
259 #if defined (ACE_HAS_IPV6)
260 
261  if (peer_addr.is_linklocal()) {
262  // If the peer address is one of our own local link-local interfaces,
263  // replying to it directly is not going to work.
264  // Creating a connection using such an interface to a client listening
265  // at the IPv6 ANY address fails (I'm not quite sure why, but it probably
266  // has to do with the rather restrictive routing rules for link-local
267  // interfaces).
268  // So we check whether this is one of our local interfaces and, if so,
269  // create the connection using the IPv6 loopback address instead.
270  ACE_INET_Addr peer_tmp(peer_addr);
271  peer_tmp.set_port_number(static_cast<u_short>(0));
272  ACE_INET_Addr* tmp = 0;
273  size_t cnt = 0;
274  int err = ACE::get_ip_interfaces(cnt, tmp);
275 
276  if (err == 0) {
277  for (size_t i = 0; i < cnt; ++i) {
278  if (peer_tmp == tmp[i]) {
279  peer_addr.set(ACE_NTOHS(remote_port),
281  break;
282  }
283  }
284 
285  delete[] tmp;
286  }
287  }
288 
289 #endif /* ACE_HAS_IPV6 */
290 
292  ACE_DEBUG((LM_DEBUG, "(%P|%t) Replying to peer %C.\n",
293  DCPS::LogAddr(peer_addr, DCPS::LogAddr::HostPort).c_str()));
294  }
295 
296  // Connect.
297  if (connector.connect(stream, peer_addr) == -1)
298  ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::connect failed\n"), 0);
299 
300  // Send the IOR back to the client. (Send an iovec that contains the IOR
301  // length as the first element and the IOR itself as the second.)
302 
303  // Length of ior to be sent.
304  const CORBA::Short data_len = ACE_HTONS(static_cast<CORBA::Short>(ior.length()) + 1);
305 
306  // Vector to be sent.
307  const int cnt = 2;
308  iovec iovp[cnt];
309 
310  // The length of ior to be sent.
311  iovp[0].iov_base = (char *) &data_len;
312  iovp[0].iov_len = sizeof(CORBA::Short);
313 
314  // The ior.
315  iovp[1].iov_base = const_cast<char*>(ior.c_str());
316  iovp[1].iov_len = static_cast<u_long>(ior.length() + 1);
317 
318  const ssize_t result = stream.sendv_n(iovp, cnt);
319  // Close the stream.
320  stream.close();
321 
322  // Check for error.
323  if (result == -1)
324  ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::send failed\n"), 0);
325 
328  "(%P|%t) InfoRepoMulticastResponder::handle_input() ior: <%C>\n"
329  "sent to %C.\n"
330  "result from send = %d\n",
331  ior.c_str(),
332  DCPS::LogAddr(peer_addr).c_str(),
333  result));
334 
335  return 0;
336 }
337 
338 } // namespace Federator
339 } // namespace OpenDDS
340 
#define ACE_DEBUG(X)
#define MSG_PEEK
#define ACE_ERROR(X)
const char * c_str(void) const
ssize_t recv(void *buf, size_t n, ACE_Addr &addr, int flags=0) const
int leave(const ACE_INET_Addr &mcast_addr, const ACE_TCHAR *net_if=0)
int join(const ACE_INET_Addr &mcast_addr, int reuse_addr=1, const ACE_TCHAR *net_if=0)
char * string_alloc(ULong len)
#define ACE_IPV6_LOCALHOST
#define ACE_HTONS(x)
virtual ACE_HANDLE get_handle() const
Returns the internal handle used to receive multicast.
ACE_CDR::Short Short
void opts(int opts)
int ssize_t
int common_init(CORBA::ORB_ptr orb)
Factor common functionality from the two init functions.
size_t strlen(const char *s)
bool is_linklocal(void) const
LM_DEBUG
const char * strchr(const char *s, int c)
ACE_CDR::ULong ULong
virtual int handle_input(ACE_HANDLE n)
Callback when input is received on the handle.
Christopher Diggins *renamed files *fixing compilation errors *adding Visual C project file *removed make Max Lybbert *removed references to missing and unused header
Definition: CHANGELOG.txt:8
int get_ip_interfaces(size_t &count, ACE_INET_Addr *&addr_array)
const char * c_str() const
Definition: LogAddr.h:32
ACE_HANDLE get_handle(void) const
#define ACE_TEXT_CHAR_TO_TCHAR(STRING)
character_type *& inout(void)
static CORBA::ORB_ptr _duplicate(CORBA::ORB_ptr orb)
int init(CORBA::ORB_ptr orb, u_short port, const char *mcast_addr)
Initialization method.
int set(const ACE_INET_Addr &)
_in_type in(void) const
virtual int handle_timeout(const ACE_Time_Value &tv, const void *arg)
Callback when a timeout has occurred.
ACE_TEXT("TCP_Factory")
ACE_SOCK_Dgram_Mcast mcast_dgram_
multicast endpoint of communication
int close(void)
OpenDDS_Dcps_Export unsigned int DCPS_debug_level
Definition: debug.cpp:30
#define OPENDDS_END_VERSIONED_NAMESPACE_DECL
int open(const ACE_Addr &local, int protocol_family=ACE_PROTOCOL_FAMILY_INET, int protocol=0, int reuse_addr=0, int ipv6_only=0)
#define ACE_NTOHS(x)
void set_port_number(u_short, int encode=1)
ACE_SOCK_Dgram response_
socket for response to the multicast
ssize_t sendv_n(const iovec iov[], int iovcnt, const ACE_Time_Value *timeout=0, size_t *bytes_transferred=0) const
#define ACE_ERROR_RETURN(X, Y)
size_type length(void) const
const character_type * in(void) const
LM_ERROR
The Internal API and Implementation of OpenDDS.
Definition: AddressCache.h:28
Boolean is_nil(T x)
int connect(ACE_SOCK_Stream &new_stream, const ACE_Addr &remote_sap, const ACE_Time_Value *timeout=0, const ACE_Addr &local_sap=ACE_Addr::sap_any, int reuse_addr=0, int flags=0, int perms=0, int protocol=0)
char * strncpy(char *s, const char *t, size_t len)