InfoRepoMulticastResponder.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "InfoRepoMulticastResponder.h"
00009 #include "dds/DCPS/debug.h"
00010 
00011 #include "tao/debug.h"
00012 #include "tao/Object.h"
00013 #include "tao/IORTable/IORTable.h"
00014 
00015 #include "ace/SOCK_Connector.h"
00016 #include "ace/Log_Msg.h"
00017 
00018 #include <string>
00019 
00020 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00021 
00022 namespace OpenDDS {
00023 namespace Federator {
00024 
00025 InfoRepoMulticastResponder::InfoRepoMulticastResponder()
00026   : initialized_(false)
00027 {
00028 }
00029 
00030 InfoRepoMulticastResponder::~InfoRepoMulticastResponder()
00031 {
00032   if (
00033     this->initialized_ &&
00034     (this->mcast_dgram_.leave(this->mcast_addr_) == -1)) {
00035     ACE_ERROR((LM_ERROR, "%p\n", ACE_TEXT("~InfoRepoMulticastResponder()")));
00036   }
00037 }
00038 
00039 ACE_HANDLE
00040 InfoRepoMulticastResponder::get_handle() const
00041 {
00042   return this->mcast_dgram_.get_handle();
00043 }
00044 
00045 int
00046 InfoRepoMulticastResponder::init(
00047   CORBA::ORB_ptr orb,
00048   u_short port,
00049   const char *mcast_addr)
00050 {
00051   if (this->initialized_) {
00052     ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() already initialized\n"), -1);
00053   }
00054 
00055   if (this->mcast_addr_.set(port, mcast_addr) == -1)
00056     ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() %p\n", ACE_TEXT("set")), -1);
00057 
00058   return common_init(orb);
00059 }
00060 
00061 int
00062 InfoRepoMulticastResponder::init(
00063   CORBA::ORB_ptr orb,
00064   const char *mcast_addr)
00065 {
00066   if (this->initialized_) {
00067     ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() already initialized\n"), -1);
00068   }
00069 
00070   // Look for a '@' incase a nic is specified.
00071   const char* tmpnic = ACE_OS::strchr(mcast_addr, '@');
00072 
00073   CORBA::String_var actual_mcast_addr;
00074   CORBA::ULong length_addr;
00075 
00076   if (tmpnic != 0) {
00077     // i.e. a nic name has been specified
00078     length_addr = static_cast<CORBA::ULong>(tmpnic - mcast_addr + 1);
00079     actual_mcast_addr = CORBA::string_alloc(length_addr);
00080 
00081     ACE_OS::strncpy(actual_mcast_addr.inout(),
00082                     mcast_addr,
00083                     length_addr - 1);
00084 
00085     actual_mcast_addr[length_addr - 1] = '\0';
00086 
00087     /// Save for use later.
00088     this->mcast_nic_ = tmpnic + 1;
00089 
00090   } else {
00091     actual_mcast_addr =
00092       CORBA::string_alloc(static_cast<CORBA::ULong>(ACE_OS::strlen(mcast_addr)));
00093 
00094     actual_mcast_addr = mcast_addr;
00095   }
00096 
00097   if (this->mcast_addr_.set(actual_mcast_addr.in()) == -1)
00098     ACE_ERROR_RETURN((LM_ERROR,
00099                       "%p\n",
00100                       ACE_TEXT("set")),
00101                      -1);
00102 
00103   return common_init(orb);
00104 }
00105 
00106 int
00107 InfoRepoMulticastResponder::common_init(
00108   CORBA::ORB_ptr orb)
00109 {
00110   orb_ = CORBA::ORB::_duplicate(orb);
00111 
00112   if (this->response_addr_.set((u_short) 0) == -1)
00113     ACE_ERROR_RETURN((LM_ERROR,
00114                       "InfoRepoMulticastResponder::common_init() %p\n",
00115                       ACE_TEXT("set")),
00116                      -1);
00117 
00118   else if (this->response_.open(this->response_addr_) == -1) {
00119     ACE_ERROR_RETURN((LM_ERROR,
00120                       "%p\n",
00121                       ACE_TEXT("set")),
00122                      -1);
00123   }
00124 
00125   // Use ACE_SOCK_Dgram_Mcast factory to subscribe to multicast group.
00126 #ifdef ACE_HAS_MAC_OSX
00127   mcast_dgram_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO |
00128                     ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE);
00129 #endif
00130   if (this->mcast_nic_.length() != 0) {
00131     if (this->mcast_dgram_.join(this->mcast_addr_,
00132                                 1,
00133                                 ACE_TEXT_CHAR_TO_TCHAR(this->mcast_nic_.c_str())) == -1)
00134       ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::common_init() %p\n",
00135                         ACE_TEXT("subscribe")), -1);
00136 
00137   } else {
00138     if (this->mcast_dgram_.join(this->mcast_addr_) == -1)
00139       ACE_ERROR_RETURN((LM_ERROR,
00140                         "InfoRepoMulticastResponder::common_init() %p\n",
00141                         ACE_TEXT("subscribe")),
00142                        -1);
00143   }
00144 
00145   this->initialized_ = true;
00146   return 0;
00147 }
00148 
00149 int
00150 InfoRepoMulticastResponder::handle_timeout(const ACE_Time_Value &,
00151                                            const void *)
00152 {
00153   return 0;
00154 }
00155 
00156 int
00157 InfoRepoMulticastResponder::handle_input(ACE_HANDLE)
00158 {
00159   if (OpenDDS::DCPS::DCPS_debug_level > 0)
00160     ACE_DEBUG((LM_DEBUG, "Entered InfoRepoMulticastResponder::handle_input\n"));
00161 
00162   // The length of the service name string that follows.
00163   CORBA::Short header;
00164   // Port to which to reply.
00165   ACE_UINT16 remote_port;
00166   // Name of the service for which the client is looking.
00167   char object_key[BUFSIZ];
00168 
00169   ACE_INET_Addr remote_addr;
00170 
00171   // Take a peek at the header to find out how long is the service
00172   // name string we should receive.
00173   ssize_t n = this->mcast_dgram_.recv(&header,
00174                                       sizeof(header),
00175                                       remote_addr,
00176                                       MSG_PEEK);
00177 
00178   if (n <= 0)
00179     ACE_ERROR_RETURN((LM_ERROR,
00180                       "InfoRepoMulticastResponder::handle_input - peek %d\n",
00181                       n),
00182                      0);
00183 
00184   else if (ACE_NTOHS(header) <= 0)
00185     ACE_ERROR_RETURN((LM_ERROR,
00186                       "InfoRepoMulticastResponder::handle_input() Header value < 1\n"),
00187                      0);
00188 
00189   // Receive full client multicast request.
00190   const int iovcnt = 3;
00191   iovec iov[iovcnt];
00192 
00193   iov[0].iov_base = (char *) &header;
00194   iov[0].iov_len  = sizeof(header);
00195   iov[1].iov_base = (char *) &remote_port;
00196   iov[1].iov_len  = sizeof(ACE_UINT16);
00197   iov[2].iov_base = (char *) object_key;
00198   iov[2].iov_len  = ACE_NTOHS(header);
00199 
00200   // Read the iovec.
00201   n = this->mcast_dgram_.recv(iov,
00202                               iovcnt,
00203                               remote_addr);
00204 
00205   if (n <= 0)
00206     ACE_ERROR_RETURN((LM_ERROR,
00207                       "InfoRepoMulticastResponder::handle_input recv = %d\n",
00208                       n),
00209                      0);
00210 
00211   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00212     ACE_TCHAR addr[64];
00213     remote_addr.addr_to_string(addr, sizeof(addr));
00214     ACE_DEBUG((LM_DEBUG,
00215                "(%P|%t) Received multicast from %s.\n"
00216                "Service Name received : %C\n"
00217                "Port received : %u\n",
00218                addr,
00219                object_key,
00220                ACE_NTOHS(remote_port)));
00221   }
00222 
00223   // Grab the IOR table.
00224   CORBA::Object_var table_object =
00225     orb_->resolve_initial_references("IORTable");
00226 
00227   IORTable::Locator_var locator =
00228     IORTable::Locator::_narrow(table_object.in());
00229 
00230   if (CORBA::is_nil(locator.in())) {
00231     ACE_ERROR((LM_ERROR, ACE_TEXT("Nil IORTable\n")));
00232 
00233   }
00234 
00235   std::string ior;
00236 
00237   {
00238     CORBA::String_var ior_result;
00239 
00240     try {
00241       ior_result = locator->locate(object_key);
00242 
00243     } catch (const IORTable::NotFound&) {
00244       ACE_ERROR_RETURN((LM_ERROR,
00245                         "InfoRepoMulticastResponder::handle_input() Object key not found\n"),
00246                        0);
00247     }
00248 
00249     ior = ior_result;
00250   }
00251 
00252   // Reply to the multicast message.
00253   ACE_SOCK_Connector connector;
00254   ACE_INET_Addr peer_addr;
00255   ACE_SOCK_Stream stream;
00256 
00257   peer_addr.set(remote_addr);
00258   peer_addr.set_port_number(ACE_NTOHS(remote_port));
00259 
00260 #if defined (ACE_HAS_IPV6)
00261 
00262   if (peer_addr.is_linklocal()) {
00263     // If this is one of our local linklocal interfaces this is not going
00264     // to work.
00265     // Creating a connection using such interface to the client listening
00266     // at the IPv6 ANY address is not going to work (I'm not quite sure why
00267     // but it probably has to do with the rather restrictive routing rules
00268     // for linklocal interfaces).
00269     // So we see if this is one of our local interfaces and if so create the
00270     // connection using the IPv6 loopback address instead.
00271     ACE_INET_Addr  peer_tmp(peer_addr);
00272     peer_tmp.set_port_number(static_cast<u_short>(0));
00273     ACE_INET_Addr* tmp = 0;
00274     size_t cnt = 0;
00275     int err = ACE::get_ip_interfaces(cnt, tmp);
00276 
00277     if (err == 0) {
00278       for (size_t i = 0; i < cnt; ++i) {
00279         if (peer_tmp == tmp[i]) {
00280           peer_addr.set(ACE_NTOHS(remote_port),
00281                         ACE_IPV6_LOCALHOST);
00282           break;
00283         }
00284       }
00285 
00286       delete[] tmp;
00287     }
00288   }
00289 
00290 #endif /* ACE_HAS_IPV6 */
00291 
00292   if (OpenDDS::DCPS::DCPS_debug_level > 0) {
00293     ACE_TCHAR addr[64];
00294     peer_addr.addr_to_string(addr, sizeof(addr));
00295     ACE_DEBUG((LM_DEBUG,
00296                "(%P|%t) Replying to peer %s.\n",
00297                addr));
00298   }
00299 
00300   // Connect.
00301   if (connector.connect(stream, peer_addr) == -1)
00302     ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::connect failed\n"), 0);
00303 
00304   // Send the IOR back to the client.  (Send iovec, which contains ior
00305   // length as the first element, and ior itself as the second.)
00306 
00307   // Length of ior to be sent.
00308   CORBA::Short data_len = ACE_HTONS(static_cast<CORBA::Short>(ior.length()) + 1);
00309 
00310   // Vector to be sent.
00311   const int cnt = 2;
00312   iovec iovp[cnt];
00313 
00314   // The length of ior to be sent.
00315   iovp[0].iov_base = (char *) &data_len;
00316   iovp[0].iov_len  = sizeof(CORBA::Short);
00317 
00318   // The ior.
00319   iovp[1].iov_base = const_cast<char*>(ior.c_str());
00320   iovp[1].iov_len  = static_cast<u_long>(ior.length() + 1);
00321 
00322   ssize_t result = stream.sendv_n(iovp, cnt);
00323   // Close the stream.
00324   stream.close();
00325 
00326   // Check for error.
00327   if (result == -1)
00328     ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::send failed\n"), 0);
00329 
00330   if (OpenDDS::DCPS::DCPS_debug_level > 0)
00331     ACE_DEBUG((LM_DEBUG,
00332                "(%P|%t) InfoRepoMulticastResponder::handle_input() ior: <%C>\n"
00333                "sent to %C:%u.\n"
00334                "result from send = %d\n",
00335                ior.c_str(),
00336                peer_addr.get_host_name(),
00337                peer_addr.get_port_number(),
00338                result));
00339 
00340   return 0;
00341 }
00342 
00343 } // namespace Federator
00344 } // namespace OpenDDS
00345 
00346 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1