00001
00002
00003
00004
00005
00006
00007
00008 #include "InfoRepoMulticastResponder.h"
00009 #include "dds/DCPS/debug.h"
00010
00011 #include "tao/debug.h"
00012 #include "tao/Object.h"
00013 #include "tao/IORTable/IORTable.h"
00014
00015 #include "ace/SOCK_Connector.h"
00016 #include "ace/Log_Msg.h"
00017
00018 #include <string>
00019
00020 namespace OpenDDS {
00021 namespace Federator {
00022
00023 InfoRepoMulticastResponder::InfoRepoMulticastResponder()
00024 : initialized_(false)
00025 {
00026 }
00027
00028 InfoRepoMulticastResponder::~InfoRepoMulticastResponder()
00029 {
00030 if (
00031 this->initialized_ &&
00032 (this->mcast_dgram_.leave(this->mcast_addr_) == -1)) {
00033 ACE_ERROR((LM_ERROR, "%p\n", ACE_TEXT("~InfoRepoMulticastResponder()")));
00034 }
00035 }
00036
00037 ACE_HANDLE
00038 InfoRepoMulticastResponder::get_handle() const
00039 {
00040 return this->mcast_dgram_.get_handle();
00041 }
00042
00043 int
00044 InfoRepoMulticastResponder::init(
00045 CORBA::ORB_ptr orb,
00046 u_short port,
00047 const char *mcast_addr)
00048 {
00049 if (this->initialized_) {
00050 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() already initialized\n"), -1);
00051 }
00052
00053 if (this->mcast_addr_.set(port, mcast_addr) == -1)
00054 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() %p\n", ACE_TEXT("set")), -1);
00055
00056 return common_init(orb);
00057 }
00058
00059 int
00060 InfoRepoMulticastResponder::init(
00061 CORBA::ORB_ptr orb,
00062 const char *mcast_addr)
00063 {
00064 if (this->initialized_) {
00065 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() already initialized\n"), -1);
00066 }
00067
00068
00069 const char* tmpnic = ACE_OS::strchr(mcast_addr, '@');
00070
00071 CORBA::String_var actual_mcast_addr;
00072 CORBA::ULong length_addr;
00073
00074 if (tmpnic != 0) {
00075
00076 length_addr = static_cast<CORBA::ULong>(tmpnic - mcast_addr + 1);
00077 actual_mcast_addr = CORBA::string_alloc(length_addr);
00078
00079 ACE_OS::strncpy(actual_mcast_addr.inout(),
00080 mcast_addr,
00081 length_addr - 1);
00082
00083 actual_mcast_addr[length_addr - 1] = '\0';
00084
00085
00086 this->mcast_nic_ = tmpnic + 1;
00087
00088 } else {
00089 actual_mcast_addr =
00090 CORBA::string_alloc(static_cast<CORBA::ULong>(ACE_OS::strlen(mcast_addr)));
00091
00092 actual_mcast_addr = mcast_addr;
00093 }
00094
00095 if (this->mcast_addr_.set(actual_mcast_addr.in()) == -1)
00096 ACE_ERROR_RETURN((LM_ERROR,
00097 "%p\n",
00098 ACE_TEXT("set")),
00099 -1);
00100
00101 return common_init(orb);
00102 }
00103
00104 int
00105 InfoRepoMulticastResponder::common_init(
00106 CORBA::ORB_ptr orb)
00107 {
00108 orb_ = CORBA::ORB::_duplicate(orb);
00109
00110 if (this->response_addr_.set((u_short) 0) == -1)
00111 ACE_ERROR_RETURN((LM_ERROR,
00112 "InfoRepoMulticastResponder::common_init() %p\n",
00113 ACE_TEXT("set")),
00114 -1);
00115
00116 else if (this->response_.open(this->response_addr_) == -1) {
00117 ACE_ERROR_RETURN((LM_ERROR,
00118 "%p\n",
00119 ACE_TEXT("set")),
00120 -1);
00121 }
00122
00123
00124 #ifdef ACE_HAS_MAC_OSX
00125 mcast_dgram_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00126 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00127 #endif
00128 if (this->mcast_nic_.length() != 0) {
00129 if (this->mcast_dgram_.join(this->mcast_addr_,
00130 1,
00131 ACE_TEXT_CHAR_TO_TCHAR(this->mcast_nic_.c_str())) == -1)
00132 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::common_init() %p\n",
00133 ACE_TEXT("subscribe")), -1);
00134
00135 } else {
00136 if (this->mcast_dgram_.join(this->mcast_addr_) == -1)
00137 ACE_ERROR_RETURN((LM_ERROR,
00138 "InfoRepoMulticastResponder::common_init() %p\n",
00139 ACE_TEXT("subscribe")),
00140 -1);
00141 }
00142
00143 this->initialized_ = true;
00144 return 0;
00145 }
00146
00147 int
00148 InfoRepoMulticastResponder::handle_timeout(const ACE_Time_Value &,
00149 const void *)
00150 {
00151 return 0;
00152 }
00153
00154 int
00155 InfoRepoMulticastResponder::handle_input(ACE_HANDLE)
00156 {
00157 if (OpenDDS::DCPS::DCPS_debug_level > 0)
00158 ACE_DEBUG((LM_DEBUG, "Entered InfoRepoMulticastResponder::handle_input\n"));
00159
00160
00161 CORBA::Short header;
00162
00163 ACE_UINT16 remote_port;
00164
00165 char object_key[BUFSIZ];
00166
00167 ACE_INET_Addr remote_addr;
00168
00169
00170
00171 ssize_t n = this->mcast_dgram_.recv(&header,
00172 sizeof(header),
00173 remote_addr,
00174 MSG_PEEK);
00175
00176 if (n <= 0)
00177 ACE_ERROR_RETURN((LM_ERROR,
00178 "InfoRepoMulticastResponder::handle_input - peek %d\n",
00179 n),
00180 0);
00181
00182 else if (ACE_NTOHS(header) <= 0)
00183 ACE_ERROR_RETURN((LM_ERROR,
00184 "InfoRepoMulticastResponder::handle_input() Header value < 1\n"),
00185 0);
00186
00187
00188 const int iovcnt = 3;
00189 iovec iov[iovcnt];
00190
00191 iov[0].iov_base = (char *) &header;
00192 iov[0].iov_len = sizeof(header);
00193 iov[1].iov_base = (char *) &remote_port;
00194 iov[1].iov_len = sizeof(ACE_UINT16);
00195 iov[2].iov_base = (char *) object_key;
00196 iov[2].iov_len = ACE_NTOHS(header);
00197
00198
00199 n = this->mcast_dgram_.recv(iov,
00200 iovcnt,
00201 remote_addr);
00202
00203 if (n <= 0)
00204 ACE_ERROR_RETURN((LM_ERROR,
00205 "InfoRepoMulticastResponder::handle_input recv = %d\n",
00206 n),
00207 0);
00208
00209 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00210 ACE_TCHAR addr[64];
00211 remote_addr.addr_to_string(addr, sizeof(addr));
00212 ACE_DEBUG((LM_DEBUG,
00213 "(%P|%t) Received multicast from %s.\n"
00214 "Service Name received : %C\n"
00215 "Port received : %u\n",
00216 addr,
00217 object_key,
00218 ACE_NTOHS(remote_port)));
00219 }
00220
00221
00222 CORBA::Object_var table_object =
00223 orb_->resolve_initial_references("IORTable");
00224
00225 IORTable::Locator_var locator =
00226 IORTable::Locator::_narrow(table_object.in());
00227
00228 if (CORBA::is_nil(locator.in())) {
00229 ACE_ERROR((LM_ERROR, ACE_TEXT("Nil IORTable\n")));
00230
00231 }
00232
00233 std::string ior;
00234
00235 {
00236 CORBA::String_var ior_result;
00237
00238 try {
00239 ior_result = locator->locate(object_key);
00240
00241 } catch (const IORTable::NotFound&) {
00242 ACE_ERROR_RETURN((LM_ERROR,
00243 "InfoRepoMulticastResponder::handle_input() Object key not found\n"),
00244 0);
00245 }
00246
00247 ior = ior_result;
00248 }
00249
00250
00251 ACE_SOCK_Connector connector;
00252 ACE_INET_Addr peer_addr;
00253 ACE_SOCK_Stream stream;
00254
00255 peer_addr.set(remote_addr);
00256 peer_addr.set_port_number(ACE_NTOHS(remote_port));
00257
00258 #if defined (ACE_HAS_IPV6)
00259
00260 if (peer_addr.is_linklocal()) {
00261
00262
00263
00264
00265
00266
00267
00268
00269 ACE_INET_Addr peer_tmp(peer_addr);
00270 peer_tmp.set_port_number(static_cast<u_short>(0));
00271 ACE_INET_Addr* tmp = 0;
00272 size_t cnt = 0;
00273 int err = ACE::get_ip_interfaces(cnt, tmp);
00274
00275 if (err == 0) {
00276 for (size_t i = 0; i < cnt; ++i) {
00277 if (peer_tmp == tmp[i]) {
00278 peer_addr.set(ACE_NTOHS(remote_port),
00279 ACE_IPV6_LOCALHOST);
00280 break;
00281 }
00282 }
00283
00284 delete[] tmp;
00285 }
00286 }
00287
00288 #endif
00289
00290 if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00291 ACE_TCHAR addr[64];
00292 peer_addr.addr_to_string(addr, sizeof(addr));
00293 ACE_DEBUG((LM_DEBUG,
00294 "(%P|%t) Replying to peer %s.\n",
00295 addr));
00296 }
00297
00298
00299 if (connector.connect(stream, peer_addr) == -1)
00300 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::connect failed\n"), 0);
00301
00302
00303
00304
00305
00306 CORBA::Short data_len = ACE_HTONS(static_cast<CORBA::Short>(ior.length()) + 1);
00307
00308
00309 const int cnt = 2;
00310 iovec iovp[cnt];
00311
00312
00313 iovp[0].iov_base = (char *) &data_len;
00314 iovp[0].iov_len = sizeof(CORBA::Short);
00315
00316
00317 iovp[1].iov_base = const_cast<char*>(ior.c_str());
00318 iovp[1].iov_len = static_cast<u_long>(ior.length() + 1);
00319
00320 ssize_t result = stream.sendv_n(iovp, cnt);
00321
00322 stream.close();
00323
00324
00325 if (result == -1)
00326 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::send failed\n"), 0);
00327
00328 if (OpenDDS::DCPS::DCPS_debug_level > 0)
00329 ACE_DEBUG((LM_DEBUG,
00330 "(%P|%t) InfoRepoMulticastResponder::handle_input() ior: <%C>\n"
00331 "sent to %C:%u.\n"
00332 "result from send = %d\n",
00333 ior.c_str(),
00334 peer_addr.get_host_name(),
00335 peer_addr.get_port_number(),
00336 result));
00337
00338 return 0;
00339 }
00340
00341 }
00342 }