TcpConnection.cpp

Go to the documentation of this file.
00001 /*
00002  *
00003  *
00004  * Distributed under the OpenDDS License.
00005  * See: http://www.opendds.org/license.html
00006  */
00007 
00008 #include "Tcp_pch.h"
00009 #include "TcpConnection.h"
00010 #include "TcpTransport.h"
00011 #include "TcpInst.h"
00012 #include "TcpDataLink.h"
00013 #include "TcpReceiveStrategy.h"
00014 #include "TcpSendStrategy.h"
00015 #include "TcpReconnectTask.h"
00016 #include "dds/DCPS/transport/framework/DirectPriorityMapper.h"
00017 #include "dds/DCPS/transport/framework/PriorityKey.h"
00018 
00019 #include "ace/os_include/netinet/os_tcp.h"
00020 #include "ace/OS_NS_arpa_inet.h"
00021 #include "ace/OS_NS_unistd.h"
00022 #include <sstream>
00023 #include <string>
00024 
00025 #if !defined (__ACE_INLINE__)
00026 #include "TcpConnection.inl"
00027 #endif /* __ACE_INLINE__ */
00028 
00029 OPENDDS_BEGIN_VERSIONED_NAMESPACE_DECL
00030 
00031 // A lost connection can be detected by both the send and the receive strategy.
00032 // When that happens, each of them adds a request to the reconnect task. The
00033 // reconnect is attempted when the first request is dequeued; the second request
00034 // just looks at the state to determine whether the connection is good. To tell
00035 // whether a request was queued because the same loss was detected by different
00036 // threads or because the re-established connection was lost again, we use
00037 // reconnect_delay to distinguish the two cases, so that we can reset the reconnect
00038 // state and trigger reconnecting after a re-established connection is lost.
00039 
00040 // The reconnect delay is the period from the last time the reconnect attempt
00041 // completes to when the reconnect request is dequeued.
00042 const ACE_Time_Value reconnect_delay(2);
00043 
00044 OpenDDS::DCPS::TcpConnection::TcpConnection()
00045   : connected_(false)
00046   , is_connector_(false)
00047   , tcp_config_(0)
00048   , passive_reconnect_timer_id_(-1)
00049   , reconnect_state_(INIT_STATE)
00050   , last_reconnect_attempted_(ACE_Time_Value::zero)
00051   , transport_priority_(0)  // TRANSPORT_PRIORITY.value default value - 0.
00052   , shutdown_(false)
00053   , passive_setup_(false)
00054   , passive_setup_buffer_(sizeof(ACE_UINT32))
00055   , transport_during_setup_(0)
00056   , id_(0)
00057   , reconnect_thread_(0)
00058 {
00059   DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
00060 
00061   this->reference_counting_policy().value(ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
00062 }
00063 
00064 OpenDDS::DCPS::TcpConnection::TcpConnection(const ACE_INET_Addr& remote_address,
00065                                             Priority priority,
00066                                             const TcpInst& config)
00067   : connected_(false)
00068   , is_connector_(true)
00069   , remote_address_(remote_address)
00070   , local_address_(config.local_address())
00071   , tcp_config_(&config)
00072   , passive_reconnect_timer_id_(-1)
00073   , reconnect_state_(INIT_STATE)
00074   , last_reconnect_attempted_(ACE_Time_Value::zero)
00075   , transport_priority_(priority)
00076   , shutdown_(false)
00077   , passive_setup_(false)
00078   , transport_during_setup_(0)
00079   , id_(0)
00080   , reconnect_thread_(0)
00081 {
00082   DBG_ENTRY_LVL("TcpConnection","TcpConnection",6);
00083   this->reference_counting_policy().value(ACE_Event_Handler::Reference_Counting_Policy::ENABLED);
00084 
00085 }
00086 OpenDDS::DCPS::TcpConnection::~TcpConnection()
00087 {
00088   DBG_ENTRY_LVL("TcpConnection","~TcpConnection",6);
00089   if (reconnect_thread_ &&
00090     // This is for Windows, where join doesn't check if the thread is the same
00091     // and the thread will hang itself if it tries to join itself.
00092     !ACE_OS::thr_equal(ACE_OS::thr_self(), reconnect_thread_)
00093   ) {
00094     ACE_Thread_Manager::instance()->join(reconnect_thread_);
00095   }
00096 }
00097 
00098 OpenDDS::DCPS::TcpSendStrategy_rch
00099 OpenDDS::DCPS::TcpConnection::send_strategy()
00100 {
00101   return this->link_->send_strategy();
00102 }
00103 
00104 OpenDDS::DCPS::TcpReceiveStrategy_rch
00105 OpenDDS::DCPS::TcpConnection::receive_strategy()
00106 {
00107   return this->link_->receive_strategy();
00108 }
00109 
00110 void
00111 OpenDDS::DCPS::TcpConnection::disconnect()
00112 {
00113   DBG_ENTRY_LVL("TcpConnection","disconnect",6);
00114   this->connected_ = false;
00115   TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00116   if (receive_strategy) {
00117     receive_strategy->get_reactor()->remove_handler(this,
00118                                                     ACE_Event_Handler::READ_MASK | ACE_Event_Handler::DONT_CALL);
00119   }
00120 
00121   if (this->link_) {
00122     this->link_->drop_pending_request_acks();
00123   }
00124 
00125   this->peer().close();
00126 
00127 }
00128 
00129 int
00130 OpenDDS::DCPS::TcpConnection::open(void* arg)
00131 {
00132   DBG_ENTRY_LVL("TcpConnection","open",6);
00133 
00134   if (is_connector_) {
00135 
00136     VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::open active.\n"), 2);
00137     // Take over the refcount from TcpTransport::connect_datalink().
00138 
00139     TcpTransport& transport = static_cast<TcpTransport&>(link_->impl());
00140 
00141     const bool is_loop(local_address_ == remote_address_);
00142     const PriorityKey key(transport_priority_, remote_address_,
00143                           is_loop, false /* !active */);
00144 
00145     int active_open_ = active_open();
00146 
00147     int connect_tcp_datalink_ = transport.connect_tcp_datalink(*link_, rchandle_from(this));
00148 
00149     if (active_open_ == -1 || connect_tcp_datalink_ == -1) {
00150       // if (active_open() == -1 ||
00151       //       transport->connect_tcp_datalink(link_, self) == -1) {
00152 
00153       transport.async_connect_failed(key);
00154 
00155       return -1;
00156     }
00157 
00158     return 0;
00159   }
00160 
00161   // The passed-in arg is really the acceptor object that created this
00162   // TcpConnection object, and is also the caller of this open()
00163   // method.  We need to cast the arg to the TcpAcceptor* type.
00164   TcpAcceptor* acceptor = static_cast<TcpAcceptor*>(arg);
00165 
00166   if (acceptor == 0) {
00167     // The cast failed.
00168     ACE_ERROR_RETURN((LM_ERROR,
00169                       ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00170                       ACE_TEXT("failed to cast void* arg to ")
00171                       ACE_TEXT("TcpAcceptor* type.\n")),
00172                      -1);
00173   }
00174 
00175   TcpConnection_rch self(this, keep_count());
00176 
00177   // Now we need to ask the TcpAcceptor object to provide us with
00178   // a pointer to the TcpTransport object that "owns" the acceptor.
00179   TcpTransport* transport = acceptor->transport();
00180 
00181   if (!transport) {
00182     // The acceptor gave us a nil transport (smart) pointer.
00183     ACE_ERROR_RETURN((LM_ERROR,
00184                       ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00185                       ACE_TEXT("acceptor's transport is nil.\n")),
00186                      -1);
00187   }
00188 
00189   // Keep a "copy" of the reference to TcpInst object
00190   // for ourselves.
00191   tcp_config_ =  &acceptor->get_configuration();
00192   local_address_ = tcp_config_->local_address();
00193 
00194   set_sock_options(tcp_config_);
00195 
00196   // We expect that the active side of the connection (the remote side
00197   // in this case) will supply its listening ACE_INET_Addr as the first
00198   // message it sends to the socket.  This is a one-way connection
00199   // establishment protocol message.
00200   passive_setup_ = true;
00201   transport_during_setup_ = transport;
00202   passive_setup_buffer_.size(sizeof(ACE_UINT32));
00203 
00204   if (reactor()->register_handler(this, READ_MASK) == -1) {
00205     ACE_ERROR_RETURN((LM_ERROR,
00206                       ACE_TEXT("(%P|%t) ERROR: TcpConnection::open() - ")
00207                       ACE_TEXT("unable to register with the reactor.%p\n"),
00208                       ACE_TEXT("register_handler")),
00209                      -1);
00210   }
00211 
00212   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::open passive handle=%d.\n",
00213             static_cast<int>(intptr_t(get_handle()))), 2);
00214 
00215   return 0;
00216 }
00217 
00218 int
00219 OpenDDS::DCPS::TcpConnection::handle_setup_input(ACE_HANDLE /*h*/)
00220 {
00221   const ssize_t ret = peer().recv(passive_setup_buffer_.wr_ptr(),
00222                                   passive_setup_buffer_.space(),
00223                                   &ACE_Time_Value::zero);
00224 
00225   if (ret < 0 && errno == ETIME) {
00226     return 0;
00227   }
00228 
00229   VDBG_LVL((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::handle_setup_input %@ "
00230             "recv returned %b %m.\n", this, ret), 4);
00231 
00232   if (ret <= 0) {
00233     return -1;
00234   }
00235 
00236   passive_setup_buffer_.wr_ptr(ret);
00237   // Parse the setup message: <len><addr><prio>
00238   // len and prio are network order 32-bit ints
00239   // addr is a string of length len, including null
00240   ACE_UINT32 nlen = 0;
00241 
00242   if (passive_setup_buffer_.length() >= sizeof(nlen)) {
00243 
00244     ACE_OS::memcpy(&nlen, passive_setup_buffer_.rd_ptr(), sizeof(nlen));
00245     passive_setup_buffer_.rd_ptr(sizeof(nlen));
00246     ACE_UINT32 hlen = ntohl(nlen);
00247     passive_setup_buffer_.size(hlen + 2 * sizeof(nlen));
00248 
00249     ACE_UINT32 nprio = 0;
00250 
00251     if (passive_setup_buffer_.length() >= hlen + sizeof(nprio)) {
00252 
00253       const std::string bufstr(passive_setup_buffer_.rd_ptr());
00254       const NetworkAddress network_order_address(bufstr);
00255       network_order_address.to_addr(remote_address_);
00256 
00257       ACE_OS::memcpy(&nprio, passive_setup_buffer_.rd_ptr() + hlen, sizeof(nprio));
00258       transport_priority_ = ntohl(nprio);
00259 
00260       passive_setup_buffer_.reset();
00261       passive_setup_ = false;
00262 
00263       VDBG((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::handle_setup_input "
00264             "%@ %C:%d->%C:%d, priority==%d, reconnect_state = %C\n", this,
00265             remote_address_.get_host_addr(), remote_address_.get_port_number(),
00266             local_address_.get_host_addr(), local_address_.get_port_number(),
00267             transport_priority_, reconnect_state_string().c_str()));
00268 
00269       // remove from reactor, normal recv strategy setup will add us back
00270       if (reactor()->remove_handler(this, READ_MASK | DONT_CALL) == -1) {
00271         VDBG((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::handle_setup_input "
00272               "remove_handler failed %m.\n"));
00273       }
00274 
00275       transport_during_setup_->passive_connection(remote_address_, rchandle_from(this));
00276       connected_ = true;
00277 
00278       return 0;
00279     }
00280   }
00281 
00282   passive_setup_buffer_.rd_ptr(passive_setup_buffer_.base());
00283 
00284   return 0;
00285 }
00286 
00287 int
00288 OpenDDS::DCPS::TcpConnection::handle_input(ACE_HANDLE fd)
00289 {
00290   DBG_ENTRY_LVL("TcpConnection","handle_input",6);
00291 
00292   if (passive_setup_) {
00293     return handle_setup_input(fd);
00294   }
00295   TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00296   if (!receive_strategy) {
00297     return 0;
00298   }
00299 
00300   return receive_strategy->handle_dds_input(fd);
00301 }
00302 
00303 int
00304 OpenDDS::DCPS::TcpConnection::handle_output(ACE_HANDLE)
00305 {
00306   DBG_ENTRY_LVL("TcpConnection","handle_output",6);
00307   TcpSendStrategy_rch send_strategy = this->send_strategy();
00308   if (send_strategy) {
00309     if (DCPS_debug_level > 9) {
00310       ACE_DEBUG((LM_DEBUG,
00311                  ACE_TEXT("(%P|%t) TcpConnection::handle_output() [%d] - ")
00312                  ACE_TEXT("sending queued data.\n"),
00313                  id_));
00314     }
00315 
00316     // Process data to be sent from the queue.
00317     if (ThreadSynchWorker::WORK_OUTCOME_MORE_TO_DO
00318         != send_strategy->perform_work()) {
00319 
00320       // Stop handling output ready events when there is nothing to output.
00321       // N.B. This calls back into the reactor.  Is the reactor lock
00322       //      recursive?
00323       send_strategy->schedule_output();
00324     }
00325   }
00326 
00327   return 0;
00328 }
00329 
00330 int
00331 OpenDDS::DCPS::TcpConnection::close(u_long)
00332 {
00333   DBG_ENTRY_LVL("TcpConnection","close",6);
00334 
00335   // TBD SOON - Find out exactly when close() is called.
00336   //            I have no clue when and who might call this.
00337   TcpSendStrategy_rch send_strategy = this->send_strategy();
00338   if (send_strategy)
00339     send_strategy->terminate_send();
00340 
00341   this->disconnect();
00342 
00343   return 0;
00344 }
00345 
00346 const std::string&
00347 OpenDDS::DCPS::TcpConnection::config_name() const
00348 {
00349   return this->link_->impl().config().name();
00350 }
00351 
00352 int
00353 OpenDDS::DCPS::TcpConnection::handle_close(ACE_HANDLE, ACE_Reactor_Mask)
00354 {
00355   DBG_ENTRY_LVL("TcpConnection","handle_close",6);
00356 
00357   if (DCPS_debug_level >= 1) {
00358     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_close() called on transport: %C to %C:%d.\n",
00359                this->config_name().c_str(),
00360                this->remote_address_.get_host_addr(),
00361                this->remote_address_.get_port_number()));
00362   }
00363 
00364   TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00365   bool graceful = receive_strategy && receive_strategy->gracefully_disconnected();
00366 
00367   TcpSendStrategy_rch send_strategy = this->send_strategy();
00368   if (send_strategy) {
00369     if (graceful) {
00370       send_strategy->terminate_send();
00371     } else {
00372       send_strategy->suspend_send();
00373     }
00374   }
00375 
00376   this->disconnect();
00377 
00378   if (graceful) {
00379     this->link_->notify(DataLink::DISCONNECTED);
00380   } else {
00381     this->spawn_reconnect_thread();
00382   }
00383 
00384   return 0;
00385 }
00386 
00387 void
00388 OpenDDS::DCPS::TcpConnection::set_sock_options(const TcpInst* tcp_config)
00389 {
00390 #if defined (ACE_DEFAULT_MAX_SOCKET_BUFSIZ)
00391   int snd_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00392   int rcv_size = ACE_DEFAULT_MAX_SOCKET_BUFSIZ;
00393   //ACE_SOCK_Stream sock = ACE_static_cast(ACE_SOCK_Stream, this->peer() );
00394 #  if !defined (ACE_LACKS_SOCKET_BUFSIZ)
00395 
00396   // A little screwy double negative logic: disabling nagle involves
00397   // enabling TCP_NODELAY
00398   int opt = (tcp_config->enable_nagle_algorithm_ == false);
00399 
00400   if (this->peer().set_option(IPPROTO_TCP, TCP_NODELAY, &opt, sizeof(opt)) == -1) {
00401     ACE_ERROR((LM_ERROR, "Failed to set TCP_NODELAY\n"));
00402   }
00403 
00404   if (this->peer().set_option(SOL_SOCKET,
00405                               SO_SNDBUF,
00406                               (void *) &snd_size,
00407                               sizeof(snd_size)) == -1
00408       && errno != ENOTSUP) {
00409     ACE_ERROR((LM_ERROR,
00410                "(%P|%t) TcpConnection failed to set the send buffer size to %d errno %m\n",
00411                snd_size));
00412     return;
00413   }
00414 
00415   if (this->peer().set_option(SOL_SOCKET,
00416                               SO_RCVBUF,
00417                               (void *) &rcv_size,
00418                               sizeof(int)) == -1
00419       && errno != ENOTSUP) {
00420     ACE_ERROR((LM_ERROR,
00421                "(%P|%t) TcpConnection failed to set the receive buffer size to %d errno %m \n",
00422                rcv_size));
00423     return;
00424   }
00425 
00426 #  else
00427   ACE_UNUSED_ARG(tcp_config);
00428   ACE_UNUSED_ARG(snd_size);
00429   ACE_UNUSED_ARG(rcv_size);
00430 #  endif /* !ACE_LACKS_SOCKET_BUFSIZ */
00431 
00432 #else
00433   ACE_UNUSED_ARG(tcp_config);
00434 #endif /* !ACE_DEFAULT_MAX_SOCKET_BUFSIZ */
00435 }
00436 
00437 int
00438 OpenDDS::DCPS::TcpConnection::active_establishment(bool initiate_connect)
00439 {
00440   DBG_ENTRY_LVL("TcpConnection","active_establishment",6);
00441 
00442   // Safety check - This should not happen since is_connector_ defaults to
00443   // true and the role in a connection connector is not changed when reconnecting.
00444   if (this->is_connector_ == false) {
00445     ACE_ERROR_RETURN((LM_ERROR,
00446                       "(%P|%t) ERROR: Failed to connect because it's previously an acceptor.\n"),
00447                      -1);
00448   }
00449 
00450   if (this->shutdown_)
00451     return -1;
00452 
00453   // Now use a connector object to establish the connection.
00454   ACE_SOCK_Connector connector;
00455 
00456   if (initiate_connect && connector.connect(this->peer(), remote_address_) != 0) {
00457 
00458     ACE_DEBUG((LM_DEBUG,
00459                       ACE_TEXT("(%P|%t) DBG: Failed to connect. this->shutdown_=%d %p\n%C"),
00460                       int(this->shutdown_), ACE_TEXT("connect"), this->tcp_config_->dump_to_str().c_str()));
00461                       return -1;
00462 
00463   } else {
00464     this->connected_ = true;
00465     const std::string remote_host = this->remote_address_.get_host_addr();
00466     VDBG((LM_DEBUG, "(%P|%t) DBG:   active_establishment(%C:%d->%C:%d)\n",
00467           this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00468           remote_host.c_str(), this->remote_address_.get_port_number()));
00469   }
00470 
00471   // Set the DiffServ codepoint according to the priority value.
00472   DirectPriorityMapper mapper(this->transport_priority_);
00473   this->link_->set_dscp_codepoint(mapper.codepoint(), this->peer());
00474 
00475   set_sock_options(tcp_config_);
00476 
00477   // In order to complete the connection establishment from the active
00478   // side, we need to tell the remote side about our public address.
00479   // It will use that as an "identifier" of sorts.  To the other
00480   // (passive) side, our local_address that we send here will be known
00481   // as the remote_address.
00482   std::string address = tcp_config_->get_public_address();
00483   ACE_UINT32 len = static_cast<ACE_UINT32>(address.length()) + 1;
00484 
00485   ACE_UINT32 nlen = htonl(len);
00486 
00487   if (this->peer().send_n(&nlen,
00488                           sizeof(ACE_UINT32)) == -1) {
00489     // TBD later - Anything we are supposed to do to close the connection.
00490     ACE_ERROR_RETURN((LM_ERROR,
00491                       "(%P|%t) ERROR: Unable to send address string length to "
00492                       "the passive side to complete the active connection "
00493                       "establishment.\n"),
00494                      -1);
00495   }
00496 
00497   if (this->peer().send_n(address.c_str(), len)  == -1) {
00498     // TBD later - Anything we are supposed to do to close the connection.
00499     ACE_ERROR_RETURN((LM_ERROR,
00500                       "(%P|%t) ERROR: Unable to send our address to "
00501                       "the passive side to complete the active connection "
00502                       "establishment.\n"),
00503                      -1);
00504   }
00505 
00506   ACE_UINT32 npriority = htonl(this->transport_priority_);
00507 
00508   if (this->peer().send_n(&npriority, sizeof(ACE_UINT32)) == -1) {
00509     // TBD later - Anything we are supposed to do to close the connection.
00510     ACE_ERROR_RETURN((LM_ERROR,
00511                       "(%P|%t) ERROR: Unable to send publication priority to "
00512                       "the passive side to complete the active connection "
00513                       "establishment.\n"),
00514                      -1);
00515   }
00516 
00517   return 0;
00518 }
00519 
00520 /// This function is called to re-establish the connection. If this object
00521 /// is the connector side of the connection then it tries to reconnect to the
00522 /// remote, if it's the acceptor side of the connection then it schedules a timer
00523 /// to check if it passively accepted a connection from remote.
00524 /// The on_new_association true indicates this is called when the connection is
00525 /// previous lost and new association is added. The connector side needs to try to
00526 /// actively reconnect to remote.
00527 int
00528 OpenDDS::DCPS::TcpConnection::reconnect(bool on_new_association)
00529 {
00530   DBG_ENTRY_LVL("TcpConnection","reconnect",6);
00531   if (DCPS_debug_level >= 1) {
00532     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect initiated on transport: %C to %C:%d.\n",
00533                this->config_name().c_str(),
00534                this->remote_address_.get_host_addr(),
00535                this->remote_address_.get_port_number()));
00536   }
00537 
00538   if (on_new_association) {
00539     if (DCPS_debug_level >= 1) {
00540       ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect on new association\n"));
00541     }
00542     return this->active_reconnect_on_new_association();
00543   }
00544 
00545   // If on_new_association is false, it's called by the reconnect task.
00546   // We need make sure if the link release is pending. If does, do
00547   // not try to reconnect.
00548   else if (!this->link_->is_release_pending()) {
00549     if (DCPS_debug_level >= 1) {
00550       ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect release not currently pending\n"));
00551     }
00552     // Try to reconnect if it's connector previously.
00553     if (this->is_connector_ && this->active_reconnect_i() == -1) {
00554       if (DCPS_debug_level >= 1) {
00555         ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is connector but active_reconnect_i failed\n"));
00556       }
00557       return -1;
00558     }
00559 
00560     // Schedule a timer to see if a incoming connection is accepted when timeout.
00561     else if (!this->is_connector_ && this->passive_reconnect_i() == -1) {
00562       if (DCPS_debug_level >= 1) {
00563         ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect is acceptor but passive_reconnect_i failed\n"));
00564       }
00565       return -1;
00566     }
00567 
00568   }
00569   if (DCPS_debug_level >= 1) {
00570     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::reconnect returning 0\n"));
00571   }
00572   return 0;
00573 }
00574 
00575 int
00576 OpenDDS::DCPS::TcpConnection::active_open()
00577 {
00578   DBG_ENTRY_LVL("TcpConnection","active_open",6);
00579 
00580   GuardType guard(reconnect_lock_);
00581   if (this->shutdown_)
00582     return -1;
00583 
00584   if (connected_.value()) {
00585     return 0;
00586   }
00587 
00588   return active_establishment(false /* !initiate_connect */);
00589 }
00590 
00591 int
00592 OpenDDS::DCPS::TcpConnection::active_reconnect_on_new_association()
00593 {
00594   DBG_ENTRY_LVL("TcpConnection","active_reconnect_on_new_association",6);
00595   GuardType guard(this->reconnect_lock_);
00596   if (this->shutdown_)
00597     return -1;
00598 
00599   if (this->connected_ == true)
00600     return 0;
00601 
00602   else if (this->active_establishment() == 0) {
00603     this->reconnect_state_ = INIT_STATE;
00604     TcpSendStrategy_rch send_strategy = this->send_strategy();
00605     if (send_strategy)
00606       send_strategy->resume_send();
00607     return 0;
00608   }
00609 
00610   return -1;
00611 }
00612 
00613 // This method is called on acceptor side when the lost connection is detected.
00614 // A timer is scheduled to check if a new connection is created within the
00615 // passive_reconnect_duration_ period.
00616 int
00617 OpenDDS::DCPS::TcpConnection::passive_reconnect_i()
00618 {
00619   DBG_ENTRY_LVL("TcpConnection","passive_reconnect_i",6);
00620   GuardType guard(this->reconnect_lock_);
00621   if (this->shutdown_)
00622     return -1;
00623 
00624   // The passive_reconnect_timer_id_ is used as flag to allow the timer scheduled just once.
00625   if (this->reconnect_state_ == INIT_STATE) {
00626     // Mark the connection lost since the recv/send just failed.
00627     this->connected_ = false;
00628 
00629     if (this->tcp_config_->passive_reconnect_duration_ == 0)
00630       return -1;
00631 
00632     ACE_Time_Value timeout(this->tcp_config_->passive_reconnect_duration_/1000,
00633                            this->tcp_config_->passive_reconnect_duration_%1000 * 1000);
00634     this->reconnect_state_ = PASSIVE_WAITING_STATE;
00635     this->link_->notify(DataLink::DISCONNECTED);
00636 
00637     // It is possible that the passive reconnect is called after the new connection
00638     // is accepted and the receive_strategy of this old connection is reset to nil.
00639     TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00640     if (this->receive_strategy()) {
00641 
00642       // Give a copy to reactor.
00643       this->passive_reconnect_timer_id_ = receive_strategy->get_reactor()->schedule_timer(this, 0, timeout);
00644 
00645       if (this->passive_reconnect_timer_id_ == -1) {
00646         ACE_ERROR_RETURN((LM_ERROR,
00647                           ACE_TEXT("(%P|%t) ERROR: TcpConnection::passive_reconnect_i")
00648                           ACE_TEXT(", %p.\n"), ACE_TEXT("schedule_timer")),
00649                          -1);
00650       }
00651     }
00652   }
00653 
00654   return 0;
00655 }
00656 
00657 // This is the active reconnect implementation. The backoff algorithm is used as the
00658 // reconnect strategy. e.g.
00659 // With conn_retry_initial_interval = 500, conn_retry_backoff_multiplier = 2.0 and
00660 // conn_retry_attempts = 6 the reconnect attempts will be:
00661 // - first at 0 seconds(upon detection of the disconnect)
00662 // - second at 0.5 seconds
00663 // - third at 1.0 (2*0.5) seconds
00664 // - fourth at 2.0 (2*1.0) seconds
00665 // - fifth at 4.0 (2*2.0) seconds
00666 // - sixth at  8.0 (2*4.0) seconds
00667 int
00668 OpenDDS::DCPS::TcpConnection::active_reconnect_i()
00669 {
00670   DBG_ENTRY_LVL("TcpConnection","active_reconnect_i",6);
00671 
00672   GuardType guard(this->reconnect_lock_);
00673   if (this->shutdown_)
00674     return -1;
00675 
00676   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00677         "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %C\n",
00678         this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00679         this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00680         this->reconnect_state_string().c_str()));
00681   if (DCPS_debug_level >= 1) {
00682     ACE_DEBUG((LM_DEBUG, "(%P|%t) DBG:   TcpConnection::"
00683           "active_reconnect_i(%C:%d->%C:%d) reconnect_state = %C\n",
00684           this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00685           this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00686           this->reconnect_state_string().c_str()));
00687   }
00688   // We need reset the state to INIT_STATE if we are previously reconnected.
00689   // This would allow re-establishing connection after the re-established
00690   // connection lost again.
00691   if (ACE_OS::gettimeofday() - this->last_reconnect_attempted_ > reconnect_delay
00692       && this->reconnect_state_ == RECONNECTED_STATE) {
00693     VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00694           "We are in RECONNECTED_STATE and now flip reconnect state to INIT_STATE.\n"));
00695     this->reconnect_state_ = INIT_STATE;
00696   }
00697 
00698   if (this->reconnect_state_ == INIT_STATE) {
00699     // Suspend send once.
00700     TcpSendStrategy_rch send_strategy = this->send_strategy();
00701     if (send_strategy)
00702       send_strategy->suspend_send();
00703     this->reconnect_lock_.release();
00704     this->disconnect();
00705     this->reconnect_lock_.acquire();
00706 
00707     if (this->shutdown_)
00708       return -1;
00709 
00710     if (this->tcp_config_->conn_retry_attempts_ > 0) {
00711       this->link_->notify(DataLink::DISCONNECTED);
00712     }
00713 
00714     // else the conn_retry_attempts is 0 then we do not need this extra
00715     // notify_disconnected() since the user application should get the
00716     // notify_lost() without delay.
00717 
00718     double retry_delay_msec = this->tcp_config_->conn_retry_initial_delay_;
00719     int ret = -1;
00720 
00721     for (int i = 0; i < this->tcp_config_->conn_retry_attempts_; ++i) {
00722       ret = this->active_establishment();
00723 
00724       if (this->shutdown_)
00725         break;
00726 
00727       if (ret == -1) {
00728         ACE_Time_Value delay_tv(((int)retry_delay_msec)/1000,
00729                                 ((int)retry_delay_msec)%1000*1000);
00730         ACE_OS::sleep(delay_tv);
00731         retry_delay_msec *= this->tcp_config_->conn_retry_backoff_multiplier_;
00732 
00733       } else {
00734         break;
00735       }
00736     }
00737 
00738     if (ret == -1) {
00739       if (this->tcp_config_->conn_retry_attempts_ > 0) {
00740         ACE_DEBUG((LM_DEBUG, "(%P|%t) we tried and failed to re-establish connection on transport: %C to %C:%d.\n",
00741                    this->config_name().c_str(),
00742                    this->remote_address_.get_host_addr(),
00743                    this->remote_address_.get_port_number()));
00744 
00745       } else {
00746         ACE_DEBUG((LM_DEBUG, "(%P|%t) we did not try to re-establish connection on transport: %C to %C:%d.\n",
00747                    this->config_name().c_str(),
00748                    this->remote_address_.get_host_addr(),
00749                    this->remote_address_.get_port_number()));
00750       }
00751 
00752       this->reconnect_state_ = LOST_STATE;
00753       this->link_->notify(DataLink::LOST);
00754       if (send_strategy)
00755         send_strategy->terminate_send();
00756 
00757     } else {
00758       ACE_DEBUG((LM_DEBUG, "(%P|%t) re-established connection on transport: %C to %C:%d.\n",
00759                  this->config_name().c_str(),
00760                  this->remote_address_.get_host_addr(),
00761                  this->remote_address_.get_port_number()));
00762       TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00763       if (receive_strategy->get_reactor()->register_handler(this, ACE_Event_Handler::READ_MASK) == -1) {
00764         ACE_ERROR_RETURN((LM_ERROR,
00765                           "(%P|%t) ERROR: OpenDDS::DCPS::TcpConnection::active_reconnect_i() can't register "
00766                           "with reactor %X %p\n", this, ACE_TEXT("register_handler")),
00767                          -1);
00768       }
00769       this->reconnect_state_ = RECONNECTED_STATE;
00770       this->link_->notify(DataLink::RECONNECTED);
00771       send_strategy->resume_send();
00772     }
00773 
00774     this->last_reconnect_attempted_ = ACE_OS::gettimeofday();
00775   }
00776 
00777   return this->reconnect_state_ == LOST_STATE ? -1 : 0;
00778 }
00779 
00780 /// A timer is scheduled on acceptor side to check if a new connection
00781 /// is accepted after the connection is lost.
00782 int
00783 OpenDDS::DCPS::TcpConnection::handle_timeout(const ACE_Time_Value &,
00784                                              const void *)
00785 {
00786   DBG_ENTRY_LVL("TcpConnection","handle_timeout",6);
00787 
00788   GuardType guard(this->reconnect_lock_);
00789 
00790   switch (this->reconnect_state_) {
00791   case PASSIVE_WAITING_STATE: {
00792     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, we tried and failed to re-establish connection on transport: %C to %C:%d.\n",
00793                this->config_name().c_str(),
00794                this->remote_address_.get_host_addr(),
00795                this->remote_address_.get_port_number()));
00796 
00797     this->reconnect_state_ = PASSIVE_TIMEOUT_CALLED_STATE;
00798     // We stay in PASSIVE_TIMEOUT_CALLED_STATE indicates there is no new connection.
00799     // Now we need declare the connection is lost.
00800     this->link_->notify(DataLink::LOST);
00801 
00802     // The handle_timeout may be called after the connection is re-established
00803     // and the send strategy of this old connection is reset to nil.
00804     TcpSendStrategy_rch send_strategy = this->send_strategy();
00805     if (send_strategy)
00806       send_strategy->terminate_send();
00807 
00808     this->reconnect_state_ = LOST_STATE;
00809 
00810     this->tear_link();
00811 
00812   }
00813   break;
00814 
00815   case RECONNECTED_STATE:
00816     // reconnected successfully.
00817     ACE_DEBUG((LM_DEBUG, "(%P|%t) TcpConnection::handle_timeout, re-established connection on transport: %C to %C:%d.\n",
00818                this->config_name().c_str(),
00819                this->remote_address_.get_host_addr(),
00820                this->remote_address_.get_port_number()));
00821     break;
00822 
00823   default :
00824     ACE_ERROR((LM_ERROR,
00825                ACE_TEXT("(%P|%t) ERROR: TcpConnection::handle_timeout, ")
00826                ACE_TEXT(" unknown state or it should not be in state=%d \n"),
00827                reconnect_state_));
00828     break;
00829   }
00830 
00831   return 0;
00832 }
00833 
00834 /// This object would be "old" connection object and the provided is the new
00835 /// connection object.  The "old" connection object will copy its states to
00836 /// to the "new" connection object. This is called by the TcpDataLink
00837 /// when a new connection is accepted (with a new TcpConnection object).
00838 /// We need make the state in "new" connection object consistent with the "old"
00839 /// connection object.
00840 void
00841 OpenDDS::DCPS::TcpConnection::transfer(TcpConnection* connection)
00842 {
00843   DBG_ENTRY_LVL("TcpConnection","transfer",6);
00844 
00845   GuardType guard(this->reconnect_lock_);
00846 
00847   bool notify_reconnect = false;
00848 
00849   switch (this->reconnect_state_) {
00850   case INIT_STATE:
00851     // We have not detected the lost connection and the peer is faster than us and
00852     // re-established the connection. so do not notify reconnected.
00853     break;
00854 
00855   case LOST_STATE:
00856 
00857     // The reconnect timed out.
00858   case PASSIVE_TIMEOUT_CALLED_STATE:
00859     // TODO: If the handle_timeout is called before the old connection
00860     // transfer its state to new connection then should we disconnect
00861     // the new connection or keep it alive ?
00862     // I think we should keep the connection, the user will get a
00863     // lost connection notification and then a reconnected notification.
00864     notify_reconnect = true;
00865     break;
00866 
00867   case PASSIVE_WAITING_STATE: {
00868     // Cancel the timer since we got new connection.
00869     TcpReceiveStrategy_rch receive_strategy = this->receive_strategy();
00870     if (receive_strategy->get_reactor()->cancel_timer(this) == -1) {
00871       ACE_ERROR((LM_ERROR,
00872                  ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00873                  ACE_TEXT(" %p. \n"), ACE_TEXT("cancel_timer")));
00874 
00875     } else
00876       passive_reconnect_timer_id_ = -1;
00877 
00878     notify_reconnect = true;
00879   }
00880   break;
00881 
00882   default :
00883     ACE_ERROR((LM_ERROR,
00884                ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00885                ACE_TEXT(" unknown state or it should not be in state=%i \n"),
00886                reconnect_state_));
00887     break;
00888   }
00889 
00890   // Verify if this acceptor side.
00891   if (this->is_connector_ || connection->is_connector_) {
00892     ACE_ERROR((LM_ERROR,
00893                ACE_TEXT("(%P|%t) ERROR: TcpConnection::transfer, ")
00894                ACE_TEXT(" should NOT be called by the connector side \n")));
00895   }
00896 
00897   if (reconnect_thread_) {
00898       ACE_Thread_Manager::instance()->join(reconnect_thread_);
00899       reconnect_thread_ = 0;
00900   }
00901   // connection->receive_strategy_ = this->receive_strategy_;
00902   // connection->send_strategy_ = this->send_strategy_;
00903   connection->remote_address_ = this->remote_address_;
00904   connection->local_address_ = this->local_address_;
00905   connection->tcp_config_ = this->tcp_config_;
00906   connection->link_ = this->link_;
00907 
00908   VDBG((LM_DEBUG, "(%P|%t) DBG:   "
00909         "transfer(%C:%d->%C:%d) passive reconnected. new con %@   "
00910         " old con %@ \n",
00911         this->remote_address_.get_host_addr(), this->remote_address_.get_port_number(),
00912         this->local_address_.get_host_addr(), this->local_address_.get_port_number(),
00913         connection, this));
00914 
00915   if (notify_reconnect) {
00916     this->reconnect_state_ = RECONNECTED_STATE;
00917     this->link_->notify(DataLink::RECONNECTED);
00918   }
00919 
00920 }
00921 
00922 /// This function is called when the backpressure occurs and timed out after
00923 /// "max_output_pause_period". The lost connection notification should be sent
00924 /// and the connection needs be closed since we declared it as a "lost"
00925 /// connection.
00926 void
00927 OpenDDS::DCPS::TcpConnection::notify_lost_on_backpressure_timeout()
00928 {
00929   DBG_ENTRY_LVL("TcpConnection","notify_lost_on_backpressure_timeout",6);
00930   bool notify_lost = false;
00931   {
00932     GuardType guard(this->reconnect_lock_);
00933 
00934     if (this->reconnect_state_ == INIT_STATE) {
00935       this->reconnect_state_ = LOST_STATE;
00936       notify_lost = true;
00937 
00938     }
00939   }
00940 
00941   if (notify_lost) {
00942     this->disconnect();
00943     this->link_->notify(DataLink::LOST);
00944 
00945     TcpSendStrategy_rch send_strategy = this->send_strategy();
00946     if (send_strategy)
00947       send_strategy->terminate_send();
00948   }
00949 
00950 }
00951 
00952 /// This is called by TcpSendStrategy when a send fails
00953 /// and a reconnect should be initiated. This method
00954 /// suspends any sends and kicks the reconnect thread into
00955 /// action.
00956 void
00957 OpenDDS::DCPS::TcpConnection::relink_from_send(bool do_suspend)
00958 {
00959   DBG_ENTRY_LVL("TcpConnection","relink_from_send",6);
00960 
00961   TcpSendStrategy_rch send_strategy = this->send_strategy();
00962   if (do_suspend && send_strategy)
00963     send_strategy->suspend_send();
00964 
00965   this->spawn_reconnect_thread();
00966 }
00967 
00968 /// This is called by TcpReceiveStrategy when a disconnect
00969 /// is detected.  It simply suspends any sends and lets
00970 /// the handle_close() handle the reconnect logic.
00971 void
00972 OpenDDS::DCPS::TcpConnection::relink_from_recv(bool do_suspend)
00973 {
00974   DBG_ENTRY_LVL("TcpConnection","relink_from_recv",6);
00975   TcpSendStrategy_rch send_strategy = this->send_strategy();
00976   if (do_suspend && send_strategy)
00977     send_strategy->suspend_send();
00978 }
00979 
00980 bool
00981 OpenDDS::DCPS::TcpConnection::tear_link()
00982 {
00983   DBG_ENTRY_LVL("TcpConnection","tear_link",6);
00984 
00985   return this->link_->release_resources();
00986 }
00987 
00988 void
00989 OpenDDS::DCPS::TcpConnection::shutdown()
00990 {
00991   DBG_ENTRY_LVL("TcpConnection","shutdown",6);
00992   GuardType guard(this->reconnect_lock_);
00993   this->shutdown_ = true;
00994   ACE_Svc_Handler<ACE_SOCK_STREAM, ACE_NULL_SYNCH>::shutdown();
00995 
00996 }
00997 
00998 ACE_Event_Handler::Reference_Count
00999 OpenDDS::DCPS::TcpConnection::add_reference()
01000 {
01001   RcObject::_add_ref();
01002   return 1;
01003 }
01004 
01005 ACE_Event_Handler::Reference_Count
01006 OpenDDS::DCPS::TcpConnection::remove_reference()
01007 {
01008   RcObject::_remove_ref();
01009   return 1;
01010 }
01011 
01012 void
01013 OpenDDS::DCPS::TcpConnection::spawn_reconnect_thread()
01014 {
01015   DBG_ENTRY_LVL("TcpConnection","spawn_reconnect_thread",6);
01016   GuardType guard(this->reconnect_lock_);
01017   if (!shutdown_) {
01018     // Make sure the associated transport_config outlives the connection object.
01019     TransportInst& transport_config = this->link_->impl().config();
01020     transport_config._add_ref();
01021     // add the reference count to be picked up from the new thread
01022     this->_add_ref();
01023     if (ACE_Thread_Manager::instance()->spawn(&reconnect_thread_fun,
01024                                               this,
01025                                               THR_NEW_LWP|THR_JOINABLE|THR_INHERIT_SCHED,
01026                                               &reconnect_thread_) == -1){
01027       // we need to decrement the reference count when thread creation fails.
01028       this->_remove_ref();
01029       transport_config._remove_ref();
01030     }
01031   }
01032 }
01033 
01034 ACE_THR_FUNC_RETURN
01035 OpenDDS::DCPS::TcpConnection::reconnect_thread_fun(void* arg)
01036 {
01037   DBG_ENTRY_LVL("TcpConnection","reconnect_thread_fun",6);
01038 
01039   // Ignore all signals to avoid
01040   //     ERROR: <something descriptive> Interrupted system call
01041   // The main thread will handle signals.
01042   sigset_t set;
01043   ACE_OS::sigfillset(&set);
01044   ACE_OS::thr_sigsetmask(SIG_SETMASK, &set, NULL);
01045 
01046   // Make sure the associated transport_config outlives the connection object.
01047   RcHandle<TransportInst> transport_config;
01048   TcpConnection_rch connection(static_cast<TcpConnection*>(arg), keep_count());
01049   transport_config = RcHandle<TransportInst>(&connection->link_->impl().config(), keep_count());
01050 
01051   if (connection->reconnect() == -1) {
01052     connection->tear_link();
01053   }
01054 
01055   return 0;
01056 }
01057 
01058 OPENDDS_STRING
01059 OpenDDS::DCPS::TcpConnection::reconnect_state_string() const
01060 {
01061   switch (reconnect_state_) {
01062   case INIT_STATE:
01063     return "INIT_STATE";
01064   case LOST_STATE:
01065     return "LOST_STATE";
01066   case RECONNECTED_STATE:
01067     return "RECONENCTED_STATE";
01068   case PASSIVE_WAITING_STATE:
01069     return "PASSIVE_WAITING_STATE";
01070   case PASSIVE_TIMEOUT_CALLED_STATE:
01071     return "PASSIVE_TIMEOUT_CALLED_STATE";
01072   default:
01073     ACE_ERROR((LM_ERROR, ACE_TEXT(
01074       "OpenDDS::DCPS::TcpConnection::reconnect_state_string(): "
01075       "%d is either completely invalid or at least not defined in this function.\n"),
01076       reconnect_state_
01077     ));
01078     return OPENDDS_STRING("(Unknown Reconnect State: ")
01079       + to_dds_string(reconnect_state_) + ")";
01080   }
01081 }
01082 
01083 OPENDDS_END_VERSIONED_NAMESPACE_DECL
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Enumerator Friends Defines

Generated on 10 Aug 2018 for OpenDDS by  doxygen 1.6.1