Event Handler that services multicast requests for IOR of a bootstrappable service. More...
#include <InfoRepoMulticastResponder.h>
Public Member Functions | |
InfoRepoMulticastResponder () | |
virtual | ~InfoRepoMulticastResponder () |
int | init (CORBA::ORB_ptr orb, u_short port, const char *mcast_addr) |
Initialization method. | |
int | init (CORBA::ORB_ptr orb, const char *mcast_addr) |
virtual int | handle_input (ACE_HANDLE n) |
Callback when input is received on the handle. | |
virtual int | handle_timeout (const ACE_Time_Value &tv, const void *arg) |
Callback when a timeout has occurred. | |
virtual ACE_HANDLE | get_handle () const |
Returns the internal handle used to receive multicast. | |
Private Member Functions | |
int | common_init (CORBA::ORB_ptr orb) |
Factor common functionality from the two init functions. | |
Private Attributes | |
bool | initialized_ |
Are we initialized? | |
CORBA::ORB_var | orb_ |
The ORB. | |
ACE_SOCK_Dgram_Mcast | mcast_dgram_ |
multicast endpoint of communication | |
ACE_INET_Addr | mcast_addr_ |
multicast address | |
ACE_INET_Addr | response_addr_ |
address of response. | |
ACE_SOCK_Dgram | response_ |
socket for response to the multicast | |
ACE_CString | mcast_nic_ |
Event Handler that services multicast requests for IOR of a bootstrappable service.
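This class uses ACE_SOCK_Dgram_Mcast to receive multicast requests and should be registered with a reactor. It is initialized with an ORB and a multicast address (see init()); the IOR of the requested service is looked up by object key in the ORB's IORTable and sent back to the requester.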
Definition at line 42 of file InfoRepoMulticastResponder.h.
OpenDDS::Federator::InfoRepoMulticastResponder::InfoRepoMulticastResponder | ( | ) |
Definition at line 25 of file InfoRepoMulticastResponder.cpp.
00026 : initialized_(false) 00027 { 00028 }
OpenDDS::Federator::InfoRepoMulticastResponder::~InfoRepoMulticastResponder | ( | ) | [virtual] |
Definition at line 30 of file InfoRepoMulticastResponder.cpp.
References ACE_TEXT(), initialized_, ACE_SOCK_Dgram_Mcast::leave(), LM_ERROR, and mcast_dgram_.
00031 { 00032 if ( 00033 this->initialized_ && 00034 (this->mcast_dgram_.leave(this->mcast_addr_) == -1)) { 00035 ACE_ERROR((LM_ERROR, "%p\n", ACE_TEXT("~InfoRepoMulticastResponder()"))); 00036 } 00037 }
int OpenDDS::Federator::InfoRepoMulticastResponder::common_init | ( | CORBA::ORB_ptr | orb | ) | [private] |
Factor common functionality from the two init functions.
Definition at line 107 of file InfoRepoMulticastResponder.cpp.
References CORBA::ORB::_duplicate(), ACE_TEXT(), ACE_TEXT_CHAR_TO_TCHAR, ACE_String_Base< ACE_CHAR_T >::c_str(), ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE, initialized_, ACE_SOCK_Dgram_Mcast::join(), ACE_String_Base< ACE_CHAR_T >::length(), LM_ERROR, mcast_dgram_, mcast_nic_, ACE_SOCK_Dgram::open(), ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO, ACE_SOCK_Dgram_Mcast::opts(), orb_, response_, response_addr_, and ACE_INET_Addr::set().
Referenced by init().
00109 { 00110 orb_ = CORBA::ORB::_duplicate(orb); 00111 00112 if (this->response_addr_.set((u_short) 0) == -1) 00113 ACE_ERROR_RETURN((LM_ERROR, 00114 "InfoRepoMulticastResponder::common_init() %p\n", 00115 ACE_TEXT("set")), 00116 -1); 00117 00118 else if (this->response_.open(this->response_addr_) == -1) { 00119 ACE_ERROR_RETURN((LM_ERROR, 00120 "%p\n", 00121 ACE_TEXT("set")), 00122 -1); 00123 } 00124 00125 // Use ACE_SOCK_Dgram_Mcast factory to subscribe to multicast group. 00126 #ifdef ACE_HAS_MAC_OSX 00127 mcast_dgram_.opts(ACE_SOCK_Dgram_Mcast::OPT_BINDADDR_NO | 00128 ACE_SOCK_Dgram_Mcast::DEFOPT_NULLIFACE); 00129 #endif 00130 if (this->mcast_nic_.length() != 0) { 00131 if (this->mcast_dgram_.join(this->mcast_addr_, 00132 1, 00133 ACE_TEXT_CHAR_TO_TCHAR(this->mcast_nic_.c_str())) == -1) 00134 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::common_init() %p\n", 00135 ACE_TEXT("subscribe")), -1); 00136 00137 } else { 00138 if (this->mcast_dgram_.join(this->mcast_addr_) == -1) 00139 ACE_ERROR_RETURN((LM_ERROR, 00140 "InfoRepoMulticastResponder::common_init() %p\n", 00141 ACE_TEXT("subscribe")), 00142 -1); 00143 } 00144 00145 this->initialized_ = true; 00146 return 0; 00147 }
ACE_HANDLE OpenDDS::Federator::InfoRepoMulticastResponder::get_handle | ( | void | ) | const [virtual] |
Returns the internal handle used to receive multicast.
Reimplemented from ACE_Event_Handler.
Definition at line 40 of file InfoRepoMulticastResponder.cpp.
References ACE_IPC_SAP::get_handle(), and mcast_dgram_.
00041 { 00042 return this->mcast_dgram_.get_handle(); 00043 }
int OpenDDS::Federator::InfoRepoMulticastResponder::handle_input | ( | ACE_HANDLE | n | ) | [virtual] |
Callback when input is received on the handle.
Reimplemented from ACE_Event_Handler.
Definition at line 157 of file InfoRepoMulticastResponder.cpp.
References IORTable::Locator::_narrow(), ACE_TEXT(), ACE_INET_Addr::addr_to_string(), ACE_SOCK_Stream::close(), ACE_SOCK_Connector::connect(), OpenDDS::DCPS::DCPS_debug_level, ACE_INET_Addr::get_host_name(), ACE::get_ip_interfaces(), ACE_INET_Addr::get_port_number(), header, TAO_Objref_Var_T< T >::in(), TAO_Pseudo_Var_T< T >::in(), iovec::iov_base, iovec::iov_len, ACE_INET_Addr::is_linklocal(), CORBA::is_nil(), LM_DEBUG, LM_ERROR, mcast_dgram_, orb_, ACE_SOCK_Dgram::recv(), ACE_SOCK_Stream::sendv_n(), ACE_INET_Addr::set(), and ACE_INET_Addr::set_port_number().
00158 { 00159 if (OpenDDS::DCPS::DCPS_debug_level > 0) 00160 ACE_DEBUG((LM_DEBUG, "Entered InfoRepoMulticastResponder::handle_input\n")); 00161 00162 // The length of the service name string that follows. 00163 CORBA::Short header; 00164 // Port to which to reply. 00165 ACE_UINT16 remote_port; 00166 // Name of the service for which the client is looking. 00167 char object_key[BUFSIZ]; 00168 00169 ACE_INET_Addr remote_addr; 00170 00171 // Take a peek at the header to find out how long is the service 00172 // name string we should receive. 00173 ssize_t n = this->mcast_dgram_.recv(&header, 00174 sizeof(header), 00175 remote_addr, 00176 MSG_PEEK); 00177 00178 if (n <= 0) 00179 ACE_ERROR_RETURN((LM_ERROR, 00180 "InfoRepoMulticastResponder::handle_input - peek %d\n", 00181 n), 00182 0); 00183 00184 else if (ACE_NTOHS(header) <= 0) 00185 ACE_ERROR_RETURN((LM_ERROR, 00186 "InfoRepoMulticastResponder::handle_input() Header value < 1\n"), 00187 0); 00188 00189 // Receive full client multicast request. 00190 const int iovcnt = 3; 00191 iovec iov[iovcnt]; 00192 00193 iov[0].iov_base = (char *) &header; 00194 iov[0].iov_len = sizeof(header); 00195 iov[1].iov_base = (char *) &remote_port; 00196 iov[1].iov_len = sizeof(ACE_UINT16); 00197 iov[2].iov_base = (char *) object_key; 00198 iov[2].iov_len = ACE_NTOHS(header); 00199 00200 // Read the iovec. 00201 n = this->mcast_dgram_.recv(iov, 00202 iovcnt, 00203 remote_addr); 00204 00205 if (n <= 0) 00206 ACE_ERROR_RETURN((LM_ERROR, 00207 "InfoRepoMulticastResponder::handle_input recv = %d\n", 00208 n), 00209 0); 00210 00211 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00212 ACE_TCHAR addr[64]; 00213 remote_addr.addr_to_string(addr, sizeof(addr)); 00214 ACE_DEBUG((LM_DEBUG, 00215 "(%P|%t) Received multicast from %s.\n" 00216 "Service Name received : %C\n" 00217 "Port received : %u\n", 00218 addr, 00219 object_key, 00220 ACE_NTOHS(remote_port))); 00221 } 00222 00223 // Grab the IOR table. 
00224 CORBA::Object_var table_object = 00225 orb_->resolve_initial_references("IORTable"); 00226 00227 IORTable::Locator_var locator = 00228 IORTable::Locator::_narrow(table_object.in()); 00229 00230 if (CORBA::is_nil(locator.in())) { 00231 ACE_ERROR((LM_ERROR, ACE_TEXT("Nil IORTable\n"))); 00232 00233 } 00234 00235 std::string ior; 00236 00237 { 00238 CORBA::String_var ior_result; 00239 00240 try { 00241 ior_result = locator->locate(object_key); 00242 00243 } catch (const IORTable::NotFound&) { 00244 ACE_ERROR_RETURN((LM_ERROR, 00245 "InfoRepoMulticastResponder::handle_input() Object key not found\n"), 00246 0); 00247 } 00248 00249 ior = ior_result; 00250 } 00251 00252 // Reply to the multicast message. 00253 ACE_SOCK_Connector connector; 00254 ACE_INET_Addr peer_addr; 00255 ACE_SOCK_Stream stream; 00256 00257 peer_addr.set(remote_addr); 00258 peer_addr.set_port_number(ACE_NTOHS(remote_port)); 00259 00260 #if defined (ACE_HAS_IPV6) 00261 00262 if (peer_addr.is_linklocal()) { 00263 // If this is one of our local linklocal interfaces this is not going 00264 // to work. 00265 // Creating a connection using such interface to the client listening 00266 // at the IPv6 ANY address is not going to work (I'm not quite sure why 00267 // but it probably has to do with the rather restrictive routing rules 00268 // for linklocal interfaces). 00269 // So we see if this is one of our local interfaces and if so create the 00270 // connection using the IPv6 loopback address instead. 
00271 ACE_INET_Addr peer_tmp(peer_addr); 00272 peer_tmp.set_port_number(static_cast<u_short>(0)); 00273 ACE_INET_Addr* tmp = 0; 00274 size_t cnt = 0; 00275 int err = ACE::get_ip_interfaces(cnt, tmp); 00276 00277 if (err == 0) { 00278 for (size_t i = 0; i < cnt; ++i) { 00279 if (peer_tmp == tmp[i]) { 00280 peer_addr.set(ACE_NTOHS(remote_port), 00281 ACE_IPV6_LOCALHOST); 00282 break; 00283 } 00284 } 00285 00286 delete[] tmp; 00287 } 00288 } 00289 00290 #endif /* ACE_HAS_IPV6 */ 00291 00292 if (OpenDDS::DCPS::DCPS_debug_level > 0) { 00293 ACE_TCHAR addr[64]; 00294 peer_addr.addr_to_string(addr, sizeof(addr)); 00295 ACE_DEBUG((LM_DEBUG, 00296 "(%P|%t) Replying to peer %s.\n", 00297 addr)); 00298 } 00299 00300 // Connect. 00301 if (connector.connect(stream, peer_addr) == -1) 00302 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::connect failed\n"), 0); 00303 00304 // Send the IOR back to the client. (Send iovec, which contains ior 00305 // length as the first element, and ior itself as the second.) 00306 00307 // Length of ior to be sent. 00308 CORBA::Short data_len = ACE_HTONS(static_cast<CORBA::Short>(ior.length()) + 1); 00309 00310 // Vector to be sent. 00311 const int cnt = 2; 00312 iovec iovp[cnt]; 00313 00314 // The length of ior to be sent. 00315 iovp[0].iov_base = (char *) &data_len; 00316 iovp[0].iov_len = sizeof(CORBA::Short); 00317 00318 // The ior. 00319 iovp[1].iov_base = const_cast<char*>(ior.c_str()); 00320 iovp[1].iov_len = static_cast<u_long>(ior.length() + 1); 00321 00322 ssize_t result = stream.sendv_n(iovp, cnt); 00323 // Close the stream. 00324 stream.close(); 00325 00326 // Check for error. 
00327 if (result == -1) 00328 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::send failed\n"), 0); 00329 00330 if (OpenDDS::DCPS::DCPS_debug_level > 0) 00331 ACE_DEBUG((LM_DEBUG, 00332 "(%P|%t) InfoRepoMulticastResponder::handle_input() ior: <%C>\n" 00333 "sent to %C:%u.\n" 00334 "result from send = %d\n", 00335 ior.c_str(), 00336 peer_addr.get_host_name(), 00337 peer_addr.get_port_number(), 00338 result)); 00339 00340 return 0; 00341 }
int OpenDDS::Federator::InfoRepoMulticastResponder::handle_timeout | ( | const ACE_Time_Value & | tv, | |
const void * | arg | |||
) | [virtual] |
Callback when a timeout has occurred.
Reimplemented from ACE_Event_Handler.
Definition at line 150 of file InfoRepoMulticastResponder.cpp.
int OpenDDS::Federator::InfoRepoMulticastResponder::init | ( | CORBA::ORB_ptr | orb, | |
const char * | mcast_addr | |||
) |
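Initialization method. Takes an "address:port" string as a parameter, optionally followed by "@nic" to name the network interface on which to join the multicast group.
If a NIC name is given, it is saved in mcast_nic_ for later use by common_init().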
Definition at line 62 of file InfoRepoMulticastResponder.cpp.
References ACE_TEXT(), common_init(), initialized_, LM_ERROR, mcast_addr_, mcast_nic_, ACE_INET_Addr::set(), ACE_OS::strchr(), FACE::string_alloc(), ACE_OS::strlen(), and ACE_OS::strncpy().
00065 { 00066 if (this->initialized_) { 00067 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() already initialized\n"), -1); 00068 } 00069 00070 // Look for a '@' incase a nic is specified. 00071 const char* tmpnic = ACE_OS::strchr(mcast_addr, '@'); 00072 00073 CORBA::String_var actual_mcast_addr; 00074 CORBA::ULong length_addr; 00075 00076 if (tmpnic != 0) { 00077 // i.e. a nic name has been specified 00078 length_addr = static_cast<CORBA::ULong>(tmpnic - mcast_addr + 1); 00079 actual_mcast_addr = CORBA::string_alloc(length_addr); 00080 00081 ACE_OS::strncpy(actual_mcast_addr.inout(), 00082 mcast_addr, 00083 length_addr - 1); 00084 00085 actual_mcast_addr[length_addr - 1] = '\0'; 00086 00087 /// Save for use later. 00088 this->mcast_nic_ = tmpnic + 1; 00089 00090 } else { 00091 actual_mcast_addr = 00092 CORBA::string_alloc(static_cast<CORBA::ULong>(ACE_OS::strlen(mcast_addr))); 00093 00094 actual_mcast_addr = mcast_addr; 00095 } 00096 00097 if (this->mcast_addr_.set(actual_mcast_addr.in()) == -1) 00098 ACE_ERROR_RETURN((LM_ERROR, 00099 "%p\n", 00100 ACE_TEXT("set")), 00101 -1); 00102 00103 return common_init(orb); 00104 }
int OpenDDS::Federator::InfoRepoMulticastResponder::init | ( | CORBA::ORB_ptr | orb, | |
u_short | port, | |||
const char * | mcast_addr | |||
) |
Initialization method. Takes the multicast address and the port as separate parameters.
Definition at line 46 of file InfoRepoMulticastResponder.cpp.
References ACE_TEXT(), common_init(), initialized_, LM_ERROR, mcast_addr_, and ACE_INET_Addr::set().
Referenced by OpenDDS::Federator::ManagerImpl::initialize().
00050 { 00051 if (this->initialized_) { 00052 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() already initialized\n"), -1); 00053 } 00054 00055 if (this->mcast_addr_.set(port, mcast_addr) == -1) 00056 ACE_ERROR_RETURN((LM_ERROR, "InfoRepoMulticastResponder::init() %p\n", ACE_TEXT("set")), -1); 00057 00058 return common_init(orb); 00059 }
bool OpenDDS::Federator::InfoRepoMulticastResponder::initialized_ [private] |
Are we initialized?
Definition at line 76 of file InfoRepoMulticastResponder.h.
Referenced by common_init(), init(), and ~InfoRepoMulticastResponder().
ACE_SOCK_Dgram_Mcast OpenDDS::Federator::InfoRepoMulticastResponder::mcast_dgram_ [private] |
multicast endpoint of communication
Definition at line 82 of file InfoRepoMulticastResponder.h.
Referenced by common_init(), get_handle(), handle_input(), and ~InfoRepoMulticastResponder().
ACE_CString OpenDDS::Federator::InfoRepoMulticastResponder::mcast_nic_ [private] |
Definition at line 93 of file InfoRepoMulticastResponder.h.
Referenced by common_init(), and init().
CORBA::ORB_var OpenDDS::Federator::InfoRepoMulticastResponder::orb_ [private] |
The ORB.
Definition at line 79 of file InfoRepoMulticastResponder.h.
Referenced by common_init(), and handle_input().
ACE_SOCK_Dgram OpenDDS::Federator::InfoRepoMulticastResponder::response_ [private] |
socket for response to the multicast
Definition at line 91 of file InfoRepoMulticastResponder.h.
Referenced by common_init().
ACE_INET_Addr OpenDDS::Federator::InfoRepoMulticastResponder::response_addr_ [private] |
address of response.
Definition at line 88 of file InfoRepoMulticastResponder.h.
Referenced by common_init().