InfoRepoMulticastResponder.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "InfoRepoMulticastResponder.h"
00009 #include "dds/DCPS/debug.h"
00010 
00011 #include "tao/debug.h"
00012 #include "tao/Object.h"
00013 #include "tao/IORTable/IORTable.h"
00014 
00015 #include "ace/SOCK_Connector.h"
00016 #include "ace/Log_Msg.h"
00017 
00018 #include <string>
00019 
00020 namespace OpenDDS {
00021 namespace Federator {
00022 
00023 InfoRepoMulticastResponder::InfoRepoMulticastResponder()
00024   : initialized_(false)
00025 {
00026 }
00027 
00028 InfoRepoMulticastResponder::~InfoRepoMulticastResponder()
00029 {
00030   if (
00031     this->initialized_ &&
00032     (this->mcast_dgram_.leave(this->mcast_addr_) == -1)) {
00033     ACE_ERROR((LM_ERROR, "%p\n", ACE_TEXT("~InfoRepoMulticastResponder()")));
00034   }
00035 }
00036 
00037 ACE_HANDLE
00038 InfoRepoMulticastResponder::get_handle() const
00039 {
00040   return this->mcast_dgram_.get_handle();
00041 }
00042 
00043 int
00044 InfoRepoMulticastResponder::init(
00045   CORBA::ORB_ptr orb,
00046   u_short port,
00047   const char *mcast_addr)
00048 {
00049   if (this->initialized_) {
00050     ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() already initialized\n"), -1);
00051   }
00052 
00053   if (this->mcast_addr_.set(port, mcast_addr) == -1)
00054     ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() %p\n", ACE_TEXT("set")), -1);
00055 
00056   return common_init(orb);
00057 }
00058 
00059 int
00060 InfoRepoMulticastResponder::init(
00061   CORBA::ORB_ptr orb,
00062   const char *mcast_addr)
00063 {
00064   if (this->initialized_) {
00065     ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() already initialized\n"), -1);
00066   }
00067 
00068   // Look for a '@' incase a nic is specified.
00069   const char* tmpnic = ACE_OS::strchr(mcast_addr, '@');
00070 
00071   CORBA::String_var actual_mcast_addr;
00072   CORBA::ULong length_addr;
00073 
00074   if (tmpnic != 0) {
00075     // i.e. a nic name has been specified
00076     length_addr = static_cast<CORBA::ULong>(tmpnic - mcast_addr + 1);
00077     actual_mcast_addr = CORBA::string_alloc(length_addr);
00078 
00079     ACE_OS::strncpy(actual_mcast_addr.inout(),
00080                     mcast_addr,
00081                     length_addr - 1);
00082 
00083     actual_mcast_addr[length_addr - 1] = '\0';
00084 
00085     /// Save for use later.
00086     this->mcast_nic_ = tmpnic + 1;
00087 
00088   } else {
00089     actual_mcast_addr =
00090       CORBA::string_alloc(static_cast<CORBA::ULong>(ACE_OS::strlen(mcast_addr)));
00091 
00092     actual_mcast_addr = mcast_addr;
00093   }
00094 
00095   if (this->mcast_addr_.set(actual_mcast_addr.in()) == -1)
00096     ACE_ERROR_RETURN((LM_ERROR,
00097                       "%p\n",
00098                       ACE_TEXT("set")),
00099                      -1);
00100 
00101   return common_init(orb);
00102 }
00103 
00104 int
00105 InfoRepoMulticastResponder::common_init(
00106   CORBA::ORB_ptr orb)
00107 {
00108   orb_ = CORBA::ORB::_duplicate(orb);
00109 
00110   if (this->response_addr_.set((u_short) 0) == -1)
00111     ACE_ERROR_RETURN((LM_ERROR,
00112                       "InfoRepoMulticastResponder::common_init() %p\n",
00113                       ACE_TEXT("set")),
00114                      -1);
00115 
00116   else if (this->response_.open(this->response_addr_) == -1) {
00117     ACE_ERROR_RETURN((LM_ERROR,
00118                       "%p\n",
00119                       ACE_TEXT("set")),
00120                      -1);
00121   }
00122 
00123   // Use ACE_SOCK_Dgram_Mcast factory to subscribe to multicast group.
00124 #ifdef ACE_HAS_MAC_OSX
00125   mcast_dgram_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00126                     ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00127 #endif
00128   if (this->mcast_nic_.length() != 0) {
00129     if (this->mcast_dgram_.join(this->mcast_addr_,
00130                                 1,
00131                                 ACE_TEXT_CHAR_TO_TCHAR(this->mcast_nic_.c_str())) == -1)
00132       ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::common_init() %p\n",
00133                         ACE_TEXT("subscribe")), -1);
00134 
00135   } else {
00136     if (this->mcast_dgram_.join(this->mcast_addr_) == -1)
00137       ACE_ERROR_RETURN((LM_ERROR,
00138                         "InfoRepoMulticastResponder::common_init() %p\n",
00139                         ACE_TEXT("subscribe")),
00140                        -1);
00141   }
00142 
00143   this->initialized_ = true;
00144   return 0;
00145 }
00146 
00147 int
00148 InfoRepoMulticastResponder::handle_timeout(const ACE_Time_Value &,
00149                                            const void *)
00150 {
00151   return 0;
00152 }
00153 
00154 int
00155 InfoRepoMulticastResponder::handle_input(ACE_HANDLE)
00156 {
00157   if (OpenDDS::DCPS::DCPS_debug_level > 0)
00158     ACE_DEBUG((LM_DEBUG, "Entered InfoRepoMulticastResponder::handle_input\n"));
00159 
00160   // The length of the service name string that follows.
00161   CORBA::Short header;
00162   // Port to which to reply.
00163   ACE_UINT16 remote_port;
00164   // Name of the service for which the client is looking.
00165   char object_key[BUFSIZ];
00166 
00167   ACE_INET_Addr remote_addr;
00168 
00169   // Take a peek at the header to find out how long is the service
00170   // name string we should receive.
00171   ssize_t n = this->mcast_dgram_.recv(&header,
00172                                       sizeof(header),
00173                                       remote_addr,
00174                                       MSG_PEEK);
00175 
00176   if (n <= 0)
00177     ACE_ERROR_RETURN((LM_ERROR,
00178                       "InfoRepoMulticastResponder::handle_input - peek %d\n",
00179                       n),
00180                      0);
00181 
00182   else if (ACE_NTOHS(header) <= 0)
00183     ACE_ERROR_RETURN((LM_ERROR,
00184                       "InfoRepoMulticastResponder::handle_input() Header value < 1\n"),
00185                      0);
00186 
00187   // Receive full client multicast request.
00188   const int iovcnt = 3;
00189   iovec iov[iovcnt];
00190 
00191   iov[0].iov_base = (char *) &header;
00192   iov[0].iov_len  = sizeof(header);
00193   iov[1].iov_base = (char *) &remote_port;
00194   iov[1].iov_len  = sizeof(ACE_UINT16);
00195   iov[2].iov_base = (char *) object_key;
00196   iov[2].iov_len  = ACE_NTOHS(header);
00197 
00198   // Read the iovec.
00199   n = this->mcast_dgram_.recv(iov,
00200                               iovcnt,
00201                               remote_addr);
00202 
00203   if (n <= 0)
00204     ACE_ERROR_RETURN((LM_ERROR,
00205                       "InfoRepoMulticastResponder::handle_input recv = %d\n",
00206                       n),
00207                      0);
00208 
00209   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00210     ACE_TCHAR addr[64];
00211     remote_addr.addr_to_string(addr, sizeof(addr));
00212     ACE_DEBUG((LM_DEBUG,
00213                "(%P|%t) Received multicast from %s.\n"
00214                "Service Name received : %C\n"
00215                "Port received : %u\n",
00216                addr,
00217                object_key,
00218                ACE_NTOHS(remote_port)));
00219   }
00220 
00221   // Grab the IOR table.
00222   CORBA::Object_var table_object =
00223     orb_->resolve_initial_references("IORTable");
00224 
00225   IORTable::Locator_var locator =
00226     IORTable::Locator::_narrow(table_object.in());
00227 
00228   if (CORBA::is_nil(locator.in())) {
00229     ACE_ERROR((LM_ERROR, ACE_TEXT("Nil IORTable\n")));
00230 
00231   }
00232 
00233   std::string ior;
00234 
00235   {
00236     CORBA::String_var ior_result;
00237 
00238     try {
00239       ior_result = locator->locate(object_key);
00240 
00241     } catch (const IORTable::NotFound&) {
00242       ACE_ERROR_RETURN((LM_ERROR,
00243                         "InfoRepoMulticastResponder::handle_input() Object key not found\n"),
00244                        0);
00245     }
00246 
00247     ior = ior_result;
00248   }
00249 
00250   // Reply to the multicast message.
00251   ACE_SOCK_Connector connector;
00252   ACE_INET_Addr peer_addr;
00253   ACE_SOCK_Stream stream;
00254 
00255   peer_addr.set(remote_addr);
00256   peer_addr.set_port_number(ACE_NTOHS(remote_port));
00257 
00258 #if defined (ACE_HAS_IPV6)
00259 
00260   if (peer_addr.is_linklocal()) {
00261     // If this is one of our local linklocal interfaces this is not going
00262     // to work.
00263     // Creating a connection using such interface to the client listening
00264     // at the IPv6 ANY address is not going to work (I'm not quite sure why
00265     // but it probably has to do with the rather restrictive routing rules
00266     // for linklocal interfaces).
00267     // So we see if this is one of our local interfaces and if so create the
00268     // connection using the IPv6 loopback address instead.
00269     ACE_INET_Addr  peer_tmp(peer_addr);
00270     peer_tmp.set_port_number(static_cast<u_short>(0));
00271     ACE_INET_Addr* tmp = 0;
00272     size_t cnt = 0;
00273     int err = ACE::get_ip_interfaces(cnt, tmp);
00274 
00275     if (err == 0) {
00276       for (size_t i = 0; i < cnt; ++i) {
00277         if (peer_tmp == tmp[i]) {
00278           peer_addr.set(ACE_NTOHS(remote_port),
00279                         ACE_IPV6_LOCALHOST);
00280           break;
00281         }
00282       }
00283 
00284       delete[] tmp;
00285     }
00286   }
00287 
00288 #endif /* ACE_HAS_IPV6 */
00289 
00290   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00291     ACE_TCHAR addr[64];
00292     peer_addr.addr_to_string(addr, sizeof(addr));
00293     ACE_DEBUG((LM_DEBUG,
00294                "(%P|%t) Replying to peer %s.\n",
00295                addr));
00296   }
00297 
00298   // Connect.
00299   if (connector.connect(stream, peer_addr) == -1)
00300     ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::connect failed\n"), 0);
00301 
00302   // Send the IOR back to the client.  (Send iovec, which contains ior
00303   // length as the first element, and ior itself as the second.)
00304 
00305   // Length of ior to be sent.
00306   CORBA::Short data_len = ACE_HTONS(static_cast<CORBA::Short>(ior.length()) + 1);
00307 
00308   // Vector to be sent.
00309   const int cnt = 2;
00310   iovec iovp[cnt];
00311 
00312   // The length of ior to be sent.
00313   iovp[0].iov_base = (char *) &data_len;
00314   iovp[0].iov_len  = sizeof(CORBA::Short);
00315 
00316   // The ior.
00317   iovp[1].iov_base = const_cast<char*>(ior.c_str());
00318   iovp[1].iov_len  = static_cast<u_long>(ior.length() + 1);
00319 
00320   ssize_t result = stream.sendv_n(iovp, cnt);
00321   // Close the stream.
00322   stream.close();
00323 
00324   // Check for error.
00325   if (result == -1)
00326     ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::send failed\n"), 0);
00327 
00328   if (OpenDDS::DCPS::DCPS_debug_level > 0)
00329     ACE_DEBUG((LM_DEBUG,
00330                "(%P|%t) InfoRepoMulticastResponder::handle_input() ior: <%C>\n"
00331                "sent to %C:%u.\n"
00332                "result from send = %d\n",
00333                ior.c_str(),
00334                peer_addr.get_host_name(),
00335                peer_addr.get_port_number(),
00336                result));
00337 
00338   return 0;
00339 }
00340 
00341 } // namespace Federator
00342 } // namespace OpenDDS

Generated on Fri Feb 12 20:05:23 2016 for OpenDDS by  doxygen 1.4.7