00001
00002
00003
00004
00005
00006
00007
00008 #include "InfoRepoMulticastResponder.h"
00009 #include "dds/DCPS/debug.h"
00010
00011 #include "tao/debug.h"
00012 #include "tao/Object.h"
00013 #include "tao/IORTable/IORTable.h"
00014
00015 #include "ace/SOCK_Connector.h"
00016 #include "ace/Log_Msg.h"
00017
00018 #include <string>
00019
00020 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00021
00022 namespace OpenDDS {
00023 namespace Federator {
00024
00025 InfoRepoMulticastResponder::InfoRepoMulticastResponder()
00026 : initialized_(false)
00027 {
00028 }
00029
00030 InfoRepoMulticastResponder::~InfoRepoMulticastResponder()
00031 {
00032 if (
00033 this->initialized_ &&
00034 (this->mcast_dgram_.leave(this->mcast_addr_) == -1)) {
00035 ACE_ERROR((LM_ERROR, "%p\n", ACE_TEXT("~InfoRepoMulticastResponder()")));
00036 }
00037 }
00038
00039 ACE_HANDLE
00040 InfoRepoMulticastResponder::get_handle() const
00041 {
00042 return this->mcast_dgram_.get_handle();
00043 }
00044
00045 int
00046 InfoRepoMulticastResponder::init(
00047 CORBA::ORB_ptr orb,
00048 u_short port,
00049 const char *mcast_addr)
00050 {
00051 if (this->initialized_) {
00052 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() already initialized\n"), -1);
00053 }
00054
00055 if (this->mcast_addr_.set(port, mcast_addr) == -1)
00056 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() %p\n", ACE_TEXT("set")), -1);
00057
00058 return common_init(orb);
00059 }
00060
00061 int
00062 InfoRepoMulticastResponder::init(
00063 CORBA::ORB_ptr orb,
00064 const char *mcast_addr)
00065 {
00066 if (this->initialized_) {
00067 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() already initialized\n"), -1);
00068 }
00069
00070
00071 const char* tmpnic = ACE_OS::strchr(mcast_addr, '@');
00072
00073 CORBA::String_var actual_mcast_addr;
00074 CORBA::ULong length_addr;
00075
00076 if (tmpnic != 0) {
00077
00078 length_addr = static_cast<CORBA::ULong>(tmpnic - mcast_addr + 1);
00079 actual_mcast_addr = CORBA::string_alloc(length_addr);
00080
00081 ACE_OS::strncpy(actual_mcast_addr.inout(),
00082 mcast_addr,
00083 length_addr - 1);
00084
00085 actual_mcast_addr[length_addr - 1] = '\0';
00086
00087
00088 this->mcast_nic_ = tmpnic + 1;
00089
00090 } else {
00091 actual_mcast_addr =
00092 CORBA::string_alloc(static_cast<CORBA::ULong>(ACE_OS::strlen(mcast_addr)));
00093
00094 actual_mcast_addr = mcast_addr;
00095 }
00096
00097 if (this->mcast_addr_.set(actual_mcast_addr.in()) == -1)
00098 ACE_ERROR_RETURN((LM_ERROR,
00099 "%p\n",
00100 ACE_TEXT("set")),
00101 -1);
00102
00103 return common_init(orb);
00104 }
00105
00106 int
00107 InfoRepoMulticastResponder::common_init(
00108 CORBA::ORB_ptr orb)
00109 {
00110 orb_ = CORBA::ORB::_duplicate(orb);
00111
00112 if (this->response_addr_.set((u_short) 0) == -1)
00113 ACE_ERROR_RETURN((LM_ERROR,
00114 "InfoRepoMulticastResponder::common_init() %p\n",
00115 ACE_TEXT("set")),
00116 -1);
00117
00118 else if (this->response_.open(this->response_addr_) == -1) {
00119 ACE_ERROR_RETURN((LM_ERROR,
00120 "%p\n",
00121 ACE_TEXT("set")),
00122 -1);
00123 }
00124
00125
00126 #ifdef ACE_HAS_MAC_OSX
00127 mcast_dgram_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00128 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00129 #endif
00130 if (this->mcast_nic_.length() != 0) {
00131 if (this->mcast_dgram_.join(this->mcast_addr_,
00132 1,
00133 ACE_TEXT_CHAR_TO_TCHAR(this->mcast_nic_.c_str())) == -1)
00134 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::common_init() %p\n",
00135 ACE_TEXT("subscribe")), -1);
00136
00137 } else {
00138 if (this->mcast_dgram_.join(this->mcast_addr_) == -1)
00139 ACE_ERROR_RETURN((LM_ERROR,
00140 "InfoRepoMulticastResponder::common_init() %p\n",
00141 ACE_TEXT("subscribe")),
00142 -1);
00143 }
00144
00145 this->initialized_ = true;
00146 return 0;
00147 }
00148
00149 int
00150 InfoRepoMulticastResponder::handle_timeout(const ACE_Time_Value &,
00151 const void *)
00152 {
00153 return 0;
00154 }
00155
00156 int
00157 InfoRepoMulticastResponder::handle_input(ACE_HANDLE)
00158 {
00159 if (OpenDDS::DCPS::DCPS_debug_level > 0)
00160 ACE_DEBUG((LM_DEBUG, "Entered InfoRepoMulticastResponder::handle_input\n"));
00161
00162
00163 CORBA::Short header;
00164
00165 ACE_UINT16 remote_port;
00166
00167 char object_key[BUFSIZ];
00168
00169 ACE_INET_Addr remote_addr;
00170
00171
00172
00173 ssize_t n = this->mcast_dgram_.recv(&header,
00174 sizeof(header),
00175 remote_addr,
00176 MSG_PEEK);
00177
00178 if (n <= 0)
00179 ACE_ERROR_RETURN((LM_ERROR,
00180 "InfoRepoMulticastResponder::handle_input - peek %d\n",
00181 n),
00182 0);
00183
00184 else if (ACE_NTOHS(header) <= 0)
00185 ACE_ERROR_RETURN((LM_ERROR,
00186 "InfoRepoMulticastResponder::handle_input() Header value < 1\n"),
00187 0);
00188
00189
00190 const int iovcnt = 3;
00191 iovec iov[iovcnt];
00192
00193 iov[0].iov_base = (char *) &header;
00194 iov[0].iov_len = sizeof(header);
00195 iov[1].iov_base = (char *) &remote_port;
00196 iov[1].iov_len = sizeof(ACE_UINT16);
00197 iov[2].iov_base = (char *) object_key;
00198 iov[2].iov_len = ACE_NTOHS(header);
00199
00200
00201 n = this->mcast_dgram_.recv(iov,
00202 iovcnt,
00203 remote_addr);
00204
00205 if (n <= 0)
00206 ACE_ERROR_RETURN((LM_ERROR,
00207 "InfoRepoMulticastResponder::handle_input recv = %d\n",
00208 n),
00209 0);
00210
00211 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00212 ACE_TCHAR addr[64];
00213 remote_addr.addr_to_string(addr, sizeof(addr));
00214 ACE_DEBUG((LM_DEBUG,
00215 "(%P|%t) Received multicast from %s.\n"
00216 "Service Name received : %C\n"
00217 "Port received : %u\n",
00218 addr,
00219 object_key,
00220 ACE_NTOHS(remote_port)));
00221 }
00222
00223
00224 CORBA::Object_var table_object =
00225 orb_->resolve_initial_references("IORTable");
00226
00227 IORTable::Locator_var locator =
00228 IORTable::Locator::_narrow(table_object.in());
00229
00230 if (CORBA::is_nil(locator.in())) {
00231 ACE_ERROR((LM_ERROR, ACE_TEXT("Nil IORTable\n")));
00232
00233 }
00234
00235 std::string ior;
00236
00237 {
00238 CORBA::String_var ior_result;
00239
00240 try {
00241 ior_result = locator->locate(object_key);
00242
00243 } catch (const IORTable::NotFound&) {
00244 ACE_ERROR_RETURN((LM_ERROR,
00245 "InfoRepoMulticastResponder::handle_input() Object key not found\n"),
00246 0);
00247 }
00248
00249 ior = ior_result;
00250 }
00251
00252
00253 ACE_SOCK_Connector connector;
00254 ACE_INET_Addr peer_addr;
00255 ACE_SOCK_Stream stream;
00256
00257 peer_addr.set(remote_addr);
00258 peer_addr.set_port_number(ACE_NTOHS(remote_port));
00259
00260 #if defined (ACE_HAS_IPV6)
00261
00262 if (peer_addr.is_linklocal()) {
00263
00264
00265
00266
00267
00268
00269
00270
00271 ACE_INET_Addr peer_tmp(peer_addr);
00272 peer_tmp.set_port_number(static_cast<u_short>(0));
00273 ACE_INET_Addr* tmp = 0;
00274 size_t cnt = 0;
00275 int err = ACE::get_ip_interfaces(cnt, tmp);
00276
00277 if (err == 0) {
00278 for (size_t i = 0; i < cnt; ++i) {
00279 if (peer_tmp == tmp[i]) {
00280 peer_addr.set(ACE_NTOHS(remote_port),
00281 ACE_IPV6_LOCALHOST);
00282 break;
00283 }
00284 }
00285
00286 delete[] tmp;
00287 }
00288 }
00289
00290 #endif
00291
00292 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00293 ACE_TCHAR addr[64];
00294 peer_addr.addr_to_string(addr, sizeof(addr));
00295 ACE_DEBUG((LM_DEBUG,
00296 "(%P|%t) Replying to peer %s.\n",
00297 addr));
00298 }
00299
00300
00301 if (connector.connect(stream, peer_addr) == -1)
00302 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::connect failed\n"), 0);
00303
00304
00305
00306
00307
00308 CORBA::Short data_len = ACE_HTONS(static_cast<CORBA::Short>(ior.length()) + 1);
00309
00310
00311 const int cnt = 2;
00312 iovec iovp[cnt];
00313
00314
00315 iovp[0].iov_base = (char *) &data_len;
00316 iovp[0].iov_len = sizeof(CORBA::Short);
00317
00318
00319 iovp[1].iov_base = const_cast<char*>(ior.c_str());
00320 iovp[1].iov_len = static_cast<u_long>(ior.length() + 1);
00321
00322 ssize_t result = stream.sendv_n(iovp, cnt);
00323
00324 stream.close();
00325
00326
00327 if (result == -1)
00328 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::send failed\n"), 0);
00329
00330 if (OpenDDS::DCPS::DCPS_debug_level > 0)
00331 ACE_DEBUG((LM_DEBUG,
00332 "(%P|%t) InfoRepoMulticastResponder::handle_input() ior: <%C>\n"
00333 "sent to %C:%u.\n"
00334 "result from send = %d\n",
00335 ior.c_str(),
00336 peer_addr.get_host_name(),
00337 peer_addr.get_port_number(),
00338 result));
00339
00340 return 0;
00341 }
00342
00343 }
00344 }
00345
00346 OPENDDS_END_VERSIONED_NAMESPACE_DECL